You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/01 11:54:17 UTC

[6/9] flink git commit: [hotfix] Simplify new windowing API

[hotfix] Simplify new windowing API

Before, there would be three different window() methods on
KeyedDataStream: one that takes two policies, one that takes one policy
and one that takes a window assigner.

Now, there is only one window() method that takes a window assigner and
creates a KeyedWindowDataStream.

For convenience, there are two timeWindow() methods that take either one
argument (tumbling windows) or two arguments (sliding windows). These
create a KeyedWindowDataStream with either a TumblingWindows or
SlidingWindows assigner, respectively.

When the window operator is created we pick the optimized aligned time
windows operator if the combination of window assigner/trigger/evictor
allows it.

All of this behaviour is verified in tests.

This closes #1195


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5623c15b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5623c15b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5623c15b

Branch: refs/heads/master
Commit: 5623c15b0933487aea11fa8962feef29433133b7
Parents: 937793e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Sep 29 20:22:11 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 1 11:04:00 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/KeyedDataStream.java         |  64 +++--
 .../KeyedTriggerWindowDataStream.java           | 255 -------------------
 .../api/datastream/KeyedWindowDataStream.java   | 237 +++++++++++++----
 .../windowing/ReduceKeyedWindowFunction.java    |  70 +++++
 .../api/windowing/evictors/TimeEvictor.java     |   6 +
 .../api/windowing/time/AbstractTime.java        |  95 +++++++
 .../streaming/api/windowing/time/EventTime.java |  62 +++++
 .../api/windowing/time/ProcessingTime.java      |  63 +++++
 .../streaming/api/windowing/time/Time.java      |  66 +++++
 .../ContinuousProcessingTimeTrigger.java        |   6 +
 .../triggers/ContinuousWatermarkTrigger.java    |   6 +
 .../windowpolicy/AbstractTimePolicy.java        | 109 --------
 .../api/windowing/windowpolicy/Count.java       |  58 -----
 .../api/windowing/windowpolicy/Delta.java       |  68 -----
 .../api/windowing/windowpolicy/EventTime.java   |  64 -----
 .../windowing/windowpolicy/ProcessingTime.java  |  65 -----
 .../api/windowing/windowpolicy/Time.java        |  68 -----
 .../windowing/windowpolicy/WindowPolicy.java    |  57 -----
 .../operators/windowing/PolicyToOperator.java   | 239 -----------------
 .../windowing/EvictingWindowOperatorTest.java   |   4 +-
 .../windowing/PolicyWindowTranslationTest.java  | 216 ----------------
 .../windowing/TimeWindowTranslationTest.java    |  97 +++++++
 .../windowing/TriggerWindowTranslationTest.java | 201 ---------------
 .../operators/windowing/WindowOperatorTest.java |  10 +-
 .../windowing/WindowTranslationTest.java        | 201 +++++++++++++++
 .../GroupedProcessingTimeWindowExample.java     |   4 +-
 26 files changed, 916 insertions(+), 1475 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
