You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/01 11:54:19 UTC

[8/9] flink git commit: [FLINK-2778] Add API for non-parallel non-keyed Windows

[FLINK-2778] Add API for non-parallel non-keyed Windows

This adds two new operators for non-keyed windows: Regular trigger
operator and evicting trigger operator.

This also adds the API calls nonParallelWindow(...) on DataStream and
the API class NonParallelWindowDataStream for representing these
operations.

This also adds tests for both the operators and the translation from API
to operators.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/41717d12
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/41717d12
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/41717d12

Branch: refs/heads/master
Commit: 41717d12372680b9cd55df936c558a255ac25163
Parents: 5623c15
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Sep 29 20:29:09 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 1 11:04:00 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |  77 ++++
 .../datastream/NonParallelWindowDataStream.java | 218 ++++++++++
 .../windowing/ReduceWindowFunction.java         |   4 +-
 .../functions/windowing/RichWindowFunction.java |  25 ++
 .../api/functions/windowing/WindowFunction.java |  43 ++
 .../EvictingNonKeyedWindowOperator.java         | 101 +++++
 .../windowing/NonKeyedWindowOperator.java       | 285 ++++++++++++
 .../EvictingNonKeyedWindowOperatorTest.java     | 173 ++++++++
 .../windowing/NonKeyedWindowOperatorTest.java   | 434 +++++++++++++++++++
 ...ParallelWindowDataStreamTranslationTest.java | 198 +++++++++
 .../windowing/TimeWindowTranslationTest.java    |  45 ++
 .../StreamingScalaAPICompletenessTest.scala     |   6 +-
 12 files changed, 1606 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/41717d12/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 5dfb1e2..e3f7f3e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -60,6 +60,11 @@ import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.api.transformations.UnionTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.helper.Count;
 import org.apache.flink.streaming.api.windowing.helper.Delta;
 import org.apache.flink.streaming.api.windowing.helper.FullStream;
@@ -67,6 +72,10 @@ import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.EventTime;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
@@ -805,6 +814,74 @@ public class DataStream<T> {
 	}
 
 	/**
+	 * Windows this {@code KeyedDataStream} into tumbling time windows.
+	 *
+	 * <p>
+	 * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
+	 * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
+	 * set using
+	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
+	 *
+	 * @param size The size of the window.
+	 */
+	public NonParallelWindowDataStream<T, TimeWindow> timeWindowAll(AbstractTime size) {
+		AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
+
+		if (actualSize instanceof EventTime) {
+			return windowAll(TumblingTimeWindows.of(actualSize.toMilliseconds()));
+		} else {
+			return windowAll(TumblingProcessingTimeWindows.of(actualSize.toMilliseconds()));
+		}
+	}
+
+	/**
+	 * Windows this {@code KeyedDataStream} into sliding time windows.
+	 *
+	 * <p>
+	 * This is a shortcut for either {@code .window(SlidingTimeWindows.of(size, slide))} or
+	 * {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic
+	 * set using
+	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
+	 *
+	 * @param size The size of the window.
+	 */
+	public NonParallelWindowDataStream<T, TimeWindow> timeWindowAll(AbstractTime size, AbstractTime slide) {
+		AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
+		AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
+
+		if (actualSize instanceof EventTime) {
+			return windowAll(SlidingTimeWindows.of(actualSize.toMilliseconds(),
+					actualSlide.toMilliseconds()));
+		} else {
+			return windowAll(SlidingProcessingTimeWindows.of(actualSize.toMilliseconds(),
+					actualSlide.toMilliseconds()));
+		}
+	}
+
+	/**
+	 * Windows this data stream to a {@code KeyedTriggerWindowDataStream}, which evaluates windows
+	 * over a key grouped stream. Elements are put into windows by a
+	 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. The grouping of
+	 * elements is done both by key and by window.
+	 *
+	 * <p>
+	 * A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} can be defined to specify
+	 * when windows are evaluated. However, {@code WindowAssigners} have a default {@code Trigger}
+	 * that is used if a {@code Trigger} is not specified.
+	 *
+	 * <p>
+	 * Note: This operation can be inherently non-parallel since all elements have to pass through
+	 * the same operator instance. (Only for special cases, such as aligned time windows is
+	 * it possible to perform this operation in parallel).
+	 *
+	 * @param assigner The {@code WindowAssigner} that assigns elements to windows.
+	 * @return The trigger windows data stream.
+	 */
+	public <W extends Window> NonParallelWindowDataStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
+		return new NonParallelWindowDataStream<>(this, assigner);
+	}
+
+	/**
 	 * Writes a DataStream to the standard output stream (stdout).
 	 *
 	 * <p>

http://git-wip-us.apache.org/repos/asf/flink/blob/41717d12/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/NonParallelWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/NonParallelWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/NonParallelWindowDataStream.java
new file mode 100644
index 0000000..5cb3b6b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/NonParallelWindowDataStream.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.EvictingNonKeyedWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.NonKeyedWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+
+/**
+ * A {@code NonParallelWindowDataStream} represents a data stream where the stream of
+ * elements is split into windows based on a
+ * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
+ * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
+ *
+ * <p>
+ * If an {@link org.apache.flink.streaming.api.windowing.evictors.Evictor} is specified it will be
+ * used to evict elements from the window after
+ * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
+ * When using an evictor window performance will degrade significantly, since
+ * pre-aggregation of window results cannot be used.
+ *
+ * <p>
+ * Note that the {@code NonParallelWindowDataStream} is purely and API construct, during runtime
+ * the {@code NonParallelWindowDataStream} will be collapsed together with the
+ * operation over the window into one single operation.
+ *
+ * @param <T> The type of elements in the stream.
+ * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
+ */
+public class NonParallelWindowDataStream<T, W extends Window> {
+
+	/** The data stream that is windowed by this stream */
+	private final DataStream<T> input;
+
+	/** The window assigner */
+	private final WindowAssigner<? super T, W> windowAssigner;
+
+	/** The trigger that is used for window evaluation/emission. */
+	private Trigger<? super T, ? super W> trigger;
+
+	/** The evictor that is used for evicting elements before window evaluation. */
+	private Evictor<? super T, ? super W> evictor;
+
+
+	public NonParallelWindowDataStream(DataStream<T> input,
+			WindowAssigner<? super T, W> windowAssigner) {
+		this.input = input;
+		this.windowAssigner = windowAssigner;
+		this.trigger = windowAssigner.getDefaultTrigger();
+	}
+
+	/**
+	 * Sets the {@code Trigger} that should be used to trigger window emission.
+	 */
+	public NonParallelWindowDataStream<T, W> trigger(Trigger<? super T, ? super W> trigger) {
+		this.trigger = trigger;
+		return this;
+	}
+
+	/**
+	 * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
+	 *
+	 * <p>
+	 * Note: When using an evictor window performance will degrade significantly, since
+	 * pre-aggregation of window results cannot be used.
+	 */
+	public NonParallelWindowDataStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
+		this.evictor = evictor;
+		return this;
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  Operations on the keyed windows
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Applies a reduce function to the window. The window function is called for each evaluation
+	 * of the window for each key individually. The output of the reduce function is interpreted
+	 * as a regular non-windowed stream.
+	 * <p>
+	 * This window will try and pre-aggregate data as much as the window policies permit. For example,
+	 * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
+	 * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
+	 * so a few elements are stored per key (one per slide interval).
+	 * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
+	 * aggregation tree.
+	 * 
+	 * @param function The reduce function.
+	 * @return The data stream that is the result of applying the reduce function to the window. 
+	 */
+	public DataStream<T> reduceWindow(ReduceFunction<T> function) {
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "Reduce at " + callLocation;
+
+		DataStream<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
+		if (result != null) {
+			return result;
+		}
+
+		String opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+
+		OneInputStreamOperator<T, T> operator;
+
+		if (evictor != null) {
+			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
+					new HeapWindowBuffer.Factory<T>(),
+					new ReduceWindowFunction<W, T>(function),
+					trigger,
+					evictor);
+
+		} else {
+			// we need to copy because we need our own instance of the pre aggregator
+			@SuppressWarnings("unchecked")
+			ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
+
+			operator = new NonKeyedWindowOperator<>(windowAssigner,
+					new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
+					new ReduceWindowFunction<W, T>(function),
+					trigger);
+		}
+
+		return input.transform(opName, input.getType(), operator).setParallelism(1);
+	}
+
+	/**
+	 * Applies a window function to the window. The window function is called for each evaluation
+	 * of the window for each key individually. The output of the window function is interpreted
+	 * as a regular non-windowed stream.
+	 * <p>
+	 * Not that this function requires that all data in the windows is buffered until the window
+	 * is evaluated, as the function provides no means of pre-aggregation.
+	 * 
+	 * @param function The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <R> DataStream<R> mapWindow(WindowFunction<T, R, W> function) {
+		TypeInformation<T> inType = input.getType();
+		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+				function, WindowFunction.class, true, true, inType, null, false);
+
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "MapWindow at " + callLocation;
+
+		DataStream<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
+		if (result != null) {
+			return result;
+		}
+
+
+		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+
+		OneInputStreamOperator<T, R> operator;
+
+		if (evictor != null) {
+			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
+					new HeapWindowBuffer.Factory<T>(),
+					function,
+					trigger,
+					evictor);
+
+		} else {
+			operator = new NonKeyedWindowOperator<>(windowAssigner,
+					new HeapWindowBuffer.Factory<T>(),
+					function,
+					trigger);
+		}
+
+
+
+		return input.transform(opName, resultType, operator).setParallelism(1);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+
+	private <R> DataStream<R> createFastTimeOperatorIfValid(
+			Function function,
+			TypeInformation<R> resultType,
+			String functionName) {
+
+		// TODO: add once non-parallel fast aligned time windows operator is ready
+		return null;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/41717d12/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
index 1c9578a..ba26218 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
@@ -24,7 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
-public class ReduceWindowFunction<K, W extends Window, T> extends RichKeyedWindowFunction<T, T, K, W> {
+public class ReduceWindowFunction<W extends Window, T> extends RichWindowFunction<T, T, W> {
 	private static final long serialVersionUID = 1L;
 
 	private final ReduceFunction<T> reduceFunction;
@@ -52,7 +52,7 @@ public class ReduceWindowFunction<K, W extends Window, T> extends RichKeyedWindo
 	}
 
 	@Override
-	public void evaluate(K k, W window, Iterable<T> values, Collector<T> out) throws Exception {
+	public void evaluate(W window, Iterable<T> values, Collector<T> out) throws Exception {
 		T result = null;
 
 		for (T v: values) {

http://git-wip-us.apache.org/repos/asf/flink/blob/41717d12/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java
new file mode 100644
index 0000000..b40b74a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+public abstract class RichWindowFunction<IN, OUT, W extends Window> extends AbstractRichFunction implements WindowFunction<IN, OUT, W> {
+	private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/41717d12/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
new file mode 100644
index 0000000..1a4304e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * Base interface for functions that are evaluated over non-keyed windows.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ */
+public interface WindowFunction<IN, OUT,  W extends Window> extends Function, Serializable {
+
+	/**
+	 * 
+	 * @param values
+	 * @param out
+	 * 
+	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery. 
+	 */
+	void evaluate(W window, Iterable<IN> values, Collector<OUT> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/41717d12/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
new file mode 100644
index 0000000..d5ed6cb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
@@ -0,0 +1,101 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends NonKeyedWindowOperator<IN, OUT, W> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(EvictingNonKeyedWindowOperator.class);
+
+	private final Evictor<? super IN, ? super W> evictor;
+
+	public EvictingNonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
+			WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory,
+			WindowFunction<IN, OUT, W> windowFunction,
+			Trigger<? super IN, ? super W> trigger,
+			Evictor<? super IN, ? super W> evictor) {
+		super(windowAssigner, windowBufferFactory, windowFunction, trigger);
+		this.evictor = evictor;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked, rawtypes")
+	protected void emitWindow(W window, boolean purge) throws Exception {
+
+		timestampedCollector.setTimestamp(window.getEnd());
+
+		Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger;
+		if (purge) {
+			bufferAndTrigger = windows.remove(window);
+		} else {
+			bufferAndTrigger = windows.get(window);
+		}
+
+		if (bufferAndTrigger == null) {
+			LOG.debug("Window {} already gone.", window);
+			return;
+		}
+
+
+		EvictingWindowBuffer<IN> windowBuffer = (EvictingWindowBuffer<IN>) bufferAndTrigger.f0;
+
+		int toEvict = 0;
+		if (windowBuffer.size() > 0) {
+			// need some type trickery here...
+			toEvict = evictor.evict((Iterable) windowBuffer.getElements(), windowBuffer.size(), window);
+		}
+
+		windowBuffer.removeElements(toEvict);
+
+		userFunction.evaluate(
+				window,
+				bufferAndTrigger.f0.getUnpackedElements(),
+				timestampedCollector);
+	}
+
+	@Override
+	public EvictingNonKeyedWindowOperator<IN, OUT, W> enableSetProcessingTime(boolean setProcessingTime) {
+		super.enableSetProcessingTime(setProcessingTime);
+		return this;
+	}
+
+
+	// ------------------------------------------------------------------------
+	// Getters for testing
+	// ------------------------------------------------------------------------
+
+	@VisibleForTesting
+	public Evictor<? super IN, ? super W> getEvictor() {
+		return evictor;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/41717d12/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
new file mode 100644
index 0000000..3a85759
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -0,0 +1,285 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+public class NonKeyedWindowOperator<IN, OUT, W extends Window>
+		extends AbstractUdfStreamOperator<OUT, WindowFunction<IN, OUT, W>>
+		implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(NonKeyedWindowOperator.class);
+
+
+	private final WindowAssigner<? super IN, W> windowAssigner;
+
+	private final Trigger<? super IN, ? super W> triggerTemplate;
+	private final WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory;
+
+	protected transient Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> windows;
+
+	private transient Map<Long, Set<TriggerContext>> processingTimeTimers;
+	private transient Map<Long, Set<TriggerContext>> watermarkTimers;
+
+	protected transient TimestampedCollector<OUT> timestampedCollector;
+
+	private boolean setProcessingTime = false;
+
+	private TypeSerializer<IN> inputSerializer;
+
+	public NonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
+			WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
+			WindowFunction<IN, OUT, W> windowFunction,
+			Trigger<? super IN, ? super W> trigger) {
+
+		super(windowFunction);
+
+		this.windowAssigner = windowAssigner;
+
+		this.windowBufferFactory = windowBufferFactory;
+		this.triggerTemplate = trigger;
+
+		setChainingStrategy(ChainingStrategy.ALWAYS);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
+		inputSerializer = (TypeSerializer<IN>) type.createSerializer(executionConfig);
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		windows = Maps.newHashMap();
+		watermarkTimers = Maps.newHashMap();
+		processingTimeTimers = Maps.newHashMap();
+		timestampedCollector = new TimestampedCollector<>(output);
+
+		if (inputSerializer == null) {
+			throw new IllegalStateException("Input serializer was not set.");
+		}
+
+		windowBufferFactory.setRuntimeContext(getRuntimeContext());
+		windowBufferFactory.open(parameters);
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		// emit the elements that we still keep
+		for (W window: windows.keySet()) {
+			emitWindow(window, false);
+		}
+		windows.clear();
+		windowBufferFactory.close();
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		if (setProcessingTime) {
+			element.replace(element.getValue(), System.currentTimeMillis());
+		}
+		Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
+
+		for (W window: elementWindows) {
+			Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger = windows.get(window);
+			if (bufferAndTrigger == null) {
+				bufferAndTrigger = new Tuple2<>();
+				bufferAndTrigger.f0 = windowBufferFactory.create();
+				bufferAndTrigger.f1 = new TriggerContext(window, triggerTemplate.duplicate());
+				windows.put(window, bufferAndTrigger);
+			}
+			StreamRecord<IN> elementCopy = new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp());
+			bufferAndTrigger.f0.storeElement(elementCopy);
+			Trigger.TriggerResult triggerResult = bufferAndTrigger.f1.trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, bufferAndTrigger.f1);
+			processTriggerResult(triggerResult, window);
+		}
+	}
+
+	protected void emitWindow(W window, boolean purge) throws Exception {
+		timestampedCollector.setTimestamp(window.getEnd());
+
+		Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger;
+		if (purge) {
+			bufferAndTrigger = windows.remove(window);
+		} else {
+			bufferAndTrigger = windows.get(window);
+		}
+
+		if (bufferAndTrigger == null) {
+			LOG.debug("Window {} already gone.", window);
+			return;
+		}
+
+
+		userFunction.evaluate(
+				window,
+				bufferAndTrigger.f0.getUnpackedElements(),
+				timestampedCollector);
+	}
+
+	private void processTriggerResult(Trigger.TriggerResult triggerResult, W window) throws Exception {
+		switch (triggerResult) {
+			case FIRE:
+				emitWindow(window, false);
+				break;
+
+			case FIRE_AND_PURGE:
+				emitWindow(window, true);
+				break;
+
+			case CONTINUE:
+				// ingore
+		}
+	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		Set<Long> toRemove = Sets.newHashSet();
+
+		for (Map.Entry<Long, Set<TriggerContext>> triggers: watermarkTimers.entrySet()) {
+			if (triggers.getKey() <= mark.getTimestamp()) {
+				for (TriggerContext trigger: triggers.getValue()) {
+					Trigger.TriggerResult triggerResult = trigger.trigger.onTime(mark.getTimestamp(), trigger);
+					processTriggerResult(triggerResult, trigger.window);
+				}
+				toRemove.add(triggers.getKey());
+			}
+		}
+
+		for (Long l: toRemove) {
+			watermarkTimers.remove(l);
+		}
+		output.emitWatermark(mark);
+	}
+
+	@Override
+	public void trigger(long time) throws Exception {
+		Set<Long> toRemove = Sets.newHashSet();
+
+		for (Map.Entry<Long, Set<TriggerContext>> triggers: processingTimeTimers.entrySet()) {
+			if (triggers.getKey() < time) {
+				for (TriggerContext trigger: triggers.getValue()) {
+					Trigger.TriggerResult triggerResult = trigger.trigger.onTime(time, trigger);
+					processTriggerResult(triggerResult, trigger.window);
+				}
+				toRemove.add(triggers.getKey());
+			}
+		}
+
+		for (Long l: toRemove) {
+			processingTimeTimers.remove(l);
+		}
+	}
+
+	protected class TriggerContext implements Trigger.TriggerContext {
+		Trigger<? super IN, ? super W> trigger;
+		W window;
+
+		public TriggerContext(W window, Trigger<? super IN, ? super W> trigger) {
+			this.window = window;
+			this.trigger = trigger;
+		}
+
+		@Override
+		public void registerProcessingTimeTimer(long time) {
+			Set<TriggerContext> triggers = processingTimeTimers.get(time);
+			if (triggers == null) {
+				getRuntimeContext().registerTimer(time, NonKeyedWindowOperator.this);
+				triggers = Sets.newHashSet();
+				processingTimeTimers.put(time, triggers);
+			}
+			triggers.add(this);
+		}
+
+		@Override
+		public void registerWatermarkTimer(long time) {
+			Set<TriggerContext> triggers = watermarkTimers.get(time);
+			if (triggers == null) {
+				triggers = Sets.newHashSet();
+				watermarkTimers.put(time, triggers);
+			}
+			triggers.add(this);
+		}
+	}
+
+	/**
+	 * When this flag is enabled the current processing time is set as the timestamp of elements
+	 * upon arrival. This must be used, for example, when using the
+	 * {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor} with processing
+	 * time semantics.
+	 */
+	public NonKeyedWindowOperator<IN, OUT, W> enableSetProcessingTime(boolean setProcessingTime) {
+		this.setProcessingTime = setProcessingTime;
+		return this;
+	}
+
+	// ------------------------------------------------------------------------
+	// Getters for testing
+	// ------------------------------------------------------------------------
+
+	@VisibleForTesting
+	public Trigger<? super IN, ? super W> getTriggerTemplate() {
+		return triggerTemplate;
+	}
+
+	@VisibleForTesting
+	public WindowAssigner<? super IN, W> getWindowAssigner() {
+		return windowAssigner;
+	}
+
+	@VisibleForTesting
+	public WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> getWindowBufferFactory() {
+		return windowBufferFactory;
+	}
+
+	@VisibleForTesting
+	public boolean isSetProcessingTime() {
+		return setProcessingTime;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/41717d12/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
new file mode 100644
index 0000000..0dfceab
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.ReduceKeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class EvictingNonKeyedWindowOperatorTest {
+
+	// For counting if close() is called the correct number of times on the SumReducer
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testCountTrigger() throws Exception {
+		AtomicInteger closeCalled = new AtomicInteger(0);
+
+		final int WINDOW_SIZE = 4;
+		final int WINDOW_SLIDE = 2;
+
+		EvictingNonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingNonKeyedWindowOperator<>(
+				GlobalWindows.create(),
+				new HeapWindowBuffer.Factory<Tuple2<String, Integer>>(),
+				new ReduceWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer(closeCalled)),
+				CountTrigger.of(WINDOW_SLIDE),
+				CountEvictor.of(WINDOW_SIZE));
+
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		// The global window actually ignores these timestamps...
+
+		// add elements out-of-order
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
+
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.close();
+
+		Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+
+
+	}
+
+	// ------------------------------------------------------------------------
+	//  UDFs
+	// ------------------------------------------------------------------------
+
+	public static class SumReducer extends RichReduceFunction<Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		private boolean openCalled = false;
+
+		private  AtomicInteger closeCalled;
+
+		public SumReducer(AtomicInteger closeCalled) {
+			this.closeCalled = closeCalled;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			openCalled = true;
+		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+			closeCalled.incrementAndGet();
+		}
+
+		@Override
+		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
+				Tuple2<String, Integer> value2) throws Exception {
+			if (!openCalled) {
+				Assert.fail("Open was not called");
+			}
+			return new Tuple2<>(value2.f0, value1.f1 + value2.f1);
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	private static class ResultSortComparator implements Comparator<Object> {
+		@Override
+		public int compare(Object o1, Object o2) {
+			if (o1 instanceof Watermark || o2 instanceof Watermark) {
+				return 0;
+			} else {
+				StreamRecord<Tuple2<String, Integer>> sr0 = (StreamRecord<Tuple2<String, Integer>>) o1;
+				StreamRecord<Tuple2<String, Integer>> sr1 = (StreamRecord<Tuple2<String, Integer>>) o2;
+				if (sr0.getTimestamp() != sr1.getTimestamp()) {
+					return (int) (sr0.getTimestamp() - sr1.getTimestamp());
+				}
+				int comparison = sr0.getValue().f0.compareTo(sr1.getValue().f0);
+				if (comparison != 0) {
+					return comparison;
+				} else {
+					return sr0.getValue().f1 - sr1.getValue().f1;
+				}
+			}
+		}
+	}
+
+	private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String getKey(Tuple2<String, Integer> value) throws Exception {
+			return value.f0;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/41717d12/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
new file mode 100644
index 0000000..b74b3ea
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
@@ -0,0 +1,434 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.ReduceKeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.triggers.ContinuousWatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@RunWith(Parameterized.class)
+public class NonKeyedWindowOperatorTest {
+
+	@SuppressWarnings("unchecked,rawtypes")
+	private WindowBufferFactory windowBufferFactory;
+
+	public NonKeyedWindowOperatorTest(WindowBufferFactory<?, ?> windowBufferFactory) {
+		this.windowBufferFactory = windowBufferFactory;
+	}
+
+	// For counting if close() is called the correct number of times on the SumReducer
+	private static AtomicInteger closeCalled = new AtomicInteger(0);
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testSlidingEventTimeWindows() throws Exception {
+		closeCalled.set(0);
+
+		final int WINDOW_SIZE = 3000;
+		final int WINDOW_SLIDE = 1000;
+
+		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
+				SlidingTimeWindows.of(WINDOW_SIZE, WINDOW_SLIDE),
+				windowBufferFactory,
+				new ReduceWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
+				WatermarkTrigger.create());
+
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		// add elements out-of-order
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
+
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+
+		testHarness.processWatermark(new Watermark(initialTime + 999));
+		expectedOutput.add(new Watermark(999));
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+
+		testHarness.processWatermark(new Watermark(initialTime + 1999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 1999));
+		expectedOutput.add(new Watermark(1999));
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processWatermark(new Watermark(initialTime + 2999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2999));
+		expectedOutput.add(new Watermark(2999));
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processWatermark(new Watermark(initialTime + 3999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 3999));
+		expectedOutput.add(new Watermark(3999));
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processWatermark(new Watermark(initialTime + 4999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 4999));
+		expectedOutput.add(new Watermark(4999));
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processWatermark(new Watermark(initialTime + 5999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 5999));
+		expectedOutput.add(new Watermark(5999));
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+
+		// those don't have any effect...
+		testHarness.processWatermark(new Watermark(initialTime + 6999));
+		testHarness.processWatermark(new Watermark(initialTime + 7999));
+		expectedOutput.add(new Watermark(6999));
+		expectedOutput.add(new Watermark(7999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.close();
+		if (windowBufferFactory instanceof PreAggregatingHeapWindowBuffer.Factory) {
+			Assert.assertEquals("Close was not called.", 2, closeCalled.get());
+		} else {
+			Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+		}
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testTumblingEventTimeWindows() throws Exception {
+		closeCalled.set(0);
+
+		final int WINDOW_SIZE = 3000;
+
+		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
+				TumblingTimeWindows.of(WINDOW_SIZE),
+				windowBufferFactory,
+				new ReduceWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
+				WatermarkTrigger.create());
+
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		// add elements out-of-order
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
+
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+
+		testHarness.processWatermark(new Watermark(initialTime + 999));
+		expectedOutput.add(new Watermark(999));
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+
+		testHarness.processWatermark(new Watermark(initialTime + 1999));
+		expectedOutput.add(new Watermark(1999));
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processWatermark(new Watermark(initialTime + 2999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2999));
+		expectedOutput.add(new Watermark(2999));
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processWatermark(new Watermark(initialTime + 3999));
+		expectedOutput.add(new Watermark(3999));
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processWatermark(new Watermark(initialTime + 4999));
+		expectedOutput.add(new Watermark(4999));
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processWatermark(new Watermark(initialTime + 5999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 5999));
+		expectedOutput.add(new Watermark(5999));
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+
+		// those don't have any effect...
+		testHarness.processWatermark(new Watermark(initialTime + 6999));
+		testHarness.processWatermark(new Watermark(initialTime + 7999));
+		expectedOutput.add(new Watermark(6999));
+		expectedOutput.add(new Watermark(7999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.close();
+		if (windowBufferFactory instanceof PreAggregatingHeapWindowBuffer.Factory) {
+			Assert.assertEquals("Close was not called.", 2, closeCalled.get());
+		} else {
+			Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+		}
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testContinuousWatermarkTrigger() throws Exception {
+		closeCalled.set(0);
+
+		final int WINDOW_SIZE = 3000;
+
+		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>(
+				GlobalWindows.create(),
+				windowBufferFactory,
+				new ReduceWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
+				ContinuousWatermarkTrigger.of(WINDOW_SIZE));
+
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		// The global window actually ignores these timestamps...
+
+		// add elements out-of-order
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
+
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+
+		testHarness.processWatermark(new Watermark(initialTime + 1000));
+		expectedOutput.add(new Watermark(1000));
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+
+		testHarness.processWatermark(new Watermark(initialTime + 2000));
+		expectedOutput.add(new Watermark(2000));
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processWatermark(new Watermark(initialTime + 3000));
+		expectedOutput.add(new Watermark(3000));
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processWatermark(new Watermark(initialTime + 4000));
+		expectedOutput.add(new Watermark(4000));
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processWatermark(new Watermark(initialTime + 5000));
+		expectedOutput.add(new Watermark(5000));
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.processWatermark(new Watermark(initialTime + 6000));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), Long.MAX_VALUE));
+		expectedOutput.add(new Watermark(6000));
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+
+		// those don't have any effect...
+		testHarness.processWatermark(new Watermark(initialTime + 7000));
+		testHarness.processWatermark(new Watermark(initialTime + 8000));
+		expectedOutput.add(new Watermark(7000));
+		expectedOutput.add(new Watermark(8000));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.close();
+		if (windowBufferFactory instanceof PreAggregatingHeapWindowBuffer.Factory) {
+			Assert.assertEquals("Close was not called.", 2, closeCalled.get());
+		} else {
+			Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+		}
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testCountTrigger() throws Exception {
+		closeCalled.set(0);
+
+		final int WINDOW_SIZE = 4;
+
+		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>(
+				GlobalWindows.create(),
+				windowBufferFactory,
+				new ReduceWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
+				PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)));
+
+		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse(
+				"Tuple2<String, Integer>"), new ExecutionConfig());
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		// The global window actually ignores these timestamps...
+
+		// add elements out-of-order
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+		testHarness.close();
+		if (windowBufferFactory instanceof PreAggregatingHeapWindowBuffer.Factory) {
+			Assert.assertEquals("Close was not called.", 2, closeCalled.get());
+		} else {
+			Assert.assertEquals("Close was not called.", 1, closeCalled.get());
+		}
+
+	}
+
+	// ------------------------------------------------------------------------
+	//  UDFs
+	// ------------------------------------------------------------------------
+
+	public static class SumReducer extends RichReduceFunction<Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		private boolean openCalled = false;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			openCalled = true;
+		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+			closeCalled.incrementAndGet();
+		}
+
+		@Override
+		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
+				Tuple2<String, Integer> value2) throws Exception {
+			if (!openCalled) {
+				Assert.fail("Open was not called");
+			}
+			return new Tuple2<>(value2.f0, value1.f1 + value2.f1);
+		}
+	}
+	// ------------------------------------------------------------------------
+	//  Parametrization for testing different window buffers
+	// ------------------------------------------------------------------------
+
+	@Parameterized.Parameters(name = "WindowBuffer = {0}")
+	@SuppressWarnings("unchecked,rawtypes")
+	public static Collection<WindowBufferFactory[]> windowBuffers(){
+		return Arrays.asList(new WindowBufferFactory[]{new PreAggregatingHeapWindowBuffer.Factory(new SumReducer())},
+				new WindowBufferFactory[]{new HeapWindowBuffer.Factory()}
+				);
+	}
+
+	@SuppressWarnings("unchecked")
+	private static class ResultSortComparator implements Comparator<Object> {
+		@Override
+		public int compare(Object o1, Object o2) {
+			if (o1 instanceof Watermark || o2 instanceof Watermark) {
+				return 0;
+			} else {
+				StreamRecord<Tuple2<String, Integer>> sr0 = (StreamRecord<Tuple2<String, Integer>>) o1;
+				StreamRecord<Tuple2<String, Integer>> sr1 = (StreamRecord<Tuple2<String, Integer>>) o2;
+				if (sr0.getTimestamp() != sr1.getTimestamp()) {
+					return (int) (sr0.getTimestamp() - sr1.getTimestamp());
+				}
+				int comparison = sr0.getValue().f0.compareTo(sr1.getValue().f0);
+				if (comparison != 0) {
+					return comparison;
+				} else {
+					return sr0.getValue().f1 - sr1.getValue().f1;
+				}
+			}
+		}
+	}
+
+	private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String getKey(Tuple2<String, Integer> value) throws Exception {
+			return value.f0;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/41717d12/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonParallelWindowDataStreamTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonParallelWindowDataStreamTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonParallelWindowDataStreamTranslationTest.java
new file mode 100644
index 0000000..4babee1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonParallelWindowDataStreamTranslationTest.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * These tests verify that the api calls on
+ * {@link org.apache.flink.streaming.api.datastream.KeyedWindowDataStream} instantiate
+ * the correct window operator.
+ */
+public class NonParallelWindowDataStreamTranslationTest extends StreamingMultipleProgramsTestBase {
+
+	/**
+	 * These tests ensure that the fast aligned time windows operator is used if the
+	 * conditions are right.
+	 *
+	 * TODO: update once fast aligned time windows operator is in
+	 */
+	@Ignore
+	@Test
+	public void testFastTimeWindows() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+				.reduceWindow(reducer);
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+		Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator);
+
+		DataStream<Tuple2<String, Integer>> window2 = source
+				.windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+				.mapWindow(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void evaluate(
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
+		Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator);
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testNonEvicting() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+				.trigger(CountTrigger.of(100))
+				.reduceWindow(reducer);
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+		Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
+		NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
+		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof CountTrigger);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
+		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
+
+		DataStream<Tuple2<String, Integer>> window2 = source
+				.windowAll(TumblingProcessingTimeWindows.of(1000))
+				.trigger(CountTrigger.of(100))
+				.mapWindow(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void evaluate(
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
+		Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
+		NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
+		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
+		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testEvicting() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+				.evictor(CountEvictor.of(100))
+				.reduceWindow(reducer);
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+		Assert.assertTrue(operator1 instanceof EvictingNonKeyedWindowOperator);
+		EvictingNonKeyedWindowOperator winOperator1 = (EvictingNonKeyedWindowOperator) operator1;
+		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
+		Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
+		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+
+		DataStream<Tuple2<String, Integer>> window2 = source
+				.windowAll(TumblingProcessingTimeWindows.of(1000))
+				.trigger(CountTrigger.of(100))
+				.evictor(TimeEvictor.of(100))
+				.mapWindow(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void evaluate(
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
+		Assert.assertTrue(operator2 instanceof EvictingNonKeyedWindowOperator);
+		EvictingNonKeyedWindowOperator winOperator2 = (EvictingNonKeyedWindowOperator) operator2;
+		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
+		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
+		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+	}
+
+	// ------------------------------------------------------------------------
+	//  UDFs
+	// ------------------------------------------------------------------------
+
+	public static class DummyReducer extends RichReduceFunction<Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
+			return value1;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/41717d12/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
index d0aa547..aaf21e0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -30,6 +31,7 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.concurrent.TimeUnit;
@@ -82,6 +84,49 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator);
 	}
 
+	/**
+	 * These tests ensure that the fast aligned time windows operator is used if the
+	 * conditions are right.
+	 *
+	 * TODO: update once the fast aligned time windows operator is in
+	 */
+	@Ignore
+	@Test
+	public void testNonParallelFastTimeWindows() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.timeWindowAll(Time.of(1000, TimeUnit.MILLISECONDS),
+						Time.of(100, TimeUnit.MILLISECONDS))
+				.reduceWindow(reducer);
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+		Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator);
+
+		DataStream<Tuple2<String, Integer>> window2 = source
+				.timeWindowAll(Time.of(1000, TimeUnit.MILLISECONDS))
+				.mapWindow(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void evaluate(
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
+		Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator);
+	}
+
 	// ------------------------------------------------------------------------
 	//  UDFs
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/41717d12/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index e7d9953..f53b986 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -62,7 +62,11 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
 
 
       // TypeHints are only needed for Java API, Scala API doesn't need them
-      "org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.returns"
+      "org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.returns",
+
+      // Deactivated until Scala API has new windowing API
+      "org.apache.flink.streaming.api.datastream.DataStream.timeWindowAll",
+      "org.apache.flink.streaming.api.datastream.DataStream.windowAll"
     )
     val excludedPatterns = Seq(
       // We don't have project on tuples in the Scala API