You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/09/28 18:15:08 UTC

[12/12] flink git commit: [FLINK-2677] Add a general-purpose keyed-window operator

[FLINK-2677] Add a general-purpose keyed-window operator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dd51c977
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dd51c977
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dd51c977

Branch: refs/heads/master
Commit: dd51c97741b336d3a11e319183537eef864a84fd
Parents: 3be2dc1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Sep 25 12:27:35 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 28 17:09:25 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/KeyedDataStream.java         |  19 +
 .../KeyedTriggerWindowDataStream.java           | 255 +++++++++++
 .../windowing/ReduceWindowFunction.java         |  70 +++
 .../ReduceWindowFunctionWithWindow.java         |  71 +++
 .../windowing/RichKeyedWindowFunction.java      |  25 +
 .../flink/streaming/api/graph/StreamGraph.java  |   6 +
 .../api/windowing/assigners/GlobalWindows.java  |  55 +++
 .../assigners/SlidingProcessingTimeWindows.java |  92 ++++
 .../windowing/assigners/SlidingTimeWindows.java |  81 ++++
 .../TumblingProcessingTimeWindows.java          |  67 +++
 .../assigners/TumblingTimeWindows.java          |  67 +++
 .../api/windowing/assigners/WindowAssigner.java |  32 ++
 .../api/windowing/evictors/CountEvictor.java    |  44 ++
 .../api/windowing/evictors/DeltaEvictor.java    |  58 +++
 .../api/windowing/evictors/Evictor.java         |  28 ++
 .../api/windowing/evictors/TimeEvictor.java     |  54 +++
 .../ContinuousProcessingTimeTrigger.java        |  79 ++++
 .../triggers/ContinuousWatermarkTrigger.java    |  65 +++
 .../api/windowing/triggers/CountTrigger.java    |  61 +++
 .../api/windowing/triggers/DeltaTrigger.java    |  66 +++
 .../triggers/ProcessingTimeTrigger.java         |  57 +++
 .../api/windowing/triggers/PurgingTrigger.java  |  76 +++
 .../api/windowing/triggers/Trigger.java         |  40 ++
 .../windowing/triggers/WatermarkTrigger.java    |  57 +++
 .../api/windowing/windows/GlobalWindow.java     |  65 +++
 .../api/windowing/windows/TimeWindow.java       |  75 +++
 .../streaming/api/windowing/windows/Window.java |  27 ++
 .../operators/BucketStreamSortOperator.java     |  93 ++++
 .../windowing/EvictingWindowOperator.java       | 115 +++++
 .../operators/windowing/PolicyToOperator.java   | 166 ++++++-
 .../operators/windowing/WindowOperator.java     | 320 +++++++++++++
 .../windowing/buffers/EvictingWindowBuffer.java |  22 +
 .../windowing/buffers/HeapWindowBuffer.java     |  88 ++++
 .../buffers/PreAggregatingHeapWindowBuffer.java |  91 ++++
 .../windowing/buffers/WindowBuffer.java         |  34 ++
 .../windowing/buffers/WindowBufferFactory.java  |  30 ++
 .../windowing/EvictingWindowOperatorTest.java   | 179 ++++++++
 .../windowing/PolicyWindowTranslationTest.java  | 216 +++++++++
 .../windowing/TriggerWindowTranslationTest.java | 201 ++++++++
 .../operators/windowing/WindowOperatorTest.java | 459 +++++++++++++++++++
 40 files changed, 3701 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
index 611953e..ce105e5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
@@ -23,7 +23,9 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 
@@ -115,4 +117,21 @@ public class KeyedDataStream<T, KEY> extends DataStream<T> {
 	public KeyedWindowDataStream<T, KEY> window(WindowPolicy window, WindowPolicy slide) {
 		return new KeyedWindowDataStream<T, KEY>(this, window, slide);
 	}
+
+	/**
+	 * Windows this data stream to a {@code KeyedTriggerWindowDataStream}, which evaluates windows
+	 * over a key grouped stream. Elements are put into windows by a {@link WindowAssigner}. The
+	 * grouping of elements is done both by key and by window.
+	 *
+	 * <p>
+	 * A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} can be defined to specify
+	 * when windows are evaluated. However, {@code WindowAssigners} have a default {@code Trigger}
+	 * that is used if a {@code Trigger} is not specified.
+	 *
+	 * @param assigner The {@code WindowAssigner} that assigns elements to windows.
+	 * @return The trigger windows data stream.
+	 */
+	public <W extends Window> KeyedTriggerWindowDataStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
+		return new KeyedTriggerWindowDataStream<T, KEY, W>(this, assigner);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedTriggerWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedTriggerWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedTriggerWindowDataStream.java
new file mode 100644
index 0000000..5b39775
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedTriggerWindowDataStream.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windowpolicy.ProcessingTime;
+import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.PolicyToOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@code KeyedTriggerWindowDataStream} represents a data stream where elements are grouped by
+ * key, and for each key, the stream of elements is split into windows based on a
+ * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
+ * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
+ *
+ * <p>
+ * The windows are conceptually evaluated for each key individually, meaning windows can trigger at
+ * different points for each key.
+ *
+ * <p>
+ * If an {@link Evictor} is specified it will be used to evict elements from the window after
+ * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
+ * When using an evictor window performance will degrade significantly, since
+ * pre-aggregation of window results cannot be used.
+ *
+ * <p>
+ * Note that the {@code KeyedTriggerWindowDataStream} is purely and API construct, during runtime
+ * the {@code KeyedTriggerWindowDataStream} will be collapsed together with the
+ * {@code KeyedDataStream} and the operation over the window into one single operation.
+ * 
+ * @param <T> The type of elements in the stream.
+ * @param <K> The type of the key by which elements are grouped.
+ * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
+ */
+public class KeyedTriggerWindowDataStream<T, K, W extends Window> {
+
+	/** The keyed data stream that is windowed by this stream */
+	private final KeyedDataStream<T, K> input;
+
+	/** The window assigner */
+	private final WindowAssigner<? super T, W> windowAssigner;
+
+	/** The trigger that is used for window evaluation/emission. */
+	private Trigger<? super T, ? super W> trigger;
+
+	/** The evictor that is used for evicting elements before window evaluation. */
+	private Evictor<? super T, ? super W> evictor;
+
+
+	public KeyedTriggerWindowDataStream(KeyedDataStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {
+		this.input = input;
+		this.windowAssigner = windowAssigner;
+		this.trigger = windowAssigner.getDefaultTrigger();
+	}
+
+	/**
+	 * Sets the {@code Trigger} that should be used to trigger window emission.
+	 */
+	public KeyedTriggerWindowDataStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
+		this.trigger = trigger;
+		return this;
+	}
+
+	/**
+	 * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
+	 *
+	 * <p>
+	 * Note: When using an evictor window performance will degrade significantly, since
+	 * pre-aggregation of window results cannot be used.
+	 */
+	public KeyedTriggerWindowDataStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
+		this.evictor = evictor;
+		return this;
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  Operations on the keyed windows
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Applies a reduce function to the window. The window function is called for each evaluation
+	 * of the window for each key individually. The output of the reduce function is interpreted
+	 * as a regular non-windowed stream.
+	 * <p>
+	 * This window will try and pre-aggregate data as much as the window policies permit. For example,
+	 * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
+	 * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
+	 * so a few elements are stored per key (one per slide interval).
+	 * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
+	 * aggregation tree.
+	 * 
+	 * @param function The reduce function.
+	 * @return The data stream that is the result of applying the reduce function to the window. 
+	 */
+	public DataStream<T> reduceWindow(ReduceFunction<T> function) {
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "Reduce at " + callLocation;
+
+		DataStream<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
+		if (result != null) {
+			return result;
+		}
+
+		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+		KeySelector<T, K> keySel = input.getKeySelector();
+
+		OneInputStreamOperator<T, T> operator;
+
+		if (evictor != null) {
+			operator = new EvictingWindowOperator<>(windowAssigner,
+					keySel,
+					new HeapWindowBuffer.Factory<T>(),
+					new ReduceWindowFunction<K, W, T>(function),
+					trigger,
+					evictor);
+
+		} else {
+			// we need to copy because we need our own instance of the pre aggregator
+			@SuppressWarnings("unchecked")
+			ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
+
+			operator = new WindowOperator<>(windowAssigner,
+					keySel,
+					new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
+					new ReduceWindowFunction<K, W, T>(function),
+					trigger);
+		}
+
+		return input.transform(opName, input.getType(), operator);
+	}
+
+	/**
+	 * Applies a window function to the window. The window function is called for each evaluation
+	 * of the window for each key individually. The output of the window function is interpreted
+	 * as a regular non-windowed stream.
+	 * <p>
+	 * Not that this function requires that all data in the windows is buffered until the window
+	 * is evaluated, as the function provides no means of pre-aggregation.
+	 * 
+	 * @param function The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <R> DataStream<R> mapWindow(KeyedWindowFunction<T, R, K, W> function) {
+		TypeInformation<T> inType = input.getType();
+		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+				function, KeyedWindowFunction.class, true, true, inType, null, false);
+
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "MapWindow at " + callLocation;
+
+		DataStream<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
+		if (result != null) {
+			return result;
+		}
+
+
+		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+		KeySelector<T, K> keySel = input.getKeySelector();
+
+		OneInputStreamOperator<T, R> operator;
+
+		if (evictor != null) {
+			operator = new EvictingWindowOperator<>(windowAssigner,
+					keySel,
+					new HeapWindowBuffer.Factory<T>(),
+					function,
+					trigger,
+					evictor);
+
+		} else {
+			operator = new WindowOperator<>(windowAssigner,
+					keySel,
+					new HeapWindowBuffer.Factory<T>(),
+					function,
+					trigger);
+		}
+
+
+
+		return input.transform(opName, resultType, operator);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	private <R> DataStream<R> createFastTimeOperatorIfValid(
+			Function function,
+			TypeInformation<R> resultType,
+			String functionName) {
+
+		WindowPolicy windowPolicy = null;
+		WindowPolicy slidePolicy = null;
+
+		if (windowAssigner instanceof SlidingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
+			SlidingProcessingTimeWindows timeWindows = (SlidingProcessingTimeWindows) windowAssigner;
+			windowPolicy = ProcessingTime.of(timeWindows.getSize(), TimeUnit.MILLISECONDS);
+			slidePolicy = ProcessingTime.of(timeWindows.getSlide(), TimeUnit.MILLISECONDS);
+		} else if (windowAssigner instanceof TumblingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
+			TumblingProcessingTimeWindows timeWindows = (TumblingProcessingTimeWindows) windowAssigner;
+			windowPolicy = ProcessingTime.of(timeWindows.getSize(), TimeUnit.MILLISECONDS);
+		}
+
+		if (windowPolicy == null) {
+			return null;
+		}
+
+		String opName = windowPolicy.toString(slidePolicy) + " of " + functionName;
+		KeySelector<T, K> keySel = input.getKeySelector();
+
+		OneInputStreamOperator<T, R> operator =
+				PolicyToOperator.createOperatorForPolicies(windowPolicy, slidePolicy, function, keySel);
+
+		return input.transform(opName, resultType, operator);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
new file mode 100644
index 0000000..1c9578a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+public class ReduceWindowFunction<K, W extends Window, T> extends RichKeyedWindowFunction<T, T, K, W> {
+	private static final long serialVersionUID = 1L;
+
+	private final ReduceFunction<T> reduceFunction;
+
+	public ReduceWindowFunction(ReduceFunction<T> reduceFunction) {
+		this.reduceFunction = reduceFunction;
+	}
+
+	@Override
+	public void setRuntimeContext(RuntimeContext ctx) {
+		super.setRuntimeContext(ctx);
+		FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		FunctionUtils.openFunction(reduceFunction, parameters);
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		FunctionUtils.closeFunction(reduceFunction);
+	}
+
+	@Override
+	public void evaluate(K k, W window, Iterable<T> values, Collector<T> out) throws Exception {
+		T result = null;
+
+		for (T v: values) {
+			if (result == null) {
+				result = v;
+			} else {
+				result = reduceFunction.reduce(result, v);
+			}
+		}
+
+		if (result != null) {
+			out.collect(result);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
new file mode 100644
index 0000000..bceff82
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+public class ReduceWindowFunctionWithWindow<K, W extends Window, T> extends RichKeyedWindowFunction<T, Tuple2<W, T>, K, W> {
+	private static final long serialVersionUID = 1L;
+
+	private final ReduceFunction<T> reduceFunction;
+
+	public ReduceWindowFunctionWithWindow(ReduceFunction<T> reduceFunction) {
+		this.reduceFunction = reduceFunction;
+	}
+
+	@Override
+	public void setRuntimeContext(RuntimeContext ctx) {
+		super.setRuntimeContext(ctx);
+		FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		FunctionUtils.openFunction(reduceFunction, parameters);
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		FunctionUtils.closeFunction(reduceFunction);
+	}
+
+	@Override
+	public void evaluate(K k, W window, Iterable<T> values, Collector<Tuple2<W, T>> out) throws Exception {
+		T result = null;
+
+		for (T v: values) {
+			if (result == null) {
+				result = v;
+			} else {
+				result = reduceFunction.reduce(result, v);
+			}
+		}
+
+		if (result != null) {
+			out.collect(Tuple2.of(window, result));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichKeyedWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichKeyedWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichKeyedWindowFunction.java
new file mode 100644
index 0000000..90ccb40
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichKeyedWindowFunction.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+public abstract class RichKeyedWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction implements KeyedWindowFunction<IN, OUT, KEY, W> {
+	private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index cda5686..cfa6d93 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -38,6 +38,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -217,6 +218,11 @@ public class StreamGraph extends StreamingPlan {
 			outputTypeConfigurable.setOutputType(outTypeInfo, executionConfig);
 		}
 
+		if (operatorObject instanceof InputTypeConfigurable) {
+			InputTypeConfigurable inputTypeConfigurable = (InputTypeConfigurable) operatorObject;
+			inputTypeConfigurable.setInputType(inTypeInfo, executionConfig);
+		}
+
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Vertex: {}", vertexID);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
new file mode 100644
index 0000000..391a6a4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
+	private static final long serialVersionUID = 1L;
+
+	private GlobalWindows() {}
+
+	@Override
+	public Collection<GlobalWindow> assignWindows(Object element, long timestamp) {
+		return Collections.singletonList(GlobalWindow.get());
+	}
+
+	@Override
+	public Trigger<Object, GlobalWindow> getDefaultTrigger() {
+		return null;
+	}
+
+	@Override
+	public String toString() {
+		return "GlobalWindows()";
+	}
+
+	/**
+	 * Creates a new {@code GlobalWindows} {@link WindowAssigner} that assigns
+	 * all elements to the same {@link GlobalWindow}.
+	 *
+	 * @return The global window policy.
+	 */
+	public static GlobalWindows create() {
+		return new GlobalWindows();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
new file mode 100644
index 0000000..a2d95c2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Collection;
+import java.util.List;
+
+public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	private final long size;
+
+	private final long slide;
+
+	private transient List<TimeWindow> result;
+
+	private SlidingProcessingTimeWindows(long size, long slide) {
+		this.size = size;
+		this.slide = slide;
+		this.result = Lists.newArrayListWithCapacity((int) (size / slide));
+	}
+
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		this.result = Lists.newArrayListWithCapacity((int) (size / slide));
+	}
+
+	@Override
+	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+		result.clear();
+		long time = System.currentTimeMillis();
+		long lastStart = time - time % slide;
+		for (long start = lastStart;
+			start > time - size;
+			start -= slide) {
+			result.add(new TimeWindow(start, size));
+		}
+		return result;
+	}
+
+	public long getSize() {
+		return size;
+	}
+
+	public long getSlide() {
+		return slide;
+	}
+
+	@Override
+	public Trigger<Object, TimeWindow> getDefaultTrigger() {
+		return ProcessingTimeTrigger.create();
+	}
+
+	@Override
+	public String toString() {
+		return "SlidingProcessingTimeWindows(" + size + ", " + slide + ")";
+	}
+
+	/**
+	 * Creates a new {@code SlidingProcessingTimeWindows} {@link WindowAssigner} that assigns
+	 * elements to sliding time windows based on the current processing time.
+	 *
+	 * @param size The size of the generated windows.
+	 * @param slide The slide interval of the generated windows.
+	 * @return The time policy.
+	 */
+	public static SlidingProcessingTimeWindows of(long size, long slide) {
+		return new SlidingProcessingTimeWindows(size, slide);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
new file mode 100644
index 0000000..cb5a7a1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	private final long size;
+
+	private final long slide;
+
+	private SlidingTimeWindows(long size, long slide) {
+		this.size = size;
+		this.slide = slide;
+	}
+
+	@Override
+	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+		List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
+		long lastStart = timestamp - timestamp % slide;
+		for (long start = lastStart;
+			start > timestamp - size;
+			start -= slide) {
+			windows.add(new TimeWindow(start, size));
+		}
+		return windows;
+	}
+
+	public long getSize() {
+		return size;
+	}
+
+	public long getSlide() {
+		return slide;
+	}
+
+	@Override
+	public Trigger<Object, TimeWindow> getDefaultTrigger() {
+		return WatermarkTrigger.create();
+	}
+
+	@Override
+	public String toString() {
+		return "SlidingTimeWindows(" + size + ", " + slide + ")";
+	}
+
+	/**
+	 * Creates a new {@code SlidingTimeWindows} {@link WindowAssigner} that assigns
+	 * elements to sliding time windows based on the element timestamp.
+	 *
+	 * @param size The size of the generated windows.
+	 * @param slide The slide interval of the generated windows.
+	 * @return The time policy.
+	 */
+	public static SlidingTimeWindows of(long size, long slide) {
+		return new SlidingTimeWindows(size, slide);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
new file mode 100644
index 0000000..b1ef857
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	private long size;
+
+	private TumblingProcessingTimeWindows(long size) {
+		this.size = size;
+	}
+
+	@Override
+	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+		long time = System.currentTimeMillis();
+		long start = time - (time % size);
+		return Collections.singletonList(new TimeWindow(start, size));
+	}
+
+	public long getSize() {
+		return size;
+	}
+
+	@Override
+	public Trigger<Object, TimeWindow> getDefaultTrigger() {
+		return ProcessingTimeTrigger.create();
+	}
+
+	@Override
+	public String toString() {
+		return "TumblingProcessingTimeWindows(" + size + ")";
+	}
+
+	/**
+	 * Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns
+	 * elements to time windows based on the current processing time.
+	 *
+	 * @param size The size of the generated windows.
+	 * @return The time policy.
+	 */
+	public static TumblingProcessingTimeWindows of(long size) {
+		return new TumblingProcessingTimeWindows(size);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
new file mode 100644
index 0000000..d19c97d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	private long size;
+
+	private TumblingTimeWindows(long size) {
+		this.size = size;
+	}
+
+	@Override
+	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+		long start = timestamp - (timestamp % size);
+		return Collections.singletonList(new TimeWindow(start, size));
+	}
+
+	public long getSize() {
+		return size;
+	}
+
+	@Override
+	public Trigger<Object, TimeWindow> getDefaultTrigger() {
+		return WatermarkTrigger.create();
+	}
+
+	@Override
+	public String toString() {
+		return "TumblingTimeWindows(" + size + ")";
+	}
+
+	/**
+	 * Creates a new {@code TumblingTimeWindows} {@link WindowAssigner} that assigns
+	 * elements to time windows based on the element timestamp.
+	 *
+	 * @param size The size of the generated windows.
+	 * @return The time policy.
+	 */
+	public static TumblingTimeWindows of(long size) {
+		return new TumblingTimeWindows(size);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
new file mode 100644
index 0000000..5996426
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import scala.Serializable;
+
+import java.util.Collection;
+
+public abstract class WindowAssigner<T, W extends Window> implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	public abstract Collection<W> assignWindows(T element, long timestamp);
+
+	public abstract Trigger<T, W> getDefaultTrigger();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
new file mode 100644
index 0000000..04636ee
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.evictors;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+public class CountEvictor<W extends Window> implements Evictor<Object, W> {
+	private static final long serialVersionUID = 1L;
+
+	private final long maxCount;
+
+	private CountEvictor(long count) {
+		this.maxCount = count;
+	}
+
+	@Override
+	public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
+		if (size > maxCount) {
+			return (int) (size - maxCount);
+		} else {
+			return 0;
+		}
+	}
+
+	public static <W extends Window> CountEvictor<W> of(long maxCount) {
+		return new CountEvictor<>(maxCount);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
new file mode 100644
index 0000000..c7872ce
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.evictors;
+
+import com.google.common.collect.Iterables;
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+public class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {
+	private static final long serialVersionUID = 1L;
+
+	DeltaFunction<T> deltaFunction;
+	private double threshold;
+
+	private DeltaEvictor(double threshold, DeltaFunction<T> deltaFunction) {
+		this.deltaFunction = deltaFunction;
+		this.threshold = threshold;
+	}
+
+	@Override
+	public int evict(Iterable<StreamRecord<T>> elements, int size, W window) {
+		StreamRecord<T> lastElement = Iterables.getLast(elements);
+		int toEvict = 0;
+		for (StreamRecord<T> element : elements) {
+			if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) < this.threshold) {
+				break;
+			}
+			toEvict++;
+		}
+
+		return toEvict;
+	}
+
+	@Override
+	public String toString() {
+		return "DeltaEvictor(" +  deltaFunction + ", " + threshold + ")";
+	}
+
+	public static <T, W extends Window> DeltaEvictor<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
+		return new DeltaEvictor<>(threshold, deltaFunction);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
new file mode 100644
index 0000000..db04ac4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.evictors;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import scala.Serializable;
+
+public interface Evictor<T, W extends Window> extends Serializable {
+
+	public abstract int evict(Iterable<StreamRecord<T>> elements, int size, W window);
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
new file mode 100644
index 0000000..450b132
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.evictors;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
+	private static final long serialVersionUID = 1L;
+
+	private final long windowSize;
+
+	public TimeEvictor(long windowSize) {
+		this.windowSize = windowSize;
+	}
+
+	@Override
+	public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
+		int toEvict = 0;
+		long currentTime = System.currentTimeMillis();
+		long evictCutoff = currentTime - windowSize;
+		for (StreamRecord<Object> record: elements) {
+			if (record.getTimestamp() > evictCutoff) {
+				break;
+			}
+			toEvict++;
+		}
+		return toEvict;
+	}
+
+	@Override
+	public String toString() {
+		return "TimeEvictor(" + windowSize + ")";
+	}
+
+	public static <W extends Window> TimeEvictor<W> of(long windowSize) {
+		return new TimeEvictor<>(windowSize);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
new file mode 100644
index 0000000..64850a2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigger<Object, W> {
+	private static final long serialVersionUID = 1L;
+
+	private long granularity;
+
+	private long nextFireTimestamp = 0;
+
+	private ContinuousProcessingTimeTrigger(long granularity) {
+		this.granularity = granularity;
+	}
+
+	@Override
+	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) {
+		long currentTime = System.currentTimeMillis();
+		if (nextFireTimestamp == 0) {
+			long start = currentTime - (currentTime % granularity);
+			nextFireTimestamp = start + granularity;
+
+			ctx.registerProcessingTimeTimer(nextFireTimestamp);
+			return TriggerResult.CONTINUE;
+		}
+		if (currentTime > nextFireTimestamp) {
+			long start = currentTime - (currentTime % granularity);
+			nextFireTimestamp = start + granularity;
+
+			ctx.registerProcessingTimeTimer(nextFireTimestamp);
+
+			return TriggerResult.FIRE;
+		}
+		return TriggerResult.CONTINUE;
+	}
+
+	@Override
+	public TriggerResult onTime(long time, TriggerContext ctx) {
+		// only fire if an element didn't already fire
+		long currentTime = System.currentTimeMillis();
+		if (currentTime > nextFireTimestamp) {
+			long start = currentTime - (currentTime % granularity);
+			nextFireTimestamp = start + granularity;
+			return TriggerResult.FIRE;
+		}
+		return TriggerResult.CONTINUE;
+	}
+
+	@Override
+	public Trigger<Object, W> duplicate() {
+		return new ContinuousProcessingTimeTrigger<>(granularity);
+	}
+
+	@Override
+	public String toString() {
+		return "ContinuousProcessingTimeTrigger(" + granularity + ")";
+	}
+
+	public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(long granularity) {
+		return new ContinuousProcessingTimeTrigger<>(granularity);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
new file mode 100644
index 0000000..b7f085a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+public class ContinuousWatermarkTrigger<W extends Window> implements Trigger<Object, W> {
+	private static final long serialVersionUID = 1L;
+
+	private long granularity;
+
+	private boolean first = true;
+
+	private ContinuousWatermarkTrigger(long granularity) {
+		this.granularity = granularity;
+	}
+
+	@Override
+	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) {
+		if (first) {
+			long start = timestamp - (timestamp % granularity);
+			long nextFireTimestamp = start + granularity;
+
+			ctx.registerWatermarkTimer(nextFireTimestamp);
+			first = false;
+			return TriggerResult.CONTINUE;
+		}
+		return TriggerResult.CONTINUE;
+	}
+
+	@Override
+	public TriggerResult onTime(long time, TriggerContext ctx) {
+		ctx.registerWatermarkTimer(time + granularity);
+		return TriggerResult.FIRE;
+	}
+
+	@Override
+	public Trigger<Object, W> duplicate() {
+		return new ContinuousWatermarkTrigger<>(granularity);
+	}
+
+	@Override
+	public String toString() {
+		return "ContinuousProcessingTimeTrigger(" + granularity + ")";
+	}
+
+	public static <W extends Window> ContinuousWatermarkTrigger<W> of(long granularity) {
+		return new ContinuousWatermarkTrigger<>(granularity);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
new file mode 100644
index 0000000..a51fae6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+public class CountTrigger<W extends Window> implements Trigger<Object, W> {
+	private static final long serialVersionUID = 1L;
+
+	private long maxCount;
+	private long count;
+
+	private CountTrigger(long maxCount) {
+		this.maxCount = maxCount;
+		count = 0;
+	}
+
+	@Override
+	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) {
+		count++;
+		if (count >= maxCount) {
+			count = 0;
+			return TriggerResult.FIRE;
+		}
+		return TriggerResult.CONTINUE;
+	}
+
+	@Override
+	public TriggerResult onTime(long time, TriggerContext ctx) {
+		return null;
+	}
+
+	@Override
+	public Trigger<Object, W> duplicate() {
+		return new CountTrigger<>(maxCount);
+	}
+
+	@Override
+	public String toString() {
+		return "CountTrigger(" +  maxCount + ")";
+	}
+
+	public static <W extends Window> CountTrigger<W> of(long maxCount) {
+		return new CountTrigger<>(maxCount);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
new file mode 100644
index 0000000..ecd7ed0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> {
+	private static final long serialVersionUID = 1L;
+
+	DeltaFunction<T> deltaFunction;
+	private double threshold;
+	private transient T lastElement;
+
+	private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction) {
+		this.deltaFunction = deltaFunction;
+		this.threshold = threshold;
+	}
+
+	@Override
+	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) {
+		if (lastElement == null) {
+			lastElement = element;
+			return TriggerResult.CONTINUE;
+		}
+		if (deltaFunction.getDelta(lastElement, element) > this.threshold) {
+			lastElement = element;
+			return TriggerResult.FIRE;
+		}
+		return TriggerResult.CONTINUE;
+	}
+
+	@Override
+	public TriggerResult onTime(long time, TriggerContext ctx) {
+		return null;
+	}
+
+	@Override
+	public Trigger<T, W> duplicate() {
+		return new DeltaTrigger<>(threshold, deltaFunction);
+	}
+
+	@Override
+	public String toString() {
+		return "DeltaTrigger(" +  deltaFunction + ", " + threshold + ")";
+	}
+
+	public static <T, W extends Window> DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
+		return new DeltaTrigger<>(threshold, deltaFunction);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
new file mode 100644
index 0000000..f693a67
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	boolean isFirst = true;
+
+	private ProcessingTimeTrigger() {}
+
+	@Override
+	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
+		if (isFirst) {
+			ctx.registerProcessingTimeTimer(window.getEnd());
+			isFirst = false;
+		}
+		return TriggerResult.CONTINUE;
+	}
+
+	@Override
+	public TriggerResult onTime(long time, TriggerContext ctx) {
+		return TriggerResult.FIRE_AND_PURGE;
+	}
+
+	@Override
+	public Trigger<Object, TimeWindow> duplicate() {
+		return new ProcessingTimeTrigger();
+	}
+
+	@Override
+	public String toString() {
+		return "ProcessingTimeTrigger()";
+	}
+
+	public static ProcessingTimeTrigger create() {
+		return new ProcessingTimeTrigger();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
new file mode 100644
index 0000000..88e22cd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
+	private static final long serialVersionUID = 1L;
+
+	private Trigger<T, W> nestedTrigger;
+
+	private  PurgingTrigger(Trigger<T, W> nestedTrigger) {
+		this.nestedTrigger = nestedTrigger;
+	}
+
+	@Override
+	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) {
+		TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
+		switch (triggerResult) {
+			case FIRE:
+				return TriggerResult.FIRE_AND_PURGE;
+			case FIRE_AND_PURGE:
+				return TriggerResult.FIRE_AND_PURGE;
+			default:
+				return TriggerResult.CONTINUE;
+		}
+	}
+
+	@Override
+	public TriggerResult onTime(long time, TriggerContext ctx) {
+		TriggerResult triggerResult = nestedTrigger.onTime(time, ctx);
+		switch (triggerResult) {
+			case FIRE:
+				return TriggerResult.FIRE_AND_PURGE;
+			case FIRE_AND_PURGE:
+				return TriggerResult.FIRE_AND_PURGE;
+			default:
+				return TriggerResult.CONTINUE;
+		}
+	}
+
+	@Override
+	public Trigger<T, W> duplicate() {
+		return new PurgingTrigger<>(nestedTrigger.duplicate());
+	}
+
+	@Override
+	public String toString() {
+		return "PurgingTrigger(" + nestedTrigger.toString() + ")";
+	}
+
+	public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) {
+		return new PurgingTrigger<>(nestedTrigger);
+	}
+
+	@VisibleForTesting
+	public Trigger<T, W> getNestedTrigger() {
+		return nestedTrigger;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
new file mode 100644
index 0000000..b04aacf
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import scala.Serializable;
+
+public interface Trigger<T, W extends Window> extends Serializable {
+
+	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx);
+
+	public TriggerResult onTime(long time, TriggerContext ctx);
+
+	public Trigger<T, W> duplicate();
+
+	public static enum TriggerResult {
+		CONTINUE, FIRE_AND_PURGE, FIRE
+	}
+
+	public interface TriggerContext {
+		void registerProcessingTimeTimer(long time);
+
+		void registerWatermarkTimer(long time);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
new file mode 100644
index 0000000..6ba8890
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+public class WatermarkTrigger implements Trigger<Object, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	boolean isFirst = true;
+
+	private WatermarkTrigger() {}
+
+	@Override
+	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
+		if (isFirst) {
+			ctx.registerWatermarkTimer(window.maxTimestamp());
+			isFirst = false;
+		}
+		return TriggerResult.CONTINUE;
+	}
+
+	@Override
+	public TriggerResult onTime(long time, TriggerContext ctx) {
+		return TriggerResult.FIRE_AND_PURGE;
+	}
+
+	@Override
+	public Trigger<Object, TimeWindow> duplicate() {
+		return new WatermarkTrigger();
+	}
+
+	@Override
+	public String toString() {
+		return "WatermarkTrigger()";
+	}
+
+	public static WatermarkTrigger create() {
+		return new WatermarkTrigger();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
new file mode 100644
index 0000000..e0df19d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.windows;
+
+public class GlobalWindow extends Window {
+
+	private static GlobalWindow INSTANCE = new GlobalWindow();
+
+	private GlobalWindow() { }
+
+	public static GlobalWindow get() {
+		return INSTANCE;
+	}
+
+	@Override
+	public long getStart() {
+		return Long.MIN_VALUE;
+	}
+
+	@Override
+	public long getEnd() {
+		return Long.MAX_VALUE;
+	}
+
+	@Override
+	public long maxTimestamp() {
+		return Long.MAX_VALUE;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		return true;
+	}
+
+	@Override
+	public int hashCode() {
+		return 0;
+	}
+
+	@Override
+	public String toString() {
+		return "GlobalWindow";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
new file mode 100644
index 0000000..20080c0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.windows;
+
+public class TimeWindow extends Window {
+	long start;
+	long end;
+
+	public TimeWindow() {
+	}
+
+	public TimeWindow(long start, long size) {
+		this.start = start;
+		this.end = start + size - 1;
+	}
+
+	@Override
+	public long getStart() {
+		return start;
+	}
+
+	@Override
+	public long getEnd() {
+		return end;
+	}
+
+	@Override
+	public long maxTimestamp() {
+		return end;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		TimeWindow window = (TimeWindow) o;
+
+		return end == window.end && start == window.start;
+	}
+
+	@Override
+	public int hashCode() {
+		int result = (int) (start ^ (start >>> 32));
+		result = 31 * result + (int) (end ^ (end >>> 32));
+		return result;
+	}
+
+	@Override
+	public String toString() {
+		return "TimeWindow{" +
+				"start=" + start +
+				", end=" + end +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
new file mode 100644
index 0000000..4e22c32
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.windows;
+
+public abstract class Window {
+
+	public abstract long getStart();
+
+	public abstract long getEnd();
+
+	public abstract long maxTimestamp();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
new file mode 100644
index 0000000..145ad25
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+public class BucketStreamSortOperator<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
+	private static final long serialVersionUID = 1L;
+
+	private long granularity;
+
+	private transient Map<Long, List<StreamRecord<T>>> buckets;
+
+	public BucketStreamSortOperator(long granularity) {
+		this.granularity = granularity;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		buckets = Maps.newHashMap();
+
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void processElement(StreamRecord<T> record) throws Exception {
+		long bucketId = record.getTimestamp() - (record.getTimestamp() % granularity);
+		List<StreamRecord<T>> bucket = buckets.get(bucketId);
+		if (bucket == null) {
+			bucket = Lists.newArrayList();
+			buckets.put(bucketId, bucket);
+		}
+		bucket.add(record);
+	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		long maxBucketId = mark.getTimestamp() - (mark.getTimestamp() % granularity);
+		Set<Long> toRemove = Sets.newHashSet();
+		for (Map.Entry<Long, List<StreamRecord<T>>> bucket: buckets.entrySet()) {
+			if (bucket.getKey() < maxBucketId) {
+				Collections.sort(bucket.getValue(), new Comparator<StreamRecord<T>>() {
+					@Override
+					public int compare(StreamRecord<T> o1, StreamRecord<T> o2) {
+						return (int) (o1.getTimestamp() - o2.getTimestamp());
+					}
+				});
+				for (StreamRecord<T> r: bucket.getValue()) {
+					output.collect(r);
+				}
+				toRemove.add(bucket.getKey());
+			}
+		}
+
+		for (Long l: toRemove) {
+			buckets.remove(l);
+		}
+
+		output.emitWatermark(mark);
+	}
+
+}