index ce105e5..2ae07b2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
@@ -23,8 +23,14 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.EventTime;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -92,34 +98,50 @@ public class KeyedDataStream<T, KEY> extends DataStream<T> {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Windows this data stream to a KeyedWindowDataStream, which evaluates windows over a key
-	 * grouped stream. The window is defined by a single policy.
+	 * Windows this {@code KeyedDataStream} into tumbling time windows.
+	 *
 	 * <p>
-	 * For time windows, these single-policy windows result in tumbling time windows.
-	 *     
-	 * @param policy The policy that defines the window.
-	 * @return The windows data stream. 
+	 * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
+	 * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
+	 * set using
+	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
+	 *
+	 * @param size The size of the window.
 	 */
-	public KeyedWindowDataStream<T, KEY> window(WindowPolicy policy) {
-		return new KeyedWindowDataStream<T, KEY>(this, policy);
+	public KeyedWindowDataStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) {
+		AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
+
+		if (actualSize instanceof EventTime) {
+			return window(TumblingTimeWindows.of(actualSize.toMilliseconds()));
+		} else {
+			return window(TumblingProcessingTimeWindows.of(actualSize.toMilliseconds()));
+		}
 	}
 
 	/**
-	 * Windows this data stream to a KeyedWindowDataStream, which evaluates windows over a key
-	 * grouped stream. The window is defined by a window policy, plus a slide policy.
+	 * Windows this {@code KeyedDataStream} into sliding time windows.
+	 *
 	 * <p>
-	 * For time windows, these slide policy windows result in sliding time windows.
-	 * 
-	 * @param window The policy that defines the window.
-	 * @param slide The additional policy defining the slide of the window. 
-	 * @return The windows data stream.
+	 * This is a shortcut for either {@code .window(SlidingTimeWindows.of(size, slide))} or
+	 * {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic
+	 * set using
+	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
+	 *
+	 * @param size The size of the window.
 	 */
-	public KeyedWindowDataStream<T, KEY> window(WindowPolicy window, WindowPolicy slide) {
-		return new KeyedWindowDataStream<T, KEY>(this, window, slide);
+	public KeyedWindowDataStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, AbstractTime slide) {
+		AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
+		AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
+
+		if (actualSize instanceof EventTime) {
+			return window(SlidingTimeWindows.of(actualSize.toMilliseconds(), actualSlide.toMilliseconds()));
+		} else {
+			return window(SlidingProcessingTimeWindows.of(actualSize.toMilliseconds(), actualSlide.toMilliseconds()));
+		}
 	}
 
 	/**
-	 * Windows this data stream to a {@code KeyedTriggerWindowDataStream}, which evaluates windows
+	 * Windows this data stream to a {@code KeyedWindowDataStream}, which evaluates windows
 	 * over a key grouped stream. Elements are put into windows by a {@link WindowAssigner}. The
 	 * grouping of elements is done both by key and by window.
 	 *
@@ -131,7 +153,7 @@ public class KeyedDataStream<T, KEY> extends DataStream<T> {
 	 * @param assigner The {@code WindowAssigner} that assigns elements to windows.
 	 * @return The trigger windows data stream.
 	 */
-	public <W extends Window> KeyedTriggerWindowDataStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
-		return new KeyedTriggerWindowDataStream<T, KEY, W>(this, assigner);
+	public <W extends Window> KeyedWindowDataStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
+		return new KeyedWindowDataStream<>(this, assigner);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedTriggerWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedTriggerWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedTriggerWindowDataStream.java
deleted file mode 100644
index 5b39775..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedTriggerWindowDataStream.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.evictors.Evictor;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windowpolicy.ProcessingTime;
-import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.PolicyToOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * A {@code KeyedTriggerWindowDataStream} represents a data stream where elements are grouped by
- * key, and for each key, the stream of elements is split into windows based on a
- * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
- * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
- *
- * <p>
- * The windows are conceptually evaluated for each key individually, meaning windows can trigger at
- * different points for each key.
- *
- * <p>
- * If an {@link Evictor} is specified it will be used to evict elements from the window after
- * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
- * When using an evictor window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
- *
- * <p>
- * Note that the {@code KeyedTriggerWindowDataStream} is purely and API construct, during runtime
- * the {@code KeyedTriggerWindowDataStream} will be collapsed together with the
- * {@code KeyedDataStream} and the operation over the window into one single operation.
- * 
- * @param <T> The type of elements in the stream.
- * @param <K> The type of the key by which elements are grouped.
- * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
- */
-public class KeyedTriggerWindowDataStream<T, K, W extends Window> {
-
-	/** The keyed data stream that is windowed by this stream */
-	private final KeyedDataStream<T, K> input;
-
-	/** The window assigner */
-	private final WindowAssigner<? super T, W> windowAssigner;
-
-	/** The trigger that is used for window evaluation/emission. */
-	private Trigger<? super T, ? super W> trigger;
-
-	/** The evictor that is used for evicting elements before window evaluation. */
-	private Evictor<? super T, ? super W> evictor;
-
-
-	public KeyedTriggerWindowDataStream(KeyedDataStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {
-		this.input = input;
-		this.windowAssigner = windowAssigner;
-		this.trigger = windowAssigner.getDefaultTrigger();
-	}
-
-	/**
-	 * Sets the {@code Trigger} that should be used to trigger window emission.
-	 */
-	public KeyedTriggerWindowDataStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
-		this.trigger = trigger;
-		return this;
-	}
-
-	/**
-	 * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
-	 *
-	 * <p>
-	 * Note: When using an evictor window performance will degrade significantly, since
-	 * pre-aggregation of window results cannot be used.
-	 */
-	public KeyedTriggerWindowDataStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
-		this.evictor = evictor;
-		return this;
-	}
-
-
-	// ------------------------------------------------------------------------
-	//  Operations on the keyed windows
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Applies a reduce function to the window. The window function is called for each evaluation
-	 * of the window for each key individually. The output of the reduce function is interpreted
-	 * as a regular non-windowed stream.
-	 * <p>
-	 * This window will try and pre-aggregate data as much as the window policies permit. For example,
-	 * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
-	 * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
-	 * so a few elements are stored per key (one per slide interval).
-	 * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
-	 * aggregation tree.
-	 * 
-	 * @param function The reduce function.
-	 * @return The data stream that is the result of applying the reduce function to the window. 
-	 */
-	public DataStream<T> reduceWindow(ReduceFunction<T> function) {
-		String callLocation = Utils.getCallLocationName();
-		String udfName = "Reduce at " + callLocation;
-
-		DataStream<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
-		if (result != null) {
-			return result;
-		}
-
-		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
-		KeySelector<T, K> keySel = input.getKeySelector();
-
-		OneInputStreamOperator<T, T> operator;
-
-		if (evictor != null) {
-			operator = new EvictingWindowOperator<>(windowAssigner,
-					keySel,
-					new HeapWindowBuffer.Factory<T>(),
-					new ReduceWindowFunction<K, W, T>(function),
-					trigger,
-					evictor);
-
-		} else {
-			// we need to copy because we need our own instance of the pre aggregator
-			@SuppressWarnings("unchecked")
-			ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
-
-			operator = new WindowOperator<>(windowAssigner,
-					keySel,
-					new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
-					new ReduceWindowFunction<K, W, T>(function),
-					trigger);
-		}
-
-		return input.transform(opName, input.getType(), operator);
-	}
-
-	/**
-	 * Applies a window function to the window. The window function is called for each evaluation
-	 * of the window for each key individually. The output of the window function is interpreted
-	 * as a regular non-windowed stream.
-	 * <p>
-	 * Not that this function requires that all data in the windows is buffered until the window
-	 * is evaluated, as the function provides no means of pre-aggregation.
-	 * 
-	 * @param function The window function.
-	 * @return The data stream that is the result of applying the window function to the window.
-	 */
-	public <R> DataStream<R> mapWindow(KeyedWindowFunction<T, R, K, W> function) {
-		TypeInformation<T> inType = input.getType();
-		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-				function, KeyedWindowFunction.class, true, true, inType, null, false);
-
-		String callLocation = Utils.getCallLocationName();
-		String udfName = "MapWindow at " + callLocation;
-
-		DataStream<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
-		if (result != null) {
-			return result;
-		}
-
-
-		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
-		KeySelector<T, K> keySel = input.getKeySelector();
-
-		OneInputStreamOperator<T, R> operator;
-
-		if (evictor != null) {
-			operator = new EvictingWindowOperator<>(windowAssigner,
-					keySel,
-					new HeapWindowBuffer.Factory<T>(),
-					function,
-					trigger,
-					evictor);
-
-		} else {
-			operator = new WindowOperator<>(windowAssigner,
-					keySel,
-					new HeapWindowBuffer.Factory<T>(),
-					function,
-					trigger);
-		}
-
-
-
-		return input.transform(opName, resultType, operator);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	private <R> DataStream<R> createFastTimeOperatorIfValid(
-			Function function,
-			TypeInformation<R> resultType,
-			String functionName) {
-
-		WindowPolicy windowPolicy = null;
-		WindowPolicy slidePolicy = null;
-
-		if (windowAssigner instanceof SlidingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
-			SlidingProcessingTimeWindows timeWindows = (SlidingProcessingTimeWindows) windowAssigner;
-			windowPolicy = ProcessingTime.of(timeWindows.getSize(), TimeUnit.MILLISECONDS);
-			slidePolicy = ProcessingTime.of(timeWindows.getSlide(), TimeUnit.MILLISECONDS);
-		} else if (windowAssigner instanceof TumblingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
-			TumblingProcessingTimeWindows timeWindows = (TumblingProcessingTimeWindows) windowAssigner;
-			windowPolicy = ProcessingTime.of(timeWindows.getSize(), TimeUnit.MILLISECONDS);
-		}
-
-		if (windowPolicy == null) {
-			return null;
-		}
-
-		String opName = windowPolicy.toString(slidePolicy) + " of " + functionName;
-		KeySelector<T, K> keySel = input.getKeySelector();
-
-		OneInputStreamOperator<T, R> operator =
-				PolicyToOperator.createOperatorForPolicies(windowPolicy, slidePolicy, function, keySel);
-
-		return input.transform(opName, resultType, operator);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
index e658bdd..ad7ca37 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
@@ -18,61 +18,98 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.commons.lang.SerializationUtils;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceKeyedWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.PolicyToOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
 
 /**
- * A KeyedWindowDataStream represents a data stream where elements are grouped by key, and 
- * for each key, the stream of elements is split into windows. The windows are conceptually
- * evaluated for each key individually, meaning windows can trigger at different points
- * for each key.
+ * A {@code KeyedWindowDataStream} represents a data stream where elements are grouped by
+ * key, and for each key, the stream of elements is split into windows based on a
+ * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
+ * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
+ *
+ * <p>
+ * The windows are conceptually evaluated for each key individually, meaning windows can trigger at
+ * different points for each key.
+ *
  * <p>
- * In many cases, however, the windows are "aligned", meaning they trigger at the
- * same time for all keys. The most common example for that are the regular time windows.
+ * If an {@link Evictor} is specified it will be used to evict elements from the window after
+ * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
+ * When using an evictor window performance will degrade significantly, since
+ * pre-aggregation of window results cannot be used.
+ *
  * <p>
- * Note that the KeyedWindowDataStream is purely and API construct, during runtime the
- * KeyedWindowDataStream will be collapsed together with the KeyedDataStream and the operation
- * over the window into one single operation.
+ * Note that the {@code KeyedWindowDataStream} is purely an API construct, during runtime
+ * the {@code KeyedWindowDataStream} will be collapsed together with the
+ * {@code KeyedDataStream} and the operation over the window into one single operation.
  * 
  * @param <T> The type of elements in the stream.
  * @param <K> The type of the key by which elements are grouped.
+ * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
  */
-public class KeyedWindowDataStream<T, K> {
+public class KeyedWindowDataStream<T, K, W extends Window> {
 
 	/** The keyed data stream that is windowed by this stream */
 	private final KeyedDataStream<T, K> input;
 
-	/** The core window policy */
-	private final WindowPolicy windowPolicy;
+	/** The window assigner */
+	private final WindowAssigner<? super T, W> windowAssigner;
 
-	/** The optional additional slide policy */
-	private final WindowPolicy slidePolicy;
-	
-	
-	public KeyedWindowDataStream(KeyedDataStream<T, K> input, WindowPolicy windowPolicy) {
-		this(input, windowPolicy, null);
-	}
+	/** The trigger that is used for window evaluation/emission. */
+	private Trigger<? super T, ? super W> trigger;
+
+	/** The evictor that is used for evicting elements before window evaluation. */
+	private Evictor<? super T, ? super W> evictor;
 
-	public KeyedWindowDataStream(KeyedDataStream<T, K> input,
-								WindowPolicy windowPolicy, WindowPolicy slidePolicy) 
-	{
-		TimeCharacteristic time = input.getExecutionEnvironment().getStreamTimeCharacteristic();
 
+	public KeyedWindowDataStream(KeyedDataStream<T, K> input,
+			WindowAssigner<? super T, W> windowAssigner) {
 		this.input = input;
-		this.windowPolicy = windowPolicy.makeSpecificBasedOnTimeCharacteristic(time);
-		this.slidePolicy = slidePolicy == null ? null : slidePolicy.makeSpecificBasedOnTimeCharacteristic(time);
+		this.windowAssigner = windowAssigner;
+		this.trigger = windowAssigner.getDefaultTrigger();
 	}
-	
+
+	/**
+	 * Sets the {@code Trigger} that should be used to trigger window emission.
+	 */
+	public KeyedWindowDataStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
+		this.trigger = trigger;
+		return this;
+	}
+
+	/**
+	 * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
+	 *
+	 * <p>
+	 * Note: When using an evictor window performance will degrade significantly, since
+	 * pre-aggregation of window results cannot be used.
+	 */
+	public KeyedWindowDataStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
+		this.evictor = evictor;
+		return this;
+	}
+
+
 	// ------------------------------------------------------------------------
 	//  Operations on the keyed windows
 	// ------------------------------------------------------------------------
@@ -94,7 +131,39 @@ public class KeyedWindowDataStream<T, K> {
 	 */
 	public DataStream<T> reduceWindow(ReduceFunction<T> function) {
 		String callLocation = Utils.getCallLocationName();
-		return createWindowOperator(function, input.getType(), "Reduce at " + callLocation);
+		String udfName = "Reduce at " + callLocation;
+
+		DataStream<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
+		if (result != null) {
+			return result;
+		}
+
+		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+		KeySelector<T, K> keySel = input.getKeySelector();
+
+		OneInputStreamOperator<T, T> operator;
+
+		if (evictor != null) {
+			operator = new EvictingWindowOperator<>(windowAssigner,
+					keySel,
+					new HeapWindowBuffer.Factory<T>(),
+					new ReduceKeyedWindowFunction<K, W, T>(function),
+					trigger,
+					evictor);
+
+		} else {
+			// we need to copy because we need our own instance of the pre aggregator
+			@SuppressWarnings("unchecked")
+			ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
+
+			operator = new WindowOperator<>(windowAssigner,
+					keySel,
+					new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
+					new ReduceKeyedWindowFunction<K, W, T>(function),
+					trigger);
+		}
+
+		return input.transform(opName, input.getType(), operator);
 	}
 
 	/**
@@ -108,29 +177,107 @@ public class KeyedWindowDataStream<T, K> {
 	 * @param function The window function.
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
-	public <Result> DataStream<Result> mapWindow(KeyedWindowFunction<T, Result, K, Window> function) {
-		String callLocation = Utils.getCallLocationName();
-
+	public <R> DataStream<R> mapWindow(KeyedWindowFunction<T, R, K, W> function) {
 		TypeInformation<T> inType = input.getType();
-		TypeInformation<Result> resultType = TypeExtractor.getUnaryOperatorReturnType(
+		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
 				function, KeyedWindowFunction.class, true, true, inType, null, false);
 
-		return createWindowOperator(function, resultType, "KeyedWindowFunction at " + callLocation);
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "MapWindow at " + callLocation;
+
+		DataStream<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
+		if (result != null) {
+			return result;
+		}
+
+
+		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+		KeySelector<T, K> keySel = input.getKeySelector();
+
+		OneInputStreamOperator<T, R> operator;
+
+		if (evictor != null) {
+			operator = new EvictingWindowOperator<>(windowAssigner,
+					keySel,
+					new HeapWindowBuffer.Factory<T>(),
+					function,
+					trigger,
+					evictor);
+
+		} else {
+			operator = new WindowOperator<>(windowAssigner,
+					keySel,
+					new HeapWindowBuffer.Factory<T>(),
+					function,
+					trigger);
+		}
+
+
+
+		return input.transform(opName, resultType, operator);
 	}
 
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
-	
-	private <Result> DataStream<Result> createWindowOperator(
-			Function function, TypeInformation<Result> resultType, String functionName) {
 
-		String opName = windowPolicy.toString(slidePolicy) + " of " + functionName;
-		KeySelector<T, K> keySel = input.getKeySelector();
-		
-		OneInputStreamOperator<T, Result> operator =
-				PolicyToOperator.createOperatorForPolicies(windowPolicy, slidePolicy, function, keySel);
-		
-		return input.transform(opName, resultType, operator);
+	private <R> DataStream<R> createFastTimeOperatorIfValid(
+			Function function,
+			TypeInformation<R> resultType,
+			String functionName) {
+
+		if (windowAssigner instanceof SlidingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
+			SlidingProcessingTimeWindows timeWindows = (SlidingProcessingTimeWindows) windowAssigner;
+			final long windowLength = timeWindows.getSize();
+			final long windowSlide = timeWindows.getSlide();
+
+			String opName = "Fast " + timeWindows + " of " + functionName;
+
+			if (function instanceof ReduceFunction) {
+				@SuppressWarnings("unchecked")
+				ReduceFunction<T> reducer = (ReduceFunction<T>) function;
+
+				@SuppressWarnings("unchecked")
+				OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
+						new AggregatingProcessingTimeWindowOperator<>(
+								reducer, input.getKeySelector(), windowLength, windowSlide);
+				return input.transform(opName, resultType, op);
+			}
+			else if (function instanceof KeyedWindowFunction) {
+				@SuppressWarnings("unchecked")
+				KeyedWindowFunction<T, R, K, Window> wf = (KeyedWindowFunction<T, R, K, Window>) function;
+
+				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
+						wf, input.getKeySelector(), windowLength, windowSlide);
+				return input.transform(opName, resultType, op);
+			}
+		} else if (windowAssigner instanceof TumblingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
+			TumblingProcessingTimeWindows timeWindows = (TumblingProcessingTimeWindows) windowAssigner;
+			final long windowLength = timeWindows.getSize();
+			final long windowSlide = timeWindows.getSize();
+
+			String opName = "Fast " + timeWindows + " of " + functionName;
+
+			if (function instanceof ReduceFunction) {
+				@SuppressWarnings("unchecked")
+				ReduceFunction<T> reducer = (ReduceFunction<T>) function;
+
+				@SuppressWarnings("unchecked")
+				OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
+						new AggregatingProcessingTimeWindowOperator<>(
+								reducer, input.getKeySelector(), windowLength, windowSlide);
+				return input.transform(opName, resultType, op);
+			}
+			else if (function instanceof KeyedWindowFunction) {
+				@SuppressWarnings("unchecked")
+				KeyedWindowFunction<T, R, K, Window> wf = (KeyedWindowFunction<T, R, K, Window>) function;
+
+				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
+						wf, input.getKeySelector(), windowLength, windowSlide);
+				return input.transform(opName, resultType, op);
+			}
+		}
+
+		return null;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceKeyedWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceKeyedWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceKeyedWindowFunction.java
new file mode 100644
index 0000000..70627f0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceKeyedWindowFunction.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+public class ReduceKeyedWindowFunction<K, W extends Window, T> extends RichKeyedWindowFunction<T, T, K, W> {
+	private static final long serialVersionUID = 1L;
+
+	private final ReduceFunction<T> reduceFunction;
+
+	public ReduceKeyedWindowFunction(ReduceFunction<T> reduceFunction) {
+		this.reduceFunction = reduceFunction;
+	}
+
+	@Override
+	public void setRuntimeContext(RuntimeContext ctx) {
+		super.setRuntimeContext(ctx);
+		FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		FunctionUtils.openFunction(reduceFunction, parameters);
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		FunctionUtils.closeFunction(reduceFunction);
+	}
+
+	@Override
+	public void evaluate(K k, W window, Iterable<T> values, Collector<T> out) throws Exception {
+		T result = null;
+
+		for (T v: values) {
+			if (result == null) {
+				result = v;
+			} else {
+				result = reduceFunction.reduce(result, v);
+			}
+		}
+
+		if (result != null) {
+			out.collect(result);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
index 450b132..c38100c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.api.windowing.evictors;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -48,6 +49,11 @@ public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
 		return "TimeEvictor(" + windowSize + ")";
 	}
 
+	@VisibleForTesting
+	public long getWindowSize() {
+		return windowSize;
+	}
+
 	public static <W extends Window> TimeEvictor<W> of(long windowSize) {
 		return new TimeEvictor<>(windowSize);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
new file mode 100644
index 0000000..1264c2a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.time;
+
+import org.apache.flink.streaming.api.TimeCharacteristic;
+
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public abstract class AbstractTime {
+
+	/** The time unit for this policy's time interval */
+	private final TimeUnit unit;
+	
+	/** The size of the windows generated by this policy */
+	private final long size;
+
+
+	protected AbstractTime(long size, TimeUnit unit) {
+		this.unit = checkNotNull(unit, "time unit may not be null");
+		this.size = size;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Properties
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the time unit for this policy's time interval.
+	 * @return The time unit for this policy's time interval.
+	 */
+	public TimeUnit getUnit() {
+		return unit;
+	}
+
+	/**
+	 * Gets the length of this policy's time interval.
+	 * @return The length of this policy's time interval.
+	 */
+	public long getSize() {
+		return size;
+	}
+
+	/**
+	 * Converts the time interval to milliseconds.
+	 * @return The time interval in milliseconds.
+	 */
+	public long toMilliseconds() {
+		return unit.toMillis(size);
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	public abstract AbstractTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic);
+
+	@Override
+	public int hashCode() {
+		return 31 * (int) (size ^ (size >>> 32)) + unit.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj != null && obj.getClass() == getClass()) {
+			AbstractTime that = (AbstractTime) obj;
+			return this.size == that.size && this.unit.equals(that.unit);
+		}
+		else {
+			return false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return getClass().getSimpleName() + " (" + size + ' ' + unit.name() + ')';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java
new file mode 100644
index 0000000..6a4349c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.time;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The definition of an event time interval for windowing. See
+ * {@link org.apache.flink.streaming.api.TimeCharacteristic#EventTime} for a definition
+ * of event time.
+ */
+public final class EventTime extends AbstractTime {
+
+	/** Instantiation only via factory method */
+	private EventTime(long size, TimeUnit unit) {
+		super(size, unit);
+	}
+
+	@Override
+	public EventTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
+		if (characteristic == TimeCharacteristic.EventTime || characteristic == TimeCharacteristic.IngestionTime) {
+			return this;
+		}
+		else {
+			throw new InvalidProgramException(
+					"Cannot use EventTime policy in a dataflow that runs on " + characteristic);
+		}
+	}
+	// ------------------------------------------------------------------------
+	//  Factory
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates an event time policy describing an event time interval.
+	 *
+	 * @param size The size of the generated windows.
+	 * @param unit The unit (seconds, milliseconds) of the time interval.
+	 * @return The event time policy.
+	 */
+	public static EventTime of(long size, TimeUnit unit) {
+		return new EventTime(size, unit);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java
new file mode 100644
index 0000000..4be6ed0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.time;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The definition of a processing time interval for windowing. See
+ * {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} for a definition
+ * of processing time.
+ */
+public final class ProcessingTime extends AbstractTime {
+
+	/** Instantiation only via factory method */
+	private ProcessingTime(long size, TimeUnit unit) {
+		super(size, unit);
+	}
+
+	@Override
+	public ProcessingTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
+		if (characteristic == TimeCharacteristic.ProcessingTime) {
+			return this;
+		}
+		else {
+			throw new InvalidProgramException(
+					"Cannot use ProcessingTime policy in a dataflow that runs on " + characteristic);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Factory
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a processing time policy describing a processing time interval.
+	 *
+	 * @param size The size of the generated windows.
+	 * @param unit The unit (seconds, milliseconds) of the time interval.
+	 * @return The processing time policy.
+	 */
+	public static ProcessingTime of(long size, TimeUnit unit) {
+		return new ProcessingTime(size, unit);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
new file mode 100644
index 0000000..d1b3fe3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.time;
+
+import org.apache.flink.streaming.api.TimeCharacteristic;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The definition of a time interval for windowing. The time characteristic referred
+ * to is the default time characteristic set on the execution environment.
+ */
+public final class Time extends AbstractTime {
+
+	/** Instantiation only via factory method */
+	private Time(long size, TimeUnit unit) {
+		super(size, unit);
+	}
+
+	@Override
+	public AbstractTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
+		switch (timeCharacteristic) {
+			case ProcessingTime:
+				return ProcessingTime.of(getSize(), getUnit());
+			case IngestionTime:
+			case EventTime:
+				return EventTime.of(getSize(), getUnit());
+			default:
+				throw new IllegalArgumentException("Unknown time characteristic");
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Factory
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a time policy describing a time interval. The policy refers to the
+	 * time characteristic that is set on the dataflow via
+	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#
+	 * setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}.
+	 *
+	 * @param size The size of the generated windows.
+	 * @param unit The unit (seconds, milliseconds) of the time interval.
+	 * @return The time policy.
+	 */
+	public static Time of(long size, TimeUnit unit) {
+		return new Time(size, unit);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index 64850a2..da198be 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.api.windowing.triggers;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
 public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigger<Object, W> {
@@ -68,6 +69,11 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge
 		return new ContinuousProcessingTimeTrigger<>(granularity);
 	}
 
+	@VisibleForTesting
+	public long getGranularity() {
+		return granularity;
+	}
+
 	@Override
 	public String toString() {
 		return "ContinuousProcessingTimeTrigger(" + granularity + ")";

http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
index b7f085a..3b6dc6d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.api.windowing.triggers;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
 public class ContinuousWatermarkTrigger<W extends Window> implements Trigger<Object, W> {
@@ -59,6 +60,11 @@ public class ContinuousWatermarkTrigger<W extends Window> implements Trigger<Obj
 		return "ContinuousProcessingTimeTrigger(" + granularity + ")";
 	}
 
+	@VisibleForTesting
+	public long getGranularity() {
+		return granularity;
+	}
+
 	public static <W extends Window> ContinuousWatermarkTrigger<W> of(long granularity) {
 		return new ContinuousWatermarkTrigger<>(granularity);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java
deleted file mode 100644
index 6e382bd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowpolicy;
-
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-public class AbstractTimePolicy extends WindowPolicy {
-
-	private static final long serialVersionUID = 6593098375698927728L;
-	
-	/** The time unit for this policy's time interval */
-	private final TimeUnit unit;
-	
-	/** The size of the windows generated by this policy */
-	private final long size;
-
-
-	protected AbstractTimePolicy(long size, TimeUnit unit) {
-		this.unit = checkNotNull(unit, "time unit may not be null");
-		this.size = size;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Properties
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets the time unit for this policy's time interval.
-	 * @return The time unit for this policy's time interval.
-	 */
-	public TimeUnit getUnit() {
-		return unit;
-	}
-
-	/**
-	 * Gets the length of this policy's time interval.
-	 * @return The length of this policy's time interval.
-	 */
-	public long getSize() {
-		return size;
-	}
-
-	/**
-	 * Converts the time interval to milliseconds.
-	 * @return The time interval in milliseconds.
-	 */
-	public long toMilliseconds() {
-		return unit.toMillis(size);
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	@Override
-	public String toString(WindowPolicy slidePolicy) {
-		if (slidePolicy == null) {
-			return "Tumbling Window (" + getClass().getSimpleName() + ") (" + size + ' ' + unit.name() + ')';
-		}
-		else if (slidePolicy.getClass() == getClass()) {
-			AbstractTimePolicy timeSlide = (AbstractTimePolicy) slidePolicy;
-			
-			return "Sliding Window (" + getClass().getSimpleName() + ") (length="
-					+ size + ' ' + unit.name() + ", slide=" + timeSlide.size + ' ' + timeSlide.unit.name() + ')';
-		}
-		else {
-			return super.toString(slidePolicy);
-		}
-	}
-	
-	@Override
-	public int hashCode() {
-		return 31 * (int) (size ^ (size >>> 32)) + unit.hashCode();
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj != null && obj.getClass() == getClass()) {
-			AbstractTimePolicy that = (AbstractTimePolicy) obj;
-			return this.size == that.size && this.unit.equals(that.unit);
-		}
-		else {
-			return false;
-		}
-	}
-
-	@Override
-	public String toString() {
-		return getClass().getSimpleName() + " (" + size + ' ' + unit.name() + ')';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Count.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Count.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Count.java
deleted file mode 100644
index 5fb7d58..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Count.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowpolicy;
-
-
-/**
- * A windowing policy that generates windows based on element counts.
- */
-public final class Count extends WindowPolicy {
-
-	private static final long serialVersionUID = 3197290738634320211L;
-
-	private long size;
-
-	/** Instantiation only via factory method */
-	private Count(long size) {
-		this.size = size;
-	}
-
-	public long getSize() {
-		return size;
-	}
-
-	@Override
-	public String toString() {
-		return "Count Window (" + size + ')';
-	}
-
-	// ------------------------------------------------------------------------
-	//  Factory
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a count based windowing policy
-	 *
-	 * @param size The size of the generated windows.
-	 * @return The time policy.
-	 */
-	public static Count of(long size) {
-		return new Count(size);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Delta.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Delta.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Delta.java
deleted file mode 100644
index 4a3082c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Delta.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowpolicy;
-
-
-import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
-
-/**
- * A windowing policy that generates windows based on a delta between elements.
- */
-public final class Delta<T> extends WindowPolicy {
-
-	private static final long serialVersionUID = 3197290738634320211L;
-
-	private DeltaFunction<T> deltaFunction;
-
-	private double threshold;
-
-	/** Instantiation only via the factory method {@link #of(double, DeltaFunction)}. */
-	private Delta(DeltaFunction<T> deltaFunction, double threshold) {
-		this.deltaFunction = deltaFunction;
-		this.threshold = threshold;
-	}
-
-	public DeltaFunction<T> getDeltaFunction() {
-		return deltaFunction;
-	}
-
-	public double getThreshold() {
-		return threshold;
-	}
-
-	@Override
-	public String toString() {
-		return "Delta Window (" + deltaFunction + ", " + threshold + ')';
-	}
-
-	// ------------------------------------------------------------------------
-	//  Factory
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a delta-based windowing policy.
-	 *
-	 * @param threshold The threshold for deltas at which to trigger windows
-	 * @param deltaFunction The delta function
-	 * @return The delta policy.
-	 */
-	public static <T> Delta<T> of(double threshold, DeltaFunction<T> deltaFunction) {
-		return new Delta<T>(deltaFunction, threshold);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java
deleted file mode 100644
index c32a0b0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowpolicy;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * The definition of an event time interval for windowing. See
- * {@link org.apache.flink.streaming.api.TimeCharacteristic#EventTime} for a definition
- * of event time. Only accepted in dataflows that run on event time or ingestion time.
- */
-public final class EventTime extends AbstractTimePolicy {
-
-	private static final long serialVersionUID = 8333566691833596747L;
-
-	/** Instantiation only via the factory method {@link #of(long, TimeUnit)}. */
-	private EventTime(long size, TimeUnit unit) {
-		super(size, unit);
-	}
-
-	@Override
-	public EventTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
-		if (characteristic == TimeCharacteristic.EventTime || characteristic == TimeCharacteristic.IngestionTime) {
-			return this;
-		}
-		else {
-			throw new InvalidProgramException(
-					"Cannot use EventTime policy in a dataflow that runs on " + characteristic);
-		}
-	}
-	// ------------------------------------------------------------------------
-	//  Factory
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates an event time policy describing an event time interval.
-	 *
-	 * @param size The size of the generated windows.
-	 * @param unit The unit (seconds, milliseconds) of the time interval.
-	 * @return The event time policy.
-	 */
-	public static EventTime of(long size, TimeUnit unit) {
-		return new EventTime(size, unit);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java
deleted file mode 100644
index a71ba1d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowpolicy;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * The definition of a processing time interval for windowing. See
- * {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} for a definition
- * of processing time. Only accepted in dataflows that run on processing time.
- */
-public final class ProcessingTime extends AbstractTimePolicy {
-
-	private static final long serialVersionUID = 7546166721132583007L;
-
-	/** Instantiation only via the factory method {@link #of(long, TimeUnit)}. */
-	private ProcessingTime(long size, TimeUnit unit) {
-		super(size, unit);
-	}
-
-	@Override
-	public ProcessingTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
-		if (characteristic == TimeCharacteristic.ProcessingTime) {
-			return this;
-		}
-		else {
-			throw new InvalidProgramException(
-					"Cannot use ProcessingTime policy in a dataflow that runs on " + characteristic);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Factory
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a processing time policy describing a processing time interval.
-	 *
-	 * @param size The size of the generated windows.
-	 * @param unit The unit (seconds, milliseconds) of the time interval.
-	 * @return The processing time policy.
-	 */
-	public static ProcessingTime of(long size, TimeUnit unit) {
-		return new ProcessingTime(size, unit);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java
deleted file mode 100644
index efc9bf2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowpolicy;
-
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * The definition of a time interval for windowing. The time characteristic referred
- * to is the default time characteristic set on the execution environment; the policy is resolved to a {@link ProcessingTime} or {@link EventTime} policy accordingly.
- */
-public final class Time extends AbstractTimePolicy {
-
-	private static final long serialVersionUID = 3197290738634320211L;
-
-	/** Instantiation only via the factory method {@link #of(long, TimeUnit)}. */
-	private Time(long size, TimeUnit unit) {
-		super(size, unit);
-	}
-
-	@Override
-	public AbstractTimePolicy makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
-		switch (timeCharacteristic) {
-			case ProcessingTime:
-				return ProcessingTime.of(getSize(), getUnit());
-			case IngestionTime:
-			case EventTime:
-				return EventTime.of(getSize(), getUnit());
-			default:
-				throw new IllegalArgumentException("Unknown time characteristic");
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Factory
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a time policy describing a time interval. The policy refers to the
-	 * time characteristic that is set on the dataflow via
-	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#
-	 * setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}.
-	 *
-	 * @param size The size of the generated windows.
-	 * @param unit The unit (seconds, milliseconds) of the time interval.
-	 * @return The time policy.
-	 */
-	public static Time of(long size, TimeUnit unit) {
-		return new Time(size, unit);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java
deleted file mode 100644
index 2e1a387..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowpolicy;
-
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-/**
- * The base class of all window policies. Window policies define how windows
- * are formed over the data stream.
- */
-public abstract class WindowPolicy implements java.io.Serializable {
-
-	private static final long serialVersionUID = -8696529489282723113L;
-	
-	/**
-	 * If the concrete instantiation of a window policy depends on the time characteristic of the
-	 * dataflow (processing time, event time), then this method must be overridden to convert this
-	 * policy to the respective specific instantiation.
-	 * <p>
-	 * The {@link Time} policy for example, will convert itself to a {@link ProcessingTime} policy,
-	 * if the time characteristic is set to {@link TimeCharacteristic#ProcessingTime}.
-	 * <p>
-	 * By default, this method does nothing and simply returns this object itself.
-	 * 
-	 * @param characteristic The time characteristic of the dataflow.
-	 * @return The specific instantiation of this policy, or the policy itself. 
-	 */
-	public WindowPolicy makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
-		return this;
-	}
-	
-	/** Formats this policy as a window description, appending the slide policy when it is non-null. */
-	public String toString(WindowPolicy slidePolicy) {
-		if (slidePolicy != null) {
-			return "Window [" + toString() + ", slide=" + slidePolicy + ']';
-		}
-		else {
-			return "Window [" + toString() + ']';
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5623c15b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
deleted file mode 100644
index 880c85c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
-import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
-import org.apache.flink.streaming.api.windowing.evictors.DeltaEvictor;
-import org.apache.flink.streaming.api.windowing.evictors.Evictor;
-import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
-import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.ContinuousWatermarkTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
-import org.apache.flink.streaming.api.windowing.windowpolicy.Count;
-import org.apache.flink.streaming.api.windowing.windowpolicy.Delta;
-import org.apache.flink.streaming.api.windowing.windowpolicy.EventTime;
-import org.apache.flink.streaming.api.windowing.windowpolicy.ProcessingTime;
-import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
-
-/**
- * This class implements the conversion from window policies to concrete operator
- * implementations. It is a static utility and cannot be instantiated.
- */
-public class PolicyToOperator {
-
-	/**
-	 * Entry point to create an operator for the given window policies and the window function; throws NullPointerException if {@code window} or {@code function} is null ({@code slide} may be null).
-	 */
-	public static <IN, OUT, KEY> OneInputStreamOperator<IN, OUT> createOperatorForPolicies(
-			WindowPolicy window, WindowPolicy slide, Function function, KeySelector<IN, KEY> keySelector)
-	{
-		if (window == null || function == null) {
-			throw new NullPointerException();
-		}
-		
-		// -- case 1: both policies are processing time policies (falls through to the later cases if the function is neither a ReduceFunction nor a KeyedWindowFunction)
-		if (window instanceof ProcessingTime && (slide == null || slide instanceof ProcessingTime)) {
-			final long windowLength = ((ProcessingTime) window).toMilliseconds();
-			final long windowSlide = slide == null ? windowLength : ((ProcessingTime) slide).toMilliseconds();
-			
-			if (function instanceof ReduceFunction) {
-				@SuppressWarnings("unchecked")
-				ReduceFunction<IN> reducer = (ReduceFunction<IN>) function;
-
-				@SuppressWarnings("unchecked")
-				OneInputStreamOperator<IN, OUT> op = (OneInputStreamOperator<IN, OUT>)
-						new AggregatingProcessingTimeWindowOperator<>(
-								reducer, keySelector, windowLength, windowSlide);
-				return op;
-			}
-			else if (function instanceof KeyedWindowFunction) {
-				@SuppressWarnings("unchecked")
-				KeyedWindowFunction<IN, OUT, KEY, Window> wf = (KeyedWindowFunction<IN, OUT, KEY, Window>) function;
-
-				return new AccumulatingProcessingTimeWindowOperator<>(
-								wf, keySelector, windowLength, windowSlide);
-			}
-		}
-
-		// -- case 2: both policies are event time policies (tumbling windows if the slide equals the length, sliding windows otherwise)
-		if (window instanceof EventTime && (slide == null || slide instanceof EventTime)) {
-			final long windowLength = ((EventTime) window).toMilliseconds();
-			final long windowSlide = slide == null ? windowLength : ((EventTime) slide).toMilliseconds();
-
-			WindowAssigner<? super IN, TimeWindow> assigner;
-			if (windowSlide == windowLength) {
-				assigner = TumblingTimeWindows.of(windowLength);
-			} else {
-				assigner = SlidingTimeWindows.of(windowLength, windowSlide);
-			}
-			WindowBufferFactory<IN, ? extends WindowBuffer<IN>> windowBuffer;
-			if (function instanceof ReduceFunction) {
-				@SuppressWarnings("unchecked")
-				ReduceFunction<IN> reducer = (ReduceFunction<IN>) SerializationUtils.clone(function);
-				function = new ReduceWindowFunction<>(reducer);
-				windowBuffer = new PreAggregatingHeapWindowBuffer.Factory<>(reducer);
-			} else {
-				windowBuffer = new HeapWindowBuffer.Factory<>();
-			}
-			@SuppressWarnings("unchecked")
-			KeyedWindowFunction<IN, OUT, KEY, TimeWindow> windowFunction = (KeyedWindowFunction<IN, OUT, KEY, TimeWindow>) function;
-
-			return new WindowOperator<>(
-					assigner,
-					keySelector,
-					windowBuffer,
-					windowFunction,
-					WatermarkTrigger.create());
-		}
-
-		// -- case 3: arbitrary trigger, no eviction (the window policy becomes the trigger)
-		if (slide == null) {
-			Trigger<? super IN, GlobalWindow> trigger = policyToTrigger(window);
-			// we need to make them purging triggers because the trigger/eviction policy model
-			// expects that the window is purged when no slide is used
-			Trigger<? super IN, GlobalWindow> purgingTrigger = PurgingTrigger.of(trigger);
-
-			WindowBufferFactory<IN, ? extends WindowBuffer<IN>> windowBuffer;
-			if (function instanceof ReduceFunction) {
-				@SuppressWarnings("unchecked")
-				ReduceFunction<IN> reducer = (ReduceFunction<IN>) SerializationUtils.clone(function);
-				function = new ReduceWindowFunction<>(reducer);
-				windowBuffer = new PreAggregatingHeapWindowBuffer.Factory<>(reducer);
-			} else {
-				windowBuffer = new HeapWindowBuffer.Factory<>();
-			}
-			// NOTE(review): the message below names EvaluateKeyedWindowFunction, but the check is against KeyedWindowFunction
-			if (!(function instanceof KeyedWindowFunction)) {
-				throw new IllegalStateException("Windowing function is not of type EvaluateKeyedWindowFunction.");
-			}
-			@SuppressWarnings("unchecked")
-			KeyedWindowFunction<IN, OUT, KEY, GlobalWindow> windowFunction = (KeyedWindowFunction<IN, OUT, KEY, GlobalWindow>) function;
-
-			return new WindowOperator<>(
-					GlobalWindows.<IN>create(),
-					keySelector,
-					windowBuffer,
-					windowFunction,
-					purgingTrigger);
-		}
-
-		// -- case 4: arbitrary trigger, arbitrary eviction (the slide policy becomes the trigger, the window policy becomes the evictor)
-		Trigger<? super IN, GlobalWindow> trigger = policyToTrigger(slide);
-		Evictor<? super IN, GlobalWindow> evictor = policyToEvictor(window);
-
-		WindowBufferFactory<IN, ? extends EvictingWindowBuffer<IN>> windowBuffer = new HeapWindowBuffer.Factory<>();
-		if (function instanceof ReduceFunction) {
-			@SuppressWarnings("unchecked")
-			ReduceFunction<IN> reducer = (ReduceFunction<IN>) SerializationUtils.clone(function);
-			function = new ReduceWindowFunction<>(reducer);
-		}
-		// NOTE(review): the message below names EvaluateKeyedWindowFunction, but the check is against KeyedWindowFunction
-		if (!(function instanceof KeyedWindowFunction)) {
-			throw new IllegalStateException("Windowing function is not of type EvaluateKeyedWindowFunction.");
-		}
-
-		@SuppressWarnings("unchecked")
-		KeyedWindowFunction<IN, OUT, KEY, GlobalWindow> windowFunction = (KeyedWindowFunction<IN, OUT, KEY, GlobalWindow>) function;
-
-		EvictingWindowOperator<KEY, IN, OUT, GlobalWindow> op = new EvictingWindowOperator<>(
-				GlobalWindows.<IN>create(),
-				keySelector,
-				windowBuffer,
-				windowFunction,
-				trigger,
-				evictor);
-
-		if (window instanceof ProcessingTime) {
-			// special case, we need to instruct the window operator to store the processing time in
-			// the elements so that the evictor can work on that
-			op.enableSetProcessingTime(true);
-		}
-
-		return op;
-	}
-	/** Maps a policy to a trigger: EventTime -> ContinuousWatermarkTrigger, ProcessingTime -> ContinuousProcessingTimeTrigger, Count -> CountTrigger, Delta -> DeltaTrigger; anything else throws UnsupportedOperationException. NOTE(review): time policies pass getSize() here while createOperatorForPolicies uses toMilliseconds() - confirm the unit is intended. */
-	private static <IN> Trigger<? super IN, GlobalWindow> policyToTrigger(WindowPolicy policy) {
-		if (policy instanceof EventTime) {
-			EventTime eventTime = (EventTime) policy;
-			return ContinuousWatermarkTrigger.of(eventTime.getSize());
-		} else if (policy instanceof ProcessingTime) {
-			ProcessingTime processingTime = (ProcessingTime) policy;
-			return ContinuousProcessingTimeTrigger.of(processingTime.getSize());
-		} else if (policy instanceof Count) {
-			Count count = (Count) policy;
-			return CountTrigger.of(count.getSize());
-		} else if (policy instanceof Delta) {
-			@SuppressWarnings("unchecked,rawtypes")
-			Delta<IN> delta = (Delta) policy;
-			return DeltaTrigger.of(delta.getThreshold(), delta.getDeltaFunction());
-
-		}
-
-		throw new UnsupportedOperationException("Unsupported policy " + policy);
-	}
-	/** Maps a policy to an evictor: EventTime and ProcessingTime -> TimeEvictor, Count -> CountEvictor, Delta -> DeltaEvictor; anything else throws UnsupportedOperationException. NOTE(review): time policies pass getSize() here while createOperatorForPolicies uses toMilliseconds() - confirm the unit is intended. */
-	private static <IN> Evictor<? super IN, GlobalWindow> policyToEvictor(WindowPolicy policy) {
-		if (policy instanceof EventTime) {
-			EventTime eventTime = (EventTime) policy;
-			return TimeEvictor.of(eventTime.getSize());
-		} else if (policy instanceof ProcessingTime) {
-			ProcessingTime processingTime = (ProcessingTime) policy;
-			return TimeEvictor.of(processingTime.getSize());
-		} else if (policy instanceof Count) {
-			Count count = (Count) policy;
-			return CountEvictor.of(count.getSize());
-		} else if (policy instanceof Delta) {
-			@SuppressWarnings("unchecked,rawtypes")
-			Delta<IN> delta = (Delta) policy;
-			return DeltaEvictor.of(delta.getThreshold(), delta.getDeltaFunction());
-
-		}
-
-
-		throw new UnsupportedOperationException("Unsupported policy " + policy);
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	/** Don't instantiate */
-	private PolicyToOperator() {}
-}