You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/05 16:42:39 UTC

[05/13] flink git commit: [FLINK-2550] Simplify Stream Java API Class Names

[FLINK-2550] Simplify Stream Java API Class Names

KeyedDataStream -> KeyedStream
KeyedWindowDataStream -> WindowedStream
NonParallelWindowDataStream -> AllWindowedStream

KeyedWindowFunction -> WindowFunction
WindowFunction -> AllWindowFunction
(along with rich functions and reduce function wrappers)

WindowedStream.mapWindow -> WindowedStream.apply
AllWindowedStream.mapWindow -> AllWindowedStream.apply

Also renamed the tests to match the new names.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e6e0aec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e6e0aec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e6e0aec

Branch: refs/heads/master
Commit: 9e6e0aeca01c50640827adbdd60089761cd5e8d2
Parents: 68c1afc
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Oct 1 15:58:52 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 5 16:36:35 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       | 231 ++++++++++++++
 .../streaming/api/datastream/DataStream.java    |  34 +--
 .../api/datastream/GroupedDataStream.java       |   2 +-
 .../api/datastream/KeyedDataStream.java         | 159 ----------
 .../streaming/api/datastream/KeyedStream.java   | 160 ++++++++++
 .../api/datastream/KeyedWindowDataStream.java   | 287 ------------------
 .../datastream/NonParallelWindowDataStream.java | 218 -------------
 .../api/datastream/WindowedStream.java          | 302 +++++++++++++++++++
 .../functions/windowing/AllWindowFunction.java  |  45 +++
 .../windowing/KeyedWindowFunction.java          |  45 ---
 .../windowing/ReduceAllWindowFunction.java      |  70 +++++
 .../windowing/ReduceKeyedWindowFunction.java    |  70 -----
 .../windowing/ReduceWindowFunction.java         |   4 +-
 .../ReduceWindowFunctionWithWindow.java         |   4 +-
 .../windowing/RichAllWindowFunction.java        |  25 ++
 .../windowing/RichKeyedWindowFunction.java      |  25 --
 .../functions/windowing/RichWindowFunction.java |   2 +-
 .../api/functions/windowing/WindowFunction.java |  16 +-
 .../windowing/AccumulatingKeyedTimePanes.java   |  14 +-
 ...ccumulatingProcessingTimeWindowOperator.java |   8 +-
 .../EvictingNonKeyedWindowOperator.java         |   6 +-
 .../windowing/EvictingWindowOperator.java       |   6 +-
 .../windowing/NonKeyedWindowOperator.java       |   8 +-
 .../operators/windowing/WindowOperator.java     |   8 +-
 .../runtime/tasks/StreamingRuntimeContext.java  |   2 +-
 .../api/state/StatefulOperatorTest.java         |   4 +-
 ...AlignedProcessingTimeWindowOperatorTest.java |  22 +-
 .../windowing/AllWindowTranslationTest.java     | 198 ++++++++++++
 .../EvictingNonKeyedWindowOperatorTest.java     |   5 +-
 .../windowing/EvictingWindowOperatorTest.java   |   4 +-
 .../windowing/NonKeyedWindowOperatorTest.java   |  11 +-
 ...ParallelWindowDataStreamTranslationTest.java | 198 ------------
 .../windowing/TimeWindowTranslationTest.java    |  13 +-
 .../operators/windowing/WindowOperatorTest.java |  10 +-
 .../windowing/WindowTranslationTest.java        |  17 +-
 .../GroupedProcessingTimeWindowExample.java     |  10 +-
 .../flink/streaming/api/scala/DataStream.scala  |   5 +-
 37 files changed, 1144 insertions(+), 1104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
new file mode 100644
index 0000000..e5c7c18
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.EvictingNonKeyedWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.NonKeyedWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+
+/**
+ * A {@code AllWindowedStream} represents a data stream where the stream of
+ * elements is split into windows based on a
+ * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
+ * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
+ *
+ * <p>
+ * If an {@link org.apache.flink.streaming.api.windowing.evictors.Evictor} is specified it will be
+ * used to evict elements from the window after
+ * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
+ * When using an evictor window performance will degrade significantly, since
+ * pre-aggregation of window results cannot be used.
+ *
+ * <p>
+ * Note that the {@code AllWindowedStream} is purely and API construct, during runtime
+ * the {@code AllWindowedStream} will be collapsed together with the
+ * operation over the window into one single operation.
+ *
+ * @param <T> The type of elements in the stream.
+ * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
+ */
+public class AllWindowedStream<T, W extends Window> {
+
+	/** The data stream that is windowed by this stream */
+	private final DataStream<T> input;
+
+	/** The window assigner */
+	private final WindowAssigner<? super T, W> windowAssigner;
+
+	/** The trigger that is used for window evaluation/emission. */
+	private Trigger<? super T, ? super W> trigger;
+
+	/** The evictor that is used for evicting elements before window evaluation. */
+	private Evictor<? super T, ? super W> evictor;
+
+
+	public AllWindowedStream(DataStream<T> input,
+			WindowAssigner<? super T, W> windowAssigner) {
+		this.input = input;
+		this.windowAssigner = windowAssigner;
+		this.trigger = windowAssigner.getDefaultTrigger();
+	}
+
+	/**
+	 * Sets the {@code Trigger} that should be used to trigger window emission.
+	 */
+	public AllWindowedStream<T, W> trigger(Trigger<? super T, ? super W> trigger) {
+		this.trigger = trigger;
+		return this;
+	}
+
+	/**
+	 * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
+	 *
+	 * <p>
+	 * Note: When using an evictor window performance will degrade significantly, since
+	 * pre-aggregation of window results cannot be used.
+	 */
+	public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
+		this.evictor = evictor;
+		return this;
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  Operations on the keyed windows
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Applies a reduce function to the window. The window function is called for each evaluation
+	 * of the window for each key individually. The output of the reduce function is interpreted
+	 * as a regular non-windowed stream.
+	 * <p>
+	 * This window will try and pre-aggregate data as much as the window policies permit. For example,
+	 * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
+	 * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
+	 * so a few elements are stored per key (one per slide interval).
+	 * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
+	 * aggregation tree.
+	 * 
+	 * @param function The reduce function.
+	 * @return The data stream that is the result of applying the reduce function to the window. 
+	 */
+	public DataStream<T> reduceWindow(ReduceFunction<T> function) {
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "Reduce at " + callLocation;
+
+		DataStream<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
+		if (result != null) {
+			return result;
+		}
+
+		String opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+
+		OneInputStreamOperator<T, T> operator;
+
+		if (evictor != null) {
+			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
+					new HeapWindowBuffer.Factory<T>(),
+					new ReduceAllWindowFunction<W, T>(function),
+					trigger,
+					evictor);
+
+		} else {
+			// we need to copy because we need our own instance of the pre aggregator
+			@SuppressWarnings("unchecked")
+			ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
+
+			operator = new NonKeyedWindowOperator<>(windowAssigner,
+					new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
+					new ReduceAllWindowFunction<W, T>(function),
+					trigger);
+		}
+
+		return input.transform(opName, input.getType(), operator).setParallelism(1);
+	}
+
+	/**
+	 * Applies a window function to the window. The window function is called for each evaluation
+	 * of the window for each key individually. The output of the window function is interpreted
+	 * as a regular non-windowed stream.
+	 * <p>
+	 * Not that this function requires that all data in the windows is buffered until the window
+	 * is evaluated, as the function provides no means of pre-aggregation.
+	 * 
+	 * @param function The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <R> DataStream<R> apply(AllWindowFunction<T, R, W> function) {
+		TypeInformation<T> inType = input.getType();
+		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+				function, AllWindowFunction.class, true, true, inType, null, false);
+
+		return apply(function, resultType);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each evaluation
+	 * of the window for each key individually. The output of the window function is interpreted
+	 * as a regular non-windowed stream.
+	 * <p>
+	 * Not that this function requires that all data in the windows is buffered until the window
+	 * is evaluated, as the function provides no means of pre-aggregation.
+	 *
+	 * @param function The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <R> DataStream<R> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "MapWindow at " + callLocation;
+
+		DataStream<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
+		if (result != null) {
+			return result;
+		}
+
+
+		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+
+		OneInputStreamOperator<T, R> operator;
+
+		if (evictor != null) {
+			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
+					new HeapWindowBuffer.Factory<T>(),
+					function,
+					trigger,
+					evictor);
+
+		} else {
+			operator = new NonKeyedWindowOperator<>(windowAssigner,
+					new HeapWindowBuffer.Factory<T>(),
+					function,
+					trigger);
+		}
+
+		return input.transform(opName, resultType, operator).setParallelism(1);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+
+	private <R> DataStream<R> createFastTimeOperatorIfValid(
+			Function function,
+			TypeInformation<R> resultType,
+			String functionName) {
+
+		// TODO: add once non-parallel fast aligned time windows operator is ready
+		return null;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index c2be055..ad159f9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -233,15 +233,15 @@ public class DataStream<T> {
 
 	/**
 	 * 
-	 * It creates a new {@link KeyedDataStream} that uses the provided key for partitioning
+	 * It creates a new {@link KeyedStream} that uses the provided key for partitioning
 	 * its operator states. 
 	 *
 	 * @param key
 	 *            The KeySelector to be used for extracting the key for partitioning
-	 * @return The {@link DataStream} with partitioned state (i.e. KeyedDataStream)
+	 * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
 	 */
-	public <K> KeyedDataStream<T, K> keyBy(KeySelector<T, K> key){
-		return new KeyedDataStream<T, K>(this, clean(key));
+	public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key){
+		return new KeyedStream<T, K>(this, clean(key));
 	}
 
 	/**
@@ -250,9 +250,9 @@ public class DataStream<T> {
 	 * @param fields
 	 *            The position of the fields on which the {@link DataStream}
 	 *            will be grouped.
-	 * @return The {@link DataStream} with partitioned state (i.e. KeyedDataStream)
+	 * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
 	 */
-	public KeyedDataStream<T, Tuple> keyBy(int... fields) {
+	public KeyedStream<T, Tuple> keyBy(int... fields) {
 		if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
 			return keyBy(new KeySelectorUtil.ArrayKeySelector<T>(fields));
 		} else {
@@ -269,14 +269,14 @@ public class DataStream<T> {
 	 * @param fields
 	 *            One or more field expressions on which the state of the {@link DataStream} operators will be
 	 *            partitioned.
-	 * @return The {@link DataStream} with partitioned state (i.e. KeyedDataStream)
+	 * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
 	 **/
-	public KeyedDataStream<T, Tuple> keyBy(String... fields) {
+	public KeyedStream<T, Tuple> keyBy(String... fields) {
 		return keyBy(new Keys.ExpressionKeys<T>(fields, getType()));
 	}
 
-	private KeyedDataStream<T, Tuple> keyBy(Keys<T> keys) {
-		return new KeyedDataStream<T, Tuple>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
+	private KeyedStream<T, Tuple> keyBy(Keys<T> keys) {
+		return new KeyedStream<T, Tuple>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
 				getType(), getExecutionConfig())));
 	}
 	
@@ -288,7 +288,7 @@ public class DataStream<T> {
 	 * @param fields
 	 *            The position of the fields on which the states of the {@link DataStream}
 	 *            will be partitioned.
-	 * @return The {@link DataStream} with partitioned state (i.e. KeyedDataStream)
+	 * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
 	 */
 	public GroupedDataStream<T, Tuple> groupBy(int... fields) {
 		if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
@@ -816,7 +816,7 @@ public class DataStream<T> {
 	}
 
 	/**
-	 * Windows this {@code KeyedDataStream} into tumbling time windows.
+	 * Windows this {@code DataStream} into tumbling time windows.
 	 *
 	 * <p>
 	 * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
@@ -826,7 +826,7 @@ public class DataStream<T> {
 	 *
 	 * @param size The size of the window.
 	 */
-	public NonParallelWindowDataStream<T, TimeWindow> timeWindowAll(AbstractTime size) {
+	public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime size) {
 		AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
 
 		if (actualSize instanceof EventTime) {
@@ -837,7 +837,7 @@ public class DataStream<T> {
 	}
 
 	/**
-	 * Windows this {@code KeyedDataStream} into sliding time windows.
+	 * Windows this {@code DataStream} into sliding time windows.
 	 *
 	 * <p>
 	 * This is a shortcut for either {@code .window(SlidingTimeWindows.of(size, slide))} or
@@ -847,7 +847,7 @@ public class DataStream<T> {
 	 *
 	 * @param size The size of the window.
 	 */
-	public NonParallelWindowDataStream<T, TimeWindow> timeWindowAll(AbstractTime size, AbstractTime slide) {
+	public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime size, AbstractTime slide) {
 		AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
 		AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
 
@@ -879,8 +879,8 @@ public class DataStream<T> {
 	 * @param assigner The {@code WindowAssigner} that assigns elements to windows.
 	 * @return The trigger windows data stream.
 	 */
-	public <W extends Window> NonParallelWindowDataStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
-		return new NonParallelWindowDataStream<>(this, assigner);
+	public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
+		return new AllWindowedStream<>(this, assigner);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index fde5a6d..ebaeb56 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -39,7 +39,7 @@ import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
  * @param <T> The type of the elements in the Grouped Stream.
  * @param <KEY> The type of the key in the Keyed Stream.
  */
-public class GroupedDataStream<T, KEY> extends KeyedDataStream<T, KEY> {
+public class GroupedDataStream<T, KEY> extends KeyedStream<T, KEY> {
 
 	/**
 	 * Creates a new {@link GroupedDataStream}, group inclusion is determined using

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
deleted file mode 100644
index 2ae07b2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.time.EventTime;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-
-/**
- * A KeyedDataStream represents a {@link DataStream} on which operator state is
- * partitioned by key using a provided {@link KeySelector}. Typical operations supported by a {@link DataStream}
- * are also possible on a KeyedDataStream, with the exception of partitioning methods such as shuffle, forward and groupBy.
- * 
- * 
- * @param <T> The type of the elements in the Keyed Stream.
- * @param <KEY> The type of the key in the Keyed Stream.
- */
-public class KeyedDataStream<T, KEY> extends DataStream<T> {
-	
-	protected final KeySelector<T, KEY> keySelector;
-
-	/**
-	 * Creates a new {@link KeyedDataStream} using the given {@link KeySelector}
-	 * to partition operator state by key.
-	 * 
-	 * @param dataStream
-	 *            Base stream of data
-	 * @param keySelector
-	 *            Function for determining state partitions
-	 */
-	public KeyedDataStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
-		super(dataStream.getExecutionEnvironment(), new PartitionTransformation<T>(dataStream.getTransformation(), new HashPartitioner<T>(keySelector)));
-		this.keySelector = keySelector;
-	}
-
-	
-	public KeySelector<T, KEY> getKeySelector() {
-		return this.keySelector;
-	}
-
-	
-	@Override
-	protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
-		throw new UnsupportedOperationException("Cannot override partitioning for KeyedDataStream.");
-	}
-
-	
-	@Override
-	public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
-			TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
-
-		SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,operator);
-
-		((OneInputTransformation<T, R>) returnStream.getTransformation()).setStateKeySelector(keySelector);
-		return returnStream;
-	}
-
-	
-	
-	@Override
-	public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
-		DataStreamSink<T> result = super.addSink(sinkFunction);
-		result.getTransformation().setStateKeySelector(keySelector);
-		return result;
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Windowing
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Windows this {@code KeyedDataStream} into tumbling time windows.
-	 *
-	 * <p>
-	 * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
-	 * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
-	 * set using
-	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
-	 *
-	 * @param size The size of the window.
-	 */
-	public KeyedWindowDataStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) {
-		AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
-
-		if (actualSize instanceof EventTime) {
-			return window(TumblingTimeWindows.of(actualSize.toMilliseconds()));
-		} else {
-			return window(TumblingProcessingTimeWindows.of(actualSize.toMilliseconds()));
-		}
-	}
-
-	/**
-	 * Windows this {@code KeyedDataStream} into sliding time windows.
-	 *
-	 * <p>
-	 * This is a shortcut for either {@code .window(SlidingTimeWindows.of(size, slide))} or
-	 * {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic
-	 * set using
-	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
-	 *
-	 * @param size The size of the window.
-	 */
-	public KeyedWindowDataStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, AbstractTime slide) {
-		AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
-		AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
-
-		if (actualSize instanceof EventTime) {
-			return window(SlidingTimeWindows.of(actualSize.toMilliseconds(), actualSlide.toMilliseconds()));
-		} else {
-			return window(SlidingProcessingTimeWindows.of(actualSize.toMilliseconds(), actualSlide.toMilliseconds()));
-		}
-	}
-
-	/**
-	 * Windows this data stream to a {@code KeyedWindowDataStream}, which evaluates windows
-	 * over a key grouped stream. Elements are put into windows by a {@link WindowAssigner}. The
-	 * grouping of elements is done both by key and by window.
-	 *
-	 * <p>
-	 * A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} can be defined to specify
-	 * when windows are evaluated. However, {@code WindowAssigners} have a default {@code Trigger}
-	 * that is used if a {@code Trigger} is not specified.
-	 *
-	 * @param assigner The {@code WindowAssigner} that assigns elements to windows.
-	 * @return The trigger windows data stream.
-	 */
-	public <W extends Window> KeyedWindowDataStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
-		return new KeyedWindowDataStream<>(this, assigner);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
new file mode 100644
index 0000000..b3cfb55
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.EventTime;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+
+/**
+ * A {@code KeyedStream} represents a {@link DataStream} on which operator state is
+ * partitioned by key using a provided {@link KeySelector}. Typical operations supported by a
+ * {@code DataStream} are also possible on a {@code KeyedStream}, with the exception of
+ * partitioning methods such as shuffle, forward and groupBy.
+ * 
+ * 
+ * @param <T> The type of the elements in the Keyed Stream.
+ * @param <KEY> The type of the key in the Keyed Stream.
+ */
+public class KeyedStream<T, KEY> extends DataStream<T> {
+	
+	protected final KeySelector<T, KEY> keySelector;
+
+	/**
+	 * Creates a new {@link KeyedStream} using the given {@link KeySelector}
+	 * to partition operator state by key.
+	 * 
+	 * @param dataStream
+	 *            Base stream of data
+	 * @param keySelector
+	 *            Function for determining state partitions
+	 */
+	public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
+		super(dataStream.getExecutionEnvironment(), new PartitionTransformation<T>(dataStream.getTransformation(), new HashPartitioner<T>(keySelector)));
+		this.keySelector = keySelector;
+	}
+
+	
+	public KeySelector<T, KEY> getKeySelector() {
+		return this.keySelector;
+	}
+
+	
+	@Override
+	protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
+		throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
+	}
+
+	
+	@Override
+	public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
+			TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
+
+		SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,operator);
+
+		((OneInputTransformation<T, R>) returnStream.getTransformation()).setStateKeySelector(keySelector);
+		return returnStream;
+	}
+
+	
+	
+	@Override
+	public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
+		DataStreamSink<T> result = super.addSink(sinkFunction);
+		result.getTransformation().setStateKeySelector(keySelector);
+		return result;
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Windowing
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Windows this {@code KeyedStream} into tumbling time windows.
+	 *
+	 * <p>
+	 * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
+	 * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
+	 * set using
+	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
+	 *
+	 * @param size The size of the window.
+	 */
+	public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) {
+		AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
+
+		if (actualSize instanceof EventTime) {
+			return window(TumblingTimeWindows.of(actualSize.toMilliseconds()));
+		} else {
+			return window(TumblingProcessingTimeWindows.of(actualSize.toMilliseconds()));
+		}
+	}
+
+	/**
+	 * Windows this {@code KeyedStream} into sliding time windows.
+	 *
+	 * <p>
+	 * This is a shortcut for either {@code .window(SlidingTimeWindows.of(size, slide))} or
+	 * {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic
+	 * set using
+	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
+	 *
+	 * @param size The size of the window.
+	 */
+	public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, AbstractTime slide) {
+		AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
+		AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
+
+		if (actualSize instanceof EventTime) {
+			return window(SlidingTimeWindows.of(actualSize.toMilliseconds(), actualSlide.toMilliseconds()));
+		} else {
+			return window(SlidingProcessingTimeWindows.of(actualSize.toMilliseconds(), actualSlide.toMilliseconds()));
+		}
+	}
+
+	/**
+	 * Windows this data stream to a {@code WindowedStream}, which evaluates windows
+	 * over a key grouped stream. Elements are put into windows by a {@link WindowAssigner}. The
+	 * grouping of elements is done both by key and by window.
+	 *
+	 * <p>
+	 * A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} can be defined to specify
+	 * when windows are evaluated. However, {@code WindowAssigners} have a default {@code Trigger}
+	 * that is used if a {@code Trigger} is not specified.
+	 *
+	 * @param assigner The {@code WindowAssigner} that assigns elements to windows.
+	 * @return The trigger windows data stream.
+	 */
+	public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
+		return new WindowedStream<>(this, assigner);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
deleted file mode 100644
index 9d05b8c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.ReduceKeyedWindowFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.evictors.Evictor;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
-
-/**
- * A {@code KeyedWindowDataStream} represents a data stream where elements are grouped by
- * key, and for each key, the stream of elements is split into windows based on a
- * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
- * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
- *
- * <p>
- * The windows are conceptually evaluated for each key individually, meaning windows can trigger at
- * different points for each key.
- *
- * <p>
- * If an {@link Evictor} is specified it will be used to evict elements from the window after
- * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
- * When using an evictor window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
- *
- * <p>
- * Note that the {@code KeyedWindowDataStream} is purely and API construct, during runtime
- * the {@code KeyedWindowDataStream} will be collapsed together with the
- * {@code KeyedDataStream} and the operation over the window into one single operation.
- * 
- * @param <T> The type of elements in the stream.
- * @param <K> The type of the key by which elements are grouped.
- * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
- */
-public class KeyedWindowDataStream<T, K, W extends Window> {
-
-	/** The keyed data stream that is windowed by this stream */
-	private final KeyedDataStream<T, K> input;
-
-	/** The window assigner */
-	private final WindowAssigner<? super T, W> windowAssigner;
-
-	/** The trigger that is used for window evaluation/emission. */
-	private Trigger<? super T, ? super W> trigger;
-
-	/** The evictor that is used for evicting elements before window evaluation. */
-	private Evictor<? super T, ? super W> evictor;
-
-
-	public KeyedWindowDataStream(KeyedDataStream<T, K> input,
-			WindowAssigner<? super T, W> windowAssigner) {
-		this.input = input;
-		this.windowAssigner = windowAssigner;
-		this.trigger = windowAssigner.getDefaultTrigger();
-	}
-
-	/**
-	 * Sets the {@code Trigger} that should be used to trigger window emission.
-	 */
-	public KeyedWindowDataStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
-		this.trigger = trigger;
-		return this;
-	}
-
-	/**
-	 * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
-	 *
-	 * <p>
-	 * Note: When using an evictor window performance will degrade significantly, since
-	 * pre-aggregation of window results cannot be used.
-	 */
-	public KeyedWindowDataStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
-		this.evictor = evictor;
-		return this;
-	}
-
-
-	// ------------------------------------------------------------------------
-	//  Operations on the keyed windows
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Applies a reduce function to the window. The window function is called for each evaluation
-	 * of the window for each key individually. The output of the reduce function is interpreted
-	 * as a regular non-windowed stream.
-	 * <p>
-	 * This window will try and pre-aggregate data as much as the window policies permit. For example,
-	 * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
-	 * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
-	 * so a few elements are stored per key (one per slide interval).
-	 * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
-	 * aggregation tree.
-	 * 
-	 * @param function The reduce function.
-	 * @return The data stream that is the result of applying the reduce function to the window. 
-	 */
-	public DataStream<T> reduceWindow(ReduceFunction<T> function) {
-		String callLocation = Utils.getCallLocationName();
-		String udfName = "Reduce at " + callLocation;
-
-		DataStream<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
-		if (result != null) {
-			return result;
-		}
-
-		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
-		KeySelector<T, K> keySel = input.getKeySelector();
-
-		OneInputStreamOperator<T, T> operator;
-
-		if (evictor != null) {
-			operator = new EvictingWindowOperator<>(windowAssigner,
-					keySel,
-					new HeapWindowBuffer.Factory<T>(),
-					new ReduceKeyedWindowFunction<K, W, T>(function),
-					trigger,
-					evictor);
-
-		} else {
-			// we need to copy because we need our own instance of the pre aggregator
-			@SuppressWarnings("unchecked")
-			ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
-
-			operator = new WindowOperator<>(windowAssigner,
-					keySel,
-					new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
-					new ReduceKeyedWindowFunction<K, W, T>(function),
-					trigger);
-		}
-
-		return input.transform(opName, input.getType(), operator);
-	}
-
-	/**
-	 * Applies a window function to the window. The window function is called for each evaluation
-	 * of the window for each key individually. The output of the window function is interpreted
-	 * as a regular non-windowed stream.
-	 * <p>
-	 * Not that this function requires that all data in the windows is buffered until the window
-	 * is evaluated, as the function provides no means of pre-aggregation.
-	 * 
-	 * @param function The window function.
-	 * @return The data stream that is the result of applying the window function to the window.
-	 */
-	public <R> DataStream<R> mapWindow(KeyedWindowFunction<T, R, K, W> function) {
-		// clean the closure
-		function = input.getExecutionEnvironment().clean(function);
-		
-		TypeInformation<T> inType = input.getType();
-		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-				function, KeyedWindowFunction.class, true, true, inType, null, false);
-
-		String callLocation = Utils.getCallLocationName();
-		String udfName = "MapWindow at " + callLocation;
-
-		DataStream<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
-		if (result != null) {
-			return result;
-		}
-
-
-		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
-		KeySelector<T, K> keySel = input.getKeySelector();
-
-		OneInputStreamOperator<T, R> operator;
-
-		if (evictor != null) {
-			operator = new EvictingWindowOperator<>(windowAssigner,
-					keySel,
-					new HeapWindowBuffer.Factory<T>(),
-					function,
-					trigger,
-					evictor);
-
-		} else {
-			operator = new WindowOperator<>(windowAssigner,
-					keySel,
-					new HeapWindowBuffer.Factory<T>(),
-					function,
-					trigger);
-		}
-
-
-
-		return input.transform(opName, resultType, operator);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	private <R> DataStream<R> createFastTimeOperatorIfValid(
-			Function function,
-			TypeInformation<R> resultType,
-			String functionName) {
-
-		if (windowAssigner instanceof SlidingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
-			SlidingProcessingTimeWindows timeWindows = (SlidingProcessingTimeWindows) windowAssigner;
-			final long windowLength = timeWindows.getSize();
-			final long windowSlide = timeWindows.getSlide();
-
-			String opName = "Fast " + timeWindows + " of " + functionName;
-
-			if (function instanceof ReduceFunction) {
-				@SuppressWarnings("unchecked")
-				ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
-				@SuppressWarnings("unchecked")
-				OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
-						new AggregatingProcessingTimeWindowOperator<>(
-								reducer, input.getKeySelector(), windowLength, windowSlide);
-				return input.transform(opName, resultType, op);
-			}
-			else if (function instanceof KeyedWindowFunction) {
-				@SuppressWarnings("unchecked")
-				KeyedWindowFunction<T, R, K, TimeWindow> wf = (KeyedWindowFunction<T, R, K, TimeWindow>) function;
-
-				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
-						wf, input.getKeySelector(), windowLength, windowSlide);
-				return input.transform(opName, resultType, op);
-			}
-		} else if (windowAssigner instanceof TumblingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
-			TumblingProcessingTimeWindows timeWindows = (TumblingProcessingTimeWindows) windowAssigner;
-			final long windowLength = timeWindows.getSize();
-			final long windowSlide = timeWindows.getSize();
-
-			String opName = "Fast " + timeWindows + " of " + functionName;
-
-			if (function instanceof ReduceFunction) {
-				@SuppressWarnings("unchecked")
-				ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
-				@SuppressWarnings("unchecked")
-				OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
-						new AggregatingProcessingTimeWindowOperator<>(
-								reducer, input.getKeySelector(), windowLength, windowSlide);
-				return input.transform(opName, resultType, op);
-			}
-			else if (function instanceof KeyedWindowFunction) {
-				@SuppressWarnings("unchecked")
-				KeyedWindowFunction<T, R, K, TimeWindow> wf = (KeyedWindowFunction<T, R, K, TimeWindow>) function;
-
-				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
-						wf, input.getKeySelector(), windowLength, windowSlide);
-				return input.transform(opName, resultType, op);
-			}
-		}
-
-		return null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/NonParallelWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/NonParallelWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/NonParallelWindowDataStream.java
deleted file mode 100644
index 5cb3b6b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/NonParallelWindowDataStream.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.evictors.Evictor;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.EvictingNonKeyedWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.NonKeyedWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
-
-/**
- * A {@code NonParallelWindowDataStream} represents a data stream where the stream of
- * elements is split into windows based on a
- * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
- * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
- *
- * <p>
- * If an {@link org.apache.flink.streaming.api.windowing.evictors.Evictor} is specified it will be
- * used to evict elements from the window after
- * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
- * When using an evictor window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
- *
- * <p>
- * Note that the {@code NonParallelWindowDataStream} is purely and API construct, during runtime
- * the {@code NonParallelWindowDataStream} will be collapsed together with the
- * operation over the window into one single operation.
- *
- * @param <T> The type of elements in the stream.
- * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
- */
-public class NonParallelWindowDataStream<T, W extends Window> {
-
-	/** The data stream that is windowed by this stream */
-	private final DataStream<T> input;
-
-	/** The window assigner */
-	private final WindowAssigner<? super T, W> windowAssigner;
-
-	/** The trigger that is used for window evaluation/emission. */
-	private Trigger<? super T, ? super W> trigger;
-
-	/** The evictor that is used for evicting elements before window evaluation. */
-	private Evictor<? super T, ? super W> evictor;
-
-
-	public NonParallelWindowDataStream(DataStream<T> input,
-			WindowAssigner<? super T, W> windowAssigner) {
-		this.input = input;
-		this.windowAssigner = windowAssigner;
-		this.trigger = windowAssigner.getDefaultTrigger();
-	}
-
-	/**
-	 * Sets the {@code Trigger} that should be used to trigger window emission.
-	 */
-	public NonParallelWindowDataStream<T, W> trigger(Trigger<? super T, ? super W> trigger) {
-		this.trigger = trigger;
-		return this;
-	}
-
-	/**
-	 * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
-	 *
-	 * <p>
-	 * Note: When using an evictor window performance will degrade significantly, since
-	 * pre-aggregation of window results cannot be used.
-	 */
-	public NonParallelWindowDataStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
-		this.evictor = evictor;
-		return this;
-	}
-
-
-	// ------------------------------------------------------------------------
-	//  Operations on the keyed windows
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Applies a reduce function to the window. The window function is called for each evaluation
-	 * of the window for each key individually. The output of the reduce function is interpreted
-	 * as a regular non-windowed stream.
-	 * <p>
-	 * This window will try and pre-aggregate data as much as the window policies permit. For example,
-	 * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
-	 * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
-	 * so a few elements are stored per key (one per slide interval).
-	 * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
-	 * aggregation tree.
-	 * 
-	 * @param function The reduce function.
-	 * @return The data stream that is the result of applying the reduce function to the window. 
-	 */
-	public DataStream<T> reduceWindow(ReduceFunction<T> function) {
-		String callLocation = Utils.getCallLocationName();
-		String udfName = "Reduce at " + callLocation;
-
-		DataStream<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
-		if (result != null) {
-			return result;
-		}
-
-		String opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
-
-		OneInputStreamOperator<T, T> operator;
-
-		if (evictor != null) {
-			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
-					new HeapWindowBuffer.Factory<T>(),
-					new ReduceWindowFunction<W, T>(function),
-					trigger,
-					evictor);
-
-		} else {
-			// we need to copy because we need our own instance of the pre aggregator
-			@SuppressWarnings("unchecked")
-			ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
-
-			operator = new NonKeyedWindowOperator<>(windowAssigner,
-					new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
-					new ReduceWindowFunction<W, T>(function),
-					trigger);
-		}
-
-		return input.transform(opName, input.getType(), operator).setParallelism(1);
-	}
-
-	/**
-	 * Applies a window function to the window. The window function is called for each evaluation
-	 * of the window for each key individually. The output of the window function is interpreted
-	 * as a regular non-windowed stream.
-	 * <p>
-	 * Not that this function requires that all data in the windows is buffered until the window
-	 * is evaluated, as the function provides no means of pre-aggregation.
-	 * 
-	 * @param function The window function.
-	 * @return The data stream that is the result of applying the window function to the window.
-	 */
-	public <R> DataStream<R> mapWindow(WindowFunction<T, R, W> function) {
-		TypeInformation<T> inType = input.getType();
-		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-				function, WindowFunction.class, true, true, inType, null, false);
-
-		String callLocation = Utils.getCallLocationName();
-		String udfName = "MapWindow at " + callLocation;
-
-		DataStream<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
-		if (result != null) {
-			return result;
-		}
-
-
-		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
-
-		OneInputStreamOperator<T, R> operator;
-
-		if (evictor != null) {
-			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
-					new HeapWindowBuffer.Factory<T>(),
-					function,
-					trigger,
-					evictor);
-
-		} else {
-			operator = new NonKeyedWindowOperator<>(windowAssigner,
-					new HeapWindowBuffer.Factory<T>(),
-					function,
-					trigger);
-		}
-
-
-
-		return input.transform(opName, resultType, operator).setParallelism(1);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-
-	private <R> DataStream<R> createFastTimeOperatorIfValid(
-			Function function,
-			TypeInformation<R> resultType,
-			String functionName) {
-
-		// TODO: add once non-parallel fast aligned time windows operator is ready
-		return null;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
new file mode 100644
index 0000000..16898dd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+
+/**
+ * A {@code WindowedStream} represents a data stream where elements are grouped by
+ * key, and for each key, the stream of elements is split into windows based on a
+ * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
+ * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
+ *
+ * <p>
+ * The windows are conceptually evaluated for each key individually, meaning windows can trigger at
+ * different points for each key.
+ *
+ * <p>
+ * If an {@link Evictor} is specified it will be used to evict elements from the window after
+ * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
+ * When using an evictor window performance will degrade significantly, since
+ * pre-aggregation of window results cannot be used.
+ *
+ * <p>
+ * Note that the {@code WindowedStream} is purely and API construct, during runtime
+ * the {@code WindowedStream} will be collapsed together with the
+ * {@code KeyedStream} and the operation over the window into one single operation.
+ * 
+ * @param <T> The type of elements in the stream.
+ * @param <K> The type of the key by which elements are grouped.
+ * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
+ */
+public class WindowedStream<T, K, W extends Window> {
+
+	/** The keyed data stream that is windowed by this stream */
+	private final KeyedStream<T, K> input;
+
+	/** The window assigner */
+	private final WindowAssigner<? super T, W> windowAssigner;
+
+	/** The trigger that is used for window evaluation/emission. */
+	private Trigger<? super T, ? super W> trigger;
+
+	/** The evictor that is used for evicting elements before window evaluation. */
+	private Evictor<? super T, ? super W> evictor;
+
+
+	public WindowedStream(KeyedStream<T, K> input,
+			WindowAssigner<? super T, W> windowAssigner) {
+		this.input = input;
+		this.windowAssigner = windowAssigner;
+		this.trigger = windowAssigner.getDefaultTrigger();
+	}
+
+	/**
+	 * Sets the {@code Trigger} that should be used to trigger window emission.
+	 */
+	public WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
+		this.trigger = trigger;
+		return this;
+	}
+
+	/**
+	 * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
+	 *
+	 * <p>
+	 * Note: When using an evictor window performance will degrade significantly, since
+	 * pre-aggregation of window results cannot be used.
+	 */
+	public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
+		this.evictor = evictor;
+		return this;
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  Operations on the keyed windows
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Applies a reduce function to the window. The window function is called for each evaluation
+	 * of the window for each key individually. The output of the reduce function is interpreted
+	 * as a regular non-windowed stream.
+	 * <p>
+	 * This window will try and pre-aggregate data as much as the window policies permit. For example,
+	 * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
+	 * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
+	 * so a few elements are stored per key (one per slide interval).
+	 * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
+	 * aggregation tree.
+	 * 
+	 * @param function The reduce function.
+	 * @return The data stream that is the result of applying the reduce function to the window. 
+	 */
+	public DataStream<T> reduceWindow(ReduceFunction<T> function) {
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "Reduce at " + callLocation;
+
+		DataStream<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
+		if (result != null) {
+			return result;
+		}
+
+		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+		KeySelector<T, K> keySel = input.getKeySelector();
+
+		OneInputStreamOperator<T, T> operator;
+
+		if (evictor != null) {
+			operator = new EvictingWindowOperator<>(windowAssigner,
+					keySel,
+					new HeapWindowBuffer.Factory<T>(),
+					new ReduceWindowFunction<K, W, T>(function),
+					trigger,
+					evictor);
+
+		} else {
+			// we need to copy because we need our own instance of the pre aggregator
+			@SuppressWarnings("unchecked")
+			ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
+
+			operator = new WindowOperator<>(windowAssigner,
+					keySel,
+					new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
+					new ReduceWindowFunction<K, W, T>(function),
+					trigger);
+		}
+
+		return input.transform(opName, input.getType(), operator);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Not that this function requires that all data in the windows is buffered until the window
+	 * is evaluated, as the function provides no means of pre-aggregation.
+	 * 
+	 * @param function The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <R> DataStream<R> apply(WindowFunction<T, R, K, W> function) {
+		TypeInformation<T> inType = input.getType();
+		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+				function, WindowFunction.class, true, true, inType, null, false);
+
+		return apply(function, resultType);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Not that this function requires that all data in the windows is buffered until the window
+	 * is evaluated, as the function provides no means of pre-aggregation.
+	 *
+	 * @param function The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <R> DataStream<R> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
+		//clean the closure
+		function = input.getExecutionEnvironment().clean(function);
+
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "MapWindow at " + callLocation;
+
+		DataStream<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
+		if (result != null) {
+			return result;
+		}
+
+
+		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+		KeySelector<T, K> keySel = input.getKeySelector();
+
+		OneInputStreamOperator<T, R> operator;
+
+		if (evictor != null) {
+			operator = new EvictingWindowOperator<>(windowAssigner,
+					keySel,
+					new HeapWindowBuffer.Factory<T>(),
+					function,
+					trigger,
+					evictor);
+
+		} else {
+			operator = new WindowOperator<>(windowAssigner,
+					keySel,
+					new HeapWindowBuffer.Factory<T>(),
+					function,
+					trigger);
+		}
+
+		return input.transform(opName, resultType, operator);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	private <R> DataStream<R> createFastTimeOperatorIfValid(
+			Function function,
+			TypeInformation<R> resultType,
+			String functionName) {
+
+		if (windowAssigner instanceof SlidingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
+			SlidingProcessingTimeWindows timeWindows = (SlidingProcessingTimeWindows) windowAssigner;
+			final long windowLength = timeWindows.getSize();
+			final long windowSlide = timeWindows.getSlide();
+
+			String opName = "Fast " + timeWindows + " of " + functionName;
+
+			if (function instanceof ReduceFunction) {
+				@SuppressWarnings("unchecked")
+				ReduceFunction<T> reducer = (ReduceFunction<T>) function;
+
+				@SuppressWarnings("unchecked")
+				OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
+						new AggregatingProcessingTimeWindowOperator<>(
+								reducer, input.getKeySelector(), windowLength, windowSlide);
+				return input.transform(opName, resultType, op);
+			}
+			else if (function instanceof WindowFunction) {
+				@SuppressWarnings("unchecked")
+				WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
+
+				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
+						wf, input.getKeySelector(), windowLength, windowSlide);
+				return input.transform(opName, resultType, op);
+			}
+		} else if (windowAssigner instanceof TumblingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
+			TumblingProcessingTimeWindows timeWindows = (TumblingProcessingTimeWindows) windowAssigner;
+			final long windowLength = timeWindows.getSize();
+			final long windowSlide = timeWindows.getSize();
+
+			String opName = "Fast " + timeWindows + " of " + functionName;
+
+			if (function instanceof ReduceFunction) {
+				@SuppressWarnings("unchecked")
+				ReduceFunction<T> reducer = (ReduceFunction<T>) function;
+
+				@SuppressWarnings("unchecked")
+				OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
+						new AggregatingProcessingTimeWindowOperator<>(
+								reducer, input.getKeySelector(), windowLength, windowSlide);
+				return input.transform(opName, resultType, op);
+			}
+			else if (function instanceof WindowFunction) {
+				@SuppressWarnings("unchecked")
+				WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
+
+				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
+						wf, input.getKeySelector(), windowLength, windowSlide);
+				return input.transform(opName, resultType, op);
+			}
+		}
+
+		return null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
new file mode 100644
index 0000000..1d54436
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * Base interface for functions that are evaluated over non-keyed windows.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ */
+public interface AllWindowFunction<IN, OUT,  W extends Window> extends Function, Serializable {
+
+	/**
+	 * Evaluates the window and outputs none or several elements.
+	 *
+	 * @param window The window that is being evaluated.
+	 * @param values The elements in the window being evaluated.
+	 * @param out A collector for emitting elements.
+	 *
+	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+	 */
+	void apply(W window, Iterable<IN> values, Collector<OUT> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/KeyedWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/KeyedWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/KeyedWindowFunction.java
deleted file mode 100644
index 77ce53e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/KeyedWindowFunction.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-import java.io.Serializable;
-
-/**
- * Base interface for functions that are evaluated over keyed (grouped) windows.
- *
- * @param <IN> The type of the input value.
- * @param <OUT> The type of the output value.
- * @param <KEY> The type of the key.
- */
-public interface KeyedWindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
-
-	/**
-	 * 
-	 * @param key
-	 * @param values
-	 * @param out
-	 * 
-	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery. 
-	 */
-	void evaluate(KEY key, W window, Iterable<IN> values, Collector<OUT> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java
new file mode 100644
index 0000000..24855a5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+public class ReduceAllWindowFunction<W extends Window, T> extends RichAllWindowFunction<T, T, W> {
+	private static final long serialVersionUID = 1L;
+
+	private final ReduceFunction<T> reduceFunction;
+
+	public ReduceAllWindowFunction(ReduceFunction<T> reduceFunction) {
+		this.reduceFunction = reduceFunction;
+	}
+
+	@Override
+	public void setRuntimeContext(RuntimeContext ctx) {
+		super.setRuntimeContext(ctx);
+		FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		FunctionUtils.openFunction(reduceFunction, parameters);
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		FunctionUtils.closeFunction(reduceFunction);
+	}
+
+	@Override
+	public void apply(W window, Iterable<T> values, Collector<T> out) throws Exception {
+		T result = null;
+
+		for (T v: values) {
+			if (result == null) {
+				result = v;
+			} else {
+				result = reduceFunction.reduce(result, v);
+			}
+		}
+
+		if (result != null) {
+			out.collect(result);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceKeyedWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceKeyedWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceKeyedWindowFunction.java
deleted file mode 100644
index 70627f0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceKeyedWindowFunction.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-public class ReduceKeyedWindowFunction<K, W extends Window, T> extends RichKeyedWindowFunction<T, T, K, W> {
-	private static final long serialVersionUID = 1L;
-
-	private final ReduceFunction<T> reduceFunction;
-
-	public ReduceKeyedWindowFunction(ReduceFunction<T> reduceFunction) {
-		this.reduceFunction = reduceFunction;
-	}
-
-	@Override
-	public void setRuntimeContext(RuntimeContext ctx) {
-		super.setRuntimeContext(ctx);
-		FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
-	}
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		FunctionUtils.openFunction(reduceFunction, parameters);
-	}
-
-	@Override
-	public void close() throws Exception {
-		super.close();
-		FunctionUtils.closeFunction(reduceFunction);
-	}
-
-	@Override
-	public void evaluate(K k, W window, Iterable<T> values, Collector<T> out) throws Exception {
-		T result = null;
-
-		for (T v: values) {
-			if (result == null) {
-				result = v;
-			} else {
-				result = reduceFunction.reduce(result, v);
-			}
-		}
-
-		if (result != null) {
-			out.collect(result);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
index ba26218..042fe18 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
@@ -24,7 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
-public class ReduceWindowFunction<W extends Window, T> extends RichWindowFunction<T, T, W> {
+public class ReduceWindowFunction<K, W extends Window, T> extends RichWindowFunction<T, T, K, W> {
 	private static final long serialVersionUID = 1L;
 
 	private final ReduceFunction<T> reduceFunction;
@@ -52,7 +52,7 @@ public class ReduceWindowFunction<W extends Window, T> extends RichWindowFunctio
 	}
 
 	@Override
-	public void evaluate(W window, Iterable<T> values, Collector<T> out) throws Exception {
+	public void apply(K k, W window, Iterable<T> values, Collector<T> out) throws Exception {
 		T result = null;
 
 		for (T v: values) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
index bceff82..6a472b1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
-public class ReduceWindowFunctionWithWindow<K, W extends Window, T> extends RichKeyedWindowFunction<T, Tuple2<W, T>, K, W> {
+public class ReduceWindowFunctionWithWindow<K, W extends Window, T> extends RichWindowFunction<T, Tuple2<W, T>, K, W> {
 	private static final long serialVersionUID = 1L;
 
 	private final ReduceFunction<T> reduceFunction;
@@ -53,7 +53,7 @@ public class ReduceWindowFunctionWithWindow<K, W extends Window, T> extends Rich
 	}
 
 	@Override
-	public void evaluate(K k, W window, Iterable<T> values, Collector<Tuple2<W, T>> out) throws Exception {
+	public void apply(K k, W window, Iterable<T> values, Collector<Tuple2<W, T>> out) throws Exception {
 		T result = null;
 
 		for (T v: values) {