You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/05 16:42:38 UTC

[04/13] flink git commit: [FLINK-2550] Simplify Stream Java API Class Names

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichAllWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichAllWindowFunction.java
new file mode 100644
index 0000000..d78e2c3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichAllWindowFunction.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+public abstract class RichAllWindowFunction<IN, OUT, W extends Window> extends AbstractRichFunction implements AllWindowFunction<IN, OUT, W> {
+	private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichKeyedWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichKeyedWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichKeyedWindowFunction.java
deleted file mode 100644
index 90ccb40..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichKeyedWindowFunction.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-
-public abstract class RichKeyedWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction implements KeyedWindowFunction<IN, OUT, KEY, W> {
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java
index b40b74a..0d40bbd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java
@@ -20,6 +20,6 @@ package org.apache.flink.streaming.api.functions.windowing;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
-public abstract class RichWindowFunction<IN, OUT, W extends Window> extends AbstractRichFunction implements WindowFunction<IN, OUT, W> {
+public abstract class RichWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction implements WindowFunction<IN, OUT, KEY, W> {
 	private static final long serialVersionUID = 1L;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
index 1a4304e..eda12c0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
@@ -25,19 +25,23 @@ import org.apache.flink.util.Collector;
 import java.io.Serializable;
 
 /**
- * Base interface for functions that are evaluated over non-keyed windows.
+ * Base interface for functions that are evaluated over keyed (grouped) windows.
  *
  * @param <IN> The type of the input value.
  * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the key.
  */
-public interface WindowFunction<IN, OUT,  W extends Window> extends Function, Serializable {
+public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
 
 	/**
-	 * 
-	 * @param values
-	 * @param out
+	 * Evaluates the window and emits zero or more elements.
+	 *
+	 * @param key The key for which this window is evaluated.
+	 * @param window The window that is being evaluated.
+	 * @param values The elements in the window being evaluated.
+	 * @param out A collector for emitting elements.
 	 * 
 	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery. 
 	 */
-	void evaluate(W window, Iterable<IN> values, Collector<OUT> out) throws Exception;
+	void apply(KEY key, W window, Iterable<IN> values, Collector<OUT> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
index 496da6b..55c1be0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.util.UnionIterator;
-import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
@@ -34,13 +34,13 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 
 	private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory();
 
-	private final KeyedWindowFunction<Type, Result, Key, Window> function;
+	private final WindowFunction<Type, Result, Key, Window> function;
 	
 	private long evaluationPass;
 
 	// ------------------------------------------------------------------------
 	
-	public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, KeyedWindowFunction<Type, Result, Key, Window> function) {
+	public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, WindowFunction<Type, Result, Key, Window> function) {
 		this.keySelector = keySelector;
 		this.function = function;
 	}
@@ -59,7 +59,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 		if (previousPanes.isEmpty()) {
 			// optimized path for single pane case (tumbling window)
 			for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) {
-				function.evaluate(entry.getKey(), window, entry.getValue(), out);
+				function.apply(entry.getKey(), window, entry.getValue(), out);
 			}
 		}
 		else {
@@ -77,7 +77,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 	
 	static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
 
-		private final KeyedWindowFunction<Type, Result, Key, Window> function;
+		private final WindowFunction<Type, Result, Key, Window> function;
 		
 		private final UnionIterator<Type> unionIterator;
 		
@@ -87,7 +87,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 
 		private TimeWindow window;
 
-		WindowFunctionTraversal(KeyedWindowFunction<Type, Result, Key, Window> function, TimeWindow window, Collector<Result> out) {
+		WindowFunctionTraversal(WindowFunction<Type, Result, Key, Window> function, TimeWindow window, Collector<Result> out) {
 			this.function = function;
 			this.out = out;
 			this.unionIterator = new UnionIterator<>();
@@ -108,7 +108,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 
 		@Override
 		public void keyDone() throws Exception {
-			function.evaluate(currentKey, window, unionIterator, out);
+			function.apply(currentKey, window, unionIterator, out);
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
index ace3823..3bcffbc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -20,19 +20,19 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
 
 public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> 
-		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, KeyedWindowFunction<IN, OUT, KEY, TimeWindow>>  {
+		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, WindowFunction<IN, OUT, KEY, TimeWindow>>  {
 
 	private static final long serialVersionUID = 7305948082830843475L;
 
 	
 	public AccumulatingProcessingTimeWindowOperator(
-			KeyedWindowFunction<IN, OUT, KEY, TimeWindow> function,
+			WindowFunction<IN, OUT, KEY, TimeWindow> function,
 			KeySelector<IN, KEY> keySelector,
 			long windowLength,
 			long windowSlide)
@@ -43,7 +43,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
 	@Override
 	protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
 		@SuppressWarnings("unchecked")
-		KeyedWindowFunction<IN, OUT, KEY, Window> windowFunction = (KeyedWindowFunction<IN, OUT, KEY, Window>) function;
+		WindowFunction<IN, OUT, KEY, Window> windowFunction = (WindowFunction<IN, OUT, KEY, Window>) function;
 		
 		return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
index d5ed6cb..53df838 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
@@ -41,7 +41,7 @@ public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends N
 
 	public EvictingNonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
 			WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory,
-			WindowFunction<IN, OUT, W> windowFunction,
+			AllWindowFunction<IN, OUT, W> windowFunction,
 			Trigger<? super IN, ? super W> trigger,
 			Evictor<? super IN, ? super W> evictor) {
 		super(windowAssigner, windowBufferFactory, windowFunction, trigger);
@@ -77,7 +77,7 @@ public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends N
 
 		windowBuffer.removeElements(toEvict);
 
-		userFunction.evaluate(
+		userFunction.apply(
 				window,
 				bufferAndTrigger.f0.getUnpackedElements(),
 				timestampedCollector);

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index cddcc42..334eb54 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
@@ -44,7 +44,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 	public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
 			KeySelector<IN, K> keySelector,
 			WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory,
-			KeyedWindowFunction<IN, OUT, K, W> windowFunction,
+			WindowFunction<IN, OUT, K, W> windowFunction,
 			Trigger<? super IN, ? super W> trigger,
 			Evictor<? super IN, ? super W> evictor) {
 		super(windowAssigner, keySelector, windowBufferFactory, windowFunction, trigger);
@@ -87,7 +87,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 
 		windowBuffer.removeElements(toEvict);
 
-		userFunction.evaluate(key,
+		userFunction.apply(key,
 				window,
 				bufferAndTrigger.f0.getUnpackedElements(),
 				timestampedCollector);

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 3a85759..d48643d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
@@ -46,7 +46,7 @@ import java.util.Map;
 import java.util.Set;
 
 public class NonKeyedWindowOperator<IN, OUT, W extends Window>
-		extends AbstractUdfStreamOperator<OUT, WindowFunction<IN, OUT, W>>
+		extends AbstractUdfStreamOperator<OUT, AllWindowFunction<IN, OUT, W>>
 		implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
 
 	private static final long serialVersionUID = 1L;
@@ -72,7 +72,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
 	public NonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
 			WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
-			WindowFunction<IN, OUT, W> windowFunction,
+			AllWindowFunction<IN, OUT, W> windowFunction,
 			Trigger<? super IN, ? super W> trigger) {
 
 		super(windowFunction);
@@ -157,7 +157,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 		}
 
 
-		userFunction.evaluate(
+		userFunction.apply(
 				window,
 				bufferAndTrigger.f0.getUnpackedElements(),
 				timestampedCollector);

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index cda4481..2d4635f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
@@ -47,7 +47,7 @@ import java.util.Map;
 import java.util.Set;
 
 public class WindowOperator<K, IN, OUT, W extends Window>
-		extends AbstractUdfStreamOperator<OUT, KeyedWindowFunction<IN, OUT, K, W>>
+		extends AbstractUdfStreamOperator<OUT, WindowFunction<IN, OUT, K, W>>
 		implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
 
 	private static final long serialVersionUID = 1L;
@@ -75,7 +75,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	public WindowOperator(WindowAssigner<? super IN, W> windowAssigner,
 			KeySelector<IN, K> keySelector,
 			WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
-			KeyedWindowFunction<IN, OUT, K, W> windowFunction,
+			WindowFunction<IN, OUT, K, W> windowFunction,
 			Trigger<? super IN, ? super W> trigger) {
 
 		super(windowFunction);
@@ -181,7 +181,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 		}
 
 
-		userFunction.evaluate(key,
+		userFunction.apply(key,
 				window,
 				bufferAndTrigger.f0.getUnpackedElements(),
 				timestampedCollector);

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
index 0ac352b..a8c4b49 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
@@ -157,7 +157,7 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 				return new PartitionedStreamOperatorState(provider, statePartitioner, getUserCodeClassLoader());
 			} else {
 				throw new RuntimeException(
-						"Partitioned state can only be used with KeyedDataStreams.");
+						"Partitioned state can only be used with KeyedStreams.");
 			}
 		} else {
 			return new StreamOperatorState(provider);

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index 207b1b1..ead3af8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -46,7 +46,7 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.LocalStateHandle.LocalStateHandleProvider;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.datastream.KeyedDataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.Output;
@@ -113,7 +113,7 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(3);
 
-		KeyedDataStream<Integer, Integer> keyedStream = env
+		KeyedStream<Integer, Integer> keyedStream = env
 				.fromCollection(Arrays.asList(0, 1, 2, 3, 4, 5, 6))
 				.keyBy(new ModKey(4));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 99a2e14..89672df 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
@@ -49,7 +49,7 @@ import static org.junit.Assert.*;
 public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	@SuppressWarnings("unchecked")
-	private final KeyedWindowFunction<String, String, String, TimeWindow> mockFunction = mock(KeyedWindowFunction.class);
+	private final WindowFunction<String, String, String, TimeWindow> mockFunction = mock(WindowFunction.class);
 
 	@SuppressWarnings("unchecked")
 	private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
@@ -61,11 +61,14 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		}
 	};
 	
-	private final KeyedWindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction =
-			new KeyedWindowFunction<Integer, Integer, Integer, TimeWindow>()
+	private final WindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction =
+			new WindowFunction<Integer, Integer, Integer, TimeWindow>()
 	{
 		@Override
-		public void evaluate(Integer key, TimeWindow window, Iterable<Integer> values, Collector<Integer> out) {
+		public void apply(Integer key,
+				TimeWindow window,
+				Iterable<Integer> values,
+				Collector<Integer> out) {
 			for (Integer val : values) {
 				assertEquals(key, val);
 				out.collect(val);
@@ -472,7 +475,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
 			when(mockContext.getTaskName()).thenReturn("Test task name");
 
-			KeyedWindowFunction<Integer, Integer, Integer, TimeWindow> failingFunction = new FailingFunction(100);
+			WindowFunction<Integer, Integer, Integer, TimeWindow> failingFunction = new FailingFunction(100);
 
 			// the operator has a window time that is so long that it will not fire in this test
 			final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000;
@@ -523,7 +526,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	// ------------------------------------------------------------------------
 	
-	private static class FailingFunction implements KeyedWindowFunction<Integer, Integer, Integer, TimeWindow> {
+	private static class FailingFunction implements WindowFunction<Integer, Integer, Integer, TimeWindow> {
 
 		private final int failAfterElements;
 		
@@ -534,7 +537,10 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		}
 
 		@Override
-		public void evaluate(Integer integer, TimeWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
+		public void apply(Integer integer,
+				TimeWindow window,
+				Iterable<Integer> values,
+				Collector<Integer> out) throws Exception {
 			for (Integer i : values) {
 				out.collect(i);
 				numElements++;

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
new file mode 100644
index 0000000..ee8c6d6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * These tests verify that the API calls on
+ * {@link org.apache.flink.streaming.api.datastream.AllWindowedStream} instantiate
+ * the correct window operator.
+ */
+public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
+
+	/**
+	 * This test ensures that the fast aligned time windows operator is used if the
+	 * conditions are right.
+	 *
+	 * TODO: update once the fast aligned time windows operator is in
+	 */
+	@Ignore
+	@Test
+	public void testFastTimeWindows() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+				.reduceWindow(reducer);
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+		Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator);
+
+		DataStream<Tuple2<String, Integer>> window2 = source
+				.windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
+		Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator);
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testNonEvicting() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+				.trigger(CountTrigger.of(100))
+				.reduceWindow(reducer);
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+		Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
+		NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
+		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof CountTrigger);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
+		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
+
+		DataStream<Tuple2<String, Integer>> window2 = source
+				.windowAll(TumblingProcessingTimeWindows.of(1000))
+				.trigger(CountTrigger.of(100))
+				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
+		Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
+		NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
+		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
+		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testEvicting() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+				.evictor(CountEvictor.of(100))
+				.reduceWindow(reducer);
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+		Assert.assertTrue(operator1 instanceof EvictingNonKeyedWindowOperator);
+		EvictingNonKeyedWindowOperator winOperator1 = (EvictingNonKeyedWindowOperator) operator1;
+		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
+		Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
+		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+
+		DataStream<Tuple2<String, Integer>> window2 = source
+				.windowAll(TumblingProcessingTimeWindows.of(1000))
+				.trigger(CountTrigger.of(100))
+				.evictor(TimeEvictor.of(100))
+				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
+		Assert.assertTrue(operator2 instanceof EvictingNonKeyedWindowOperator);
+		EvictingNonKeyedWindowOperator winOperator2 = (EvictingNonKeyedWindowOperator) operator2;
+		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
+		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
+		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+	}
+
+	// ------------------------------------------------------------------------
+	//  UDFs
+	// ------------------------------------------------------------------------
+
+	public static class DummyReducer extends RichReduceFunction<Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
+			return value1;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
index 0dfceab..3139941 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
@@ -23,8 +23,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windowing.ReduceKeyedWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
@@ -56,7 +55,7 @@ public class EvictingNonKeyedWindowOperatorTest {
 		EvictingNonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingNonKeyedWindowOperator<>(
 				GlobalWindows.create(),
 				new HeapWindowBuffer.Factory<Tuple2<String, Integer>>(),
-				new ReduceWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer(closeCalled)),
+				new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer(closeCalled)),
 				CountTrigger.of(WINDOW_SLIDE),
 				CountEvictor.of(WINDOW_SIZE));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 01381f9..3d9605e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
-import org.apache.flink.streaming.api.functions.windowing.ReduceKeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -56,7 +56,7 @@ public class EvictingWindowOperatorTest {
 				GlobalWindows.create(),
 				new TupleKeySelector(),
 				new HeapWindowBuffer.Factory<Tuple2<String, Integer>>(),
-				new ReduceKeyedWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer(closeCalled)),
+				new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer(closeCalled)),
 				CountTrigger.of(WINDOW_SLIDE),
 				CountEvictor.of(WINDOW_SIZE));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
index b74b3ea..9b0bcc4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
@@ -23,8 +23,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windowing.ReduceKeyedWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
@@ -76,7 +75,7 @@ public class NonKeyedWindowOperatorTest {
 		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
 				SlidingTimeWindows.of(WINDOW_SIZE, WINDOW_SLIDE),
 				windowBufferFactory,
-				new ReduceWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
+				new ReduceAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
 				WatermarkTrigger.create());
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -156,7 +155,7 @@ public class NonKeyedWindowOperatorTest {
 		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
 				TumblingTimeWindows.of(WINDOW_SIZE),
 				windowBufferFactory,
-				new ReduceWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
+				new ReduceAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
 				WatermarkTrigger.create());
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -234,7 +233,7 @@ public class NonKeyedWindowOperatorTest {
 		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>(
 				GlobalWindows.create(),
 				windowBufferFactory,
-				new ReduceWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
+				new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
 				ContinuousWatermarkTrigger.of(WINDOW_SIZE));
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -312,7 +311,7 @@ public class NonKeyedWindowOperatorTest {
 		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>(
 				GlobalWindows.create(),
 				windowBufferFactory,
-				new ReduceWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
+				new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
 				PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)));
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse(

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonParallelWindowDataStreamTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonParallelWindowDataStreamTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonParallelWindowDataStreamTranslationTest.java
deleted file mode 100644
index 4babee1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonParallelWindowDataStreamTranslationTest.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
-import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-/**
- * These tests verify that the api calls on
- * {@link org.apache.flink.streaming.api.datastream.KeyedWindowDataStream} instantiate
- * the correct window operator.
- */
-public class NonParallelWindowDataStreamTranslationTest extends StreamingMultipleProgramsTestBase {
-
-	/**
-	 * These tests ensure that the fast aligned time windows operator is used if the
-	 * conditions are right.
-	 *
-	 * TODO: update once fast aligned time windows operator is in
-	 */
-	@Ignore
-	@Test
-	public void testFastTimeWindows() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
-		DummyReducer reducer = new DummyReducer();
-
-		DataStream<Tuple2<String, Integer>> window1 = source
-				.windowAll(SlidingProcessingTimeWindows.of(1000, 100))
-				.reduceWindow(reducer);
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
-		Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator);
-
-		DataStream<Tuple2<String, Integer>> window2 = source
-				.windowAll(SlidingProcessingTimeWindows.of(1000, 100))
-				.mapWindow(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public void evaluate(
-							TimeWindow window,
-							Iterable<Tuple2<String, Integer>> values,
-							Collector<Tuple2<String, Integer>> out) throws Exception {
-
-					}
-				});
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
-		Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator);
-	}
-
-	@Test
-	@SuppressWarnings("rawtypes")
-	public void testNonEvicting() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
-		DummyReducer reducer = new DummyReducer();
-
-		DataStream<Tuple2<String, Integer>> window1 = source
-				.windowAll(SlidingProcessingTimeWindows.of(1000, 100))
-				.trigger(CountTrigger.of(100))
-				.reduceWindow(reducer);
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
-		Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
-		NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
-		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof CountTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
-		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
-
-		DataStream<Tuple2<String, Integer>> window2 = source
-				.windowAll(TumblingProcessingTimeWindows.of(1000))
-				.trigger(CountTrigger.of(100))
-				.mapWindow(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public void evaluate(
-							TimeWindow window,
-							Iterable<Tuple2<String, Integer>> values,
-							Collector<Tuple2<String, Integer>> out) throws Exception {
-
-					}
-				});
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
-		Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
-		NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
-		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
-		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
-	}
-
-	@Test
-	@SuppressWarnings("rawtypes")
-	public void testEvicting() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
-		DummyReducer reducer = new DummyReducer();
-
-		DataStream<Tuple2<String, Integer>> window1 = source
-				.windowAll(SlidingProcessingTimeWindows.of(1000, 100))
-				.evictor(CountEvictor.of(100))
-				.reduceWindow(reducer);
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
-		Assert.assertTrue(operator1 instanceof EvictingNonKeyedWindowOperator);
-		EvictingNonKeyedWindowOperator winOperator1 = (EvictingNonKeyedWindowOperator) operator1;
-		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof ProcessingTimeTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
-		Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
-		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
-
-		DataStream<Tuple2<String, Integer>> window2 = source
-				.windowAll(TumblingProcessingTimeWindows.of(1000))
-				.trigger(CountTrigger.of(100))
-				.evictor(TimeEvictor.of(100))
-				.mapWindow(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public void evaluate(
-							TimeWindow window,
-							Iterable<Tuple2<String, Integer>> values,
-							Collector<Tuple2<String, Integer>> out) throws Exception {
-
-					}
-				});
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
-		Assert.assertTrue(operator2 instanceof EvictingNonKeyedWindowOperator);
-		EvictingNonKeyedWindowOperator winOperator2 = (EvictingNonKeyedWindowOperator) operator2;
-		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
-		Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
-		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
-	}
-
-	// ------------------------------------------------------------------------
-	//  UDFs
-	// ------------------------------------------------------------------------
-
-	public static class DummyReducer extends RichReduceFunction<Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
-			return value1;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
index aaf21e0..76d7bfe 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
@@ -21,9 +21,10 @@ import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -38,7 +39,7 @@ import java.util.concurrent.TimeUnit;
 
 /**
  * These tests verify that the api calls on
- * {@link org.apache.flink.streaming.api.datastream.KeyedWindowDataStream} instantiate
+ * {@link WindowedStream} instantiate
  * the correct window operator.
  */
 public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase {
@@ -67,11 +68,11 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		DataStream<Tuple2<String, Integer>> window2 = source
 				.keyBy(0)
 				.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
-				.mapWindow(new KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
-					public void evaluate(Tuple tuple,
+					public void apply(Tuple tuple,
 							TimeWindow window,
 							Iterable<Tuple2<String, Integer>> values,
 							Collector<Tuple2<String, Integer>> out) throws Exception {
@@ -110,11 +111,11 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase
 
 		DataStream<Tuple2<String, Integer>> window2 = source
 				.timeWindowAll(Time.of(1000, TimeUnit.MILLISECONDS))
-				.mapWindow(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
-					public void evaluate(
+					public void apply(
 							TimeWindow window,
 							Iterable<Tuple2<String, Integer>> values,
 							Collector<Tuple2<String, Integer>> out) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 3107d51..1bfd1d5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
-import org.apache.flink.streaming.api.functions.windowing.ReduceKeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
 import org.apache.flink.streaming.api.windowing.triggers.ContinuousWatermarkTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
@@ -76,7 +76,7 @@ public class WindowOperatorTest {
 				SlidingTimeWindows.of(WINDOW_SIZE, WINDOW_SLIDE),
 				new TupleKeySelector(),
 				windowBufferFactory,
-				new ReduceKeyedWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
+				new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
 				WatermarkTrigger.create());
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -163,7 +163,7 @@ public class WindowOperatorTest {
 				TumblingTimeWindows.of(WINDOW_SIZE),
 				new TupleKeySelector(),
 				windowBufferFactory,
-				new ReduceKeyedWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
+				new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
 				WatermarkTrigger.create());
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -246,7 +246,7 @@ public class WindowOperatorTest {
 				GlobalWindows.create(),
 				new TupleKeySelector(),
 				windowBufferFactory,
-				new ReduceKeyedWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
+				new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
 				ContinuousWatermarkTrigger.of(WINDOW_SIZE));
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -331,7 +331,7 @@ public class WindowOperatorTest {
 				GlobalWindows.create(),
 				new TupleKeySelector(),
 				windowBufferFactory,
-				new ReduceKeyedWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
+				new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
 				PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)));
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse(

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 43e7715..a3e6085 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -21,8 +21,9 @@ import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
@@ -41,7 +42,7 @@ import org.junit.Test;
 
 /**
  * These tests verify that the api calls on
- * {@link org.apache.flink.streaming.api.datastream.KeyedWindowDataStream} instantiate
+ * {@link WindowedStream} instantiate
  * the correct window operator.
  */
 public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
@@ -70,11 +71,11 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		DataStream<Tuple2<String, Integer>> window2 = source
 				.keyBy(0)
 				.window(SlidingProcessingTimeWindows.of(1000, 100))
-				.mapWindow(new KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
-					public void evaluate(Tuple tuple,
+					public void apply(Tuple tuple,
 							TimeWindow window,
 							Iterable<Tuple2<String, Integer>> values,
 							Collector<Tuple2<String, Integer>> out) throws Exception {
@@ -114,11 +115,11 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 				.keyBy(0)
 				.window(TumblingProcessingTimeWindows.of(1000))
 				.trigger(CountTrigger.of(100))
-				.mapWindow(new KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
-					public void evaluate(Tuple tuple,
+					public void apply(Tuple tuple,
 							TimeWindow window,
 							Iterable<Tuple2<String, Integer>> values,
 							Collector<Tuple2<String, Integer>> out) throws Exception {
@@ -164,11 +165,11 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 				.window(TumblingProcessingTimeWindows.of(1000))
 				.trigger(CountTrigger.of(100))
 				.evictor(TimeEvictor.of(100))
-				.mapWindow(new KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
-					public void evaluate(Tuple tuple,
+					public void apply(Tuple tuple,
 							TimeWindow window,
 							Iterable<Tuple2<String, Integer>> values,
 							Collector<Tuple2<String, Integer>> out) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
index a5f1e89..5d32b8e 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
@@ -81,10 +81,10 @@ public class GroupedProcessingTimeWindowExample {
 			.timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
 			.reduceWindow(new SummingReducer())
 
-			// alternative: use a mapWindow function which does not pre-aggregate
+			// alternative: use an apply function which does not pre-aggregate
 //			.keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
 //			.window(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
-//			.mapWindow(new SummingWindowFunction())
+//			.apply(new SummingWindowFunction())
 				
 			.addSink(new SinkFunction<Tuple2<Long, Long>>() {
 				@Override
@@ -104,10 +104,10 @@ public class GroupedProcessingTimeWindowExample {
 		}
 	}
 
-	public static class SummingWindowFunction implements KeyedWindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, Window> {
+	public static class SummingWindowFunction implements WindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, Window> {
 
 		@Override
-		public void evaluate(Long key, Window window, Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
+		public void apply(Long key, Window window, Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
 			long sum = 0L;
 			for (Tuple2<Long, Long> value : values) {
 				sum += value.f1;

http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 9f4f52a..ca5fc48 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -39,6 +39,7 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema
 import org.apache.flink.util.Collector
 import org.apache.flink.api.common.functions.{RichMapFunction, RichFlatMapFunction, RichFilterFunction}
 import org.apache.flink.streaming.api.scala.function.StatefulFunction
+import org.apache.flink.streaming.api.datastream.{KeyedStream => JavaKeyedStream}
 
 class DataStream[T](javaStream: JavaStream[T]) {
 
@@ -246,7 +247,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * Groups the elements of a DataStream by the given field expressions to
    * be used with grouped operators like grouped reduce or grouped aggregations.
    */
-  def groupBy(firstField: String, otherFields: String*): GroupedDataStream[T, JavaTuple] = 
+  def groupBy(firstField: String, otherFields: String*): GroupedDataStream[T, JavaTuple] =
    javaStream.groupBy(firstField +: otherFields.toArray: _*)   
   
   /**
@@ -601,7 +602,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   private[flink] def isStatePartitioned: Boolean = {
-    javaStream.isInstanceOf[KeyedDataStream[_, _]]
+    javaStream.isInstanceOf[JavaKeyedStream[_, _]]
   }
 
   /**