You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/09/28 18:15:00 UTC

[04/12] flink git commit: Move window operators and tests to windowing package

Move window operators and tests to windowing package

The API package is also called windowing; this change harmonizes the
package names.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05d2138f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05d2138f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05d2138f

Branch: refs/heads/master
Commit: 05d2138f081ff5fa274dab571b9327f96be693aa
Parents: a606c4a
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Sep 24 16:33:09 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 28 17:04:16 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/KeyedWindowDataStream.java   |   2 +-
 ...ractAlignedProcessingTimeWindowOperator.java | 223 +++++++
 .../windowing/AbstractKeyedTimePanes.java       |  76 +++
 .../windowing/AccumulatingKeyedTimePanes.java   | 126 ++++
 ...ccumulatingProcessingTimeWindowOperator.java |  48 ++
 .../windowing/AggregatingKeyedTimePanes.java    | 103 +++
 ...AggregatingProcessingTimeWindowOperator.java |  47 ++
 .../runtime/operators/windowing/KeyMap.java     | 651 +++++++++++++++++++
 .../operators/windowing/PolicyToOperator.java   |  82 +++
 .../operators/windowing/package-info.java       |  22 +
 ...ractAlignedProcessingTimeWindowOperator.java | 223 -------
 .../windows/AbstractKeyedTimePanes.java         |  76 ---
 .../windows/AccumulatingKeyedTimePanes.java     | 126 ----
 ...ccumulatingProcessingTimeWindowOperator.java |  48 --
 .../windows/AggregatingKeyedTimePanes.java      | 103 ---
 ...AggregatingProcessingTimeWindowOperator.java |  47 --
 .../runtime/operators/windows/KeyMap.java       | 651 -------------------
 .../operators/windows/PolicyToOperator.java     |  82 ---
 .../runtime/operators/windows/package-info.java |  22 -
 ...AlignedProcessingTimeWindowOperatorTest.java | 547 ++++++++++++++++
 ...AlignedProcessingTimeWindowOperatorTest.java | 550 ++++++++++++++++
 .../operators/windowing/CollectingOutput.java   |  80 +++
 .../windowing/KeyMapPutIfAbsentTest.java        | 121 ++++
 .../operators/windowing/KeyMapPutTest.java      | 136 ++++
 .../runtime/operators/windowing/KeyMapTest.java | 344 ++++++++++
 ...AlignedProcessingTimeWindowOperatorTest.java | 547 ----------------
 ...AlignedProcessingTimeWindowOperatorTest.java | 551 ----------------
 .../operators/windows/CollectingOutput.java     |  80 ---
 .../windows/KeyMapPutIfAbsentTest.java          | 121 ----
 .../operators/windows/KeyMapPutTest.java        | 136 ----
 .../runtime/operators/windows/KeyMapTest.java   | 344 ----------
 31 files changed, 3157 insertions(+), 3158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
index dfb7032..37151d7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
-import org.apache.flink.streaming.runtime.operators.windows.PolicyToOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.PolicyToOperator;
 
 /**
  * A KeyedWindowDataStream represents a data stream where elements are grouped by key, and 

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
new file mode 100644
index 0000000..6c4e53a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.commons.math3.util.ArithmeticUtils;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+
+public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT> extends AbstractStreamOperator<OUT> 
+		implements OneInputStreamOperator<IN, OUT>, Triggerable {
+	
+	private static final long serialVersionUID = 3245500864882459867L;
+	
+	private static final long MIN_SLIDE_TIME = 50;
+	
+	// ----- fields for operator parametrization -----
+	
+	private final Function function;
+	private final KeySelector<IN, KEY> keySelector;
+	
+	private final long windowSize;
+	private final long windowSlide;
+	private final long paneSize;
+	private final int numPanesPerWindow;
+	
+	// ----- fields for operator functionality -----
+	
+	private transient AbstractKeyedTimePanes<IN, KEY, ?, OUT> panes;
+	
+	private transient TimestampedCollector<OUT> out;
+	
+	private transient long nextEvaluationTime;
+	private transient long nextSlideTime;
+	
+	protected AbstractAlignedProcessingTimeWindowOperator(
+			Function function,
+			KeySelector<IN, KEY> keySelector,
+			long windowLength,
+			long windowSlide)
+	{
+		if (function == null || keySelector == null) {
+			throw new NullPointerException();
+		}
+		if (windowLength < MIN_SLIDE_TIME) {
+			throw new IllegalArgumentException("Window length must be at least " + MIN_SLIDE_TIME + " msecs");
+		}
+		if (windowSlide < MIN_SLIDE_TIME) {
+			throw new IllegalArgumentException("Window slide must be at least " + MIN_SLIDE_TIME + " msecs");
+		}
+		if (windowLength < windowSlide) {
+			throw new IllegalArgumentException("The window size must be larger than the window slide");
+		}
+		
+		final long paneSlide = ArithmeticUtils.gcd(windowLength, windowSlide);
+		if (paneSlide < MIN_SLIDE_TIME) {
+			throw new IllegalArgumentException(String.format(
+					"Cannot compute window of size %d msecs sliding by %d msecs. " +
+							"The unit of grouping is too small: %d msecs", windowLength, windowSlide, paneSlide));
+		}
+		
+		this.function = function;
+		this.keySelector = keySelector;
+		this.windowSize = windowLength;
+		this.windowSlide = windowSlide;
+		this.paneSize = paneSlide;
+		this.numPanesPerWindow = MathUtils.checkedDownCast(windowLength / paneSlide);
+	}
+	
+	
+	protected abstract AbstractKeyedTimePanes<IN, KEY, ?, OUT> createPanes(
+			KeySelector<IN, KEY> keySelector, Function function);
+
+	// ------------------------------------------------------------------------
+	//  startup and shutdown
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		out = new TimestampedCollector<>(output);
+		
+		// create the panes that gather the elements per slide
+		panes = createPanes(keySelector, function);
+		
+		// decide when to first compute the window and when to slide it
+		// the values should align with the start of time (that is, the UNIX epoch, not the big bang)
+		final long now = System.currentTimeMillis();
+		nextEvaluationTime = now + windowSlide - (now % windowSlide);
+		nextSlideTime = now + paneSize - (now % paneSize);
+		
+		getRuntimeContext().registerTimer(Math.min(nextEvaluationTime, nextSlideTime), this);
+	}
+
+	@Override
+	public void close() throws Exception {
+		final long finalWindowTimestamp = nextEvaluationTime;
+
+		// early stop the triggering thread, so it does not attempt to return any more data
+		stopTriggers();
+
+		// emit the remaining data
+		computeWindow(finalWindowTimestamp);
+	}
+
+	@Override
+	public void dispose() {
+		// fail-safe stop of the triggers (in case of an error); note that no lock is
+		// acquired in this method, stopTriggers() only resets the action timestamps
+		stopTriggers();
+
+		// release the panes
+		panes.dispose();
+	}
+	
+	private void stopTriggers() {
+		// reset the action timestamps. this makes sure any pending triggers will not evaluate
+		nextEvaluationTime = -1L;
+		nextSlideTime = -1L;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Receiving elements and triggers
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		panes.addElementToLatestPane(element.getValue());
+	}
+
+	@Override
+	public void processWatermark(Watermark mark) {
+		// this operator does not react to watermarks
+	}
+
+	@Override
+	public void trigger(long timestamp) throws Exception {
+		// first we check if we actually trigger the window function
+		if (timestamp == nextEvaluationTime) {
+			// compute and output the results
+			computeWindow(timestamp);
+
+			nextEvaluationTime += windowSlide;
+		}
+
+		// check if we slide the panes by one. this may happen in addition to the
+		// window computation, or just by itself
+		if (timestamp == nextSlideTime) {
+			panes.slidePanes(numPanesPerWindow);
+			nextSlideTime += paneSize;
+		}
+
+		long nextTriggerTime = Math.min(nextEvaluationTime, nextSlideTime);
+		getRuntimeContext().registerTimer(nextTriggerTime, this);
+	}
+	
+	private void computeWindow(long timestamp) throws Exception {
+		out.setTimestamp(timestamp);
+		panes.truncatePanes(numPanesPerWindow);
+		panes.evaluateWindow(out);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Property access (for testing)
+	// ------------------------------------------------------------------------
+
+	public long getWindowSize() {
+		return windowSize;
+	}
+
+	public long getWindowSlide() {
+		return windowSlide;
+	}
+
+	public long getPaneSize() {
+		return paneSize;
+	}
+	
+	public int getNumPanesPerWindow() {
+		return numPanesPerWindow;
+	}
+
+	public long getNextEvaluationTime() {
+		return nextEvaluationTime;
+	}
+
+	public long getNextSlideTime() {
+		return nextSlideTime;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public String toString() {
+		return "Window (processing time) (length=" + windowSize + ", slide=" + windowSlide + ')';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
new file mode 100644
index 0000000..fae024b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayDeque;
+
+
+public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
+	
+	protected KeyMap<Key, Aggregate> latestPane = new KeyMap<>();
+
+	protected final ArrayDeque<KeyMap<Key, Aggregate>> previousPanes = new ArrayDeque<>();
+
+	// ------------------------------------------------------------------------
+
+	public abstract void addElementToLatestPane(Type element) throws Exception;
+
+	public abstract void evaluateWindow(Collector<Result> out) throws Exception;
+	
+	
+	public void dispose() {
+		// since all is heap data, there is no need to clean up anything
+		latestPane = null;
+		previousPanes.clear();
+	}
+	
+	
+	public void slidePanes(int panesToKeep) {
+		if (panesToKeep > 1) {
+			// the current pane becomes the latest previous pane
+			previousPanes.addLast(latestPane);
+
+			// truncate the history
+			while (previousPanes.size() >= panesToKeep) {
+				previousPanes.removeFirst();
+			}
+		}
+
+		// we need a new latest pane
+		latestPane = new KeyMap<>();
+	}
+	
+	public void truncatePanes(int numToRetain) {
+		while (previousPanes.size() >= numToRetain) {
+			previousPanes.removeFirst();
+		}
+	}
+	
+	protected void traverseAllPanes(KeyMap.TraversalEvaluator<Key, Aggregate> traversal, long traversalPass) throws Exception{
+		// gather all panes in an array (faster iterations)
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		KeyMap<Key, Aggregate>[] panes = previousPanes.toArray(new KeyMap[previousPanes.size() + 1]);
+		panes[panes.length - 1] = latestPane;
+
+		// let the maps make a coordinated traversal and evaluate the window function per contained key
+		KeyMap.traverseMaps(panes, traversal, traversalPass);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
new file mode 100644
index 0000000..d85c53e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.util.UnionIterator;
+import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+
+
+public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyedTimePanes<Type, Key, ArrayList<Type>, Result> {
+	
+	private final KeySelector<Type, Key> keySelector;
+
+	private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory();
+
+	private final KeyedWindowFunction<Type, Result, Key> function;
+	
+	private long evaluationPass;
+
+	// ------------------------------------------------------------------------
+	
+	public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, KeyedWindowFunction<Type, Result, Key> function) {
+		this.keySelector = keySelector;
+		this.function = function;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void addElementToLatestPane(Type element) throws Exception {
+		Key k = keySelector.getKey(element);
+		ArrayList<Type> elements = latestPane.putIfAbsent(k, listFactory);
+		elements.add(element);
+	}
+
+	@Override
+	public void evaluateWindow(Collector<Result> out) throws Exception {
+		if (previousPanes.isEmpty()) {
+			// optimized path for single pane case (tumbling window)
+			for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) {
+				function.evaluate(entry.getKey(), entry.getValue(), out);
+			}
+		}
+		else {
+			// general code path for multi-pane case
+			WindowFunctionTraversal<Key, Type, Result> evaluator = new WindowFunctionTraversal<>(function, out);
+			traverseAllPanes(evaluator, evaluationPass);
+		}
+		
+		evaluationPass++;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Running a window function in a map traversal
+	// ------------------------------------------------------------------------
+	
+	static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
+
+		private final KeyedWindowFunction<Type, Result, Key> function;
+		
+		private final UnionIterator<Type> unionIterator;
+		
+		private final Collector<Result> out;
+		
+		private Key currentKey;
+
+		WindowFunctionTraversal(KeyedWindowFunction<Type, Result, Key> function, Collector<Result> out) {
+			this.function = function;
+			this.out = out;
+			this.unionIterator = new UnionIterator<>();
+		}
+
+
+		@Override
+		public void startNewKey(Key key) {
+			unionIterator.clear();
+			currentKey = key;
+		}
+
+		@Override
+		public void nextValue(ArrayList<Type> value) {
+			unionIterator.addList(value);
+		}
+
+		@Override
+		public void keyDone() throws Exception {
+			function.evaluate(currentKey, unionIterator, out);
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Lazy factory for lists (put if absent)
+	// ------------------------------------------------------------------------
+	
+	@SuppressWarnings("unchecked")
+	private static <V> KeyMap.LazyFactory<ArrayList<V>> getListFactory() {
+		return (KeyMap.LazyFactory<ArrayList<V>>) LIST_FACTORY;
+	}
+
+	private static final KeyMap.LazyFactory<?> LIST_FACTORY = new KeyMap.LazyFactory<ArrayList<?>>() {
+
+		@Override
+		public ArrayList<?> create() {
+			return new ArrayList<>(4);
+		}
+	};
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
new file mode 100644
index 0000000..4df308d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+
+
+public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> 
+		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>  {
+
+	private static final long serialVersionUID = 7305948082830843475L;
+
+	
+	public AccumulatingProcessingTimeWindowOperator(
+			KeyedWindowFunction<IN, OUT, KEY> function,
+			KeySelector<IN, KEY> keySelector,
+			long windowLength,
+			long windowSlide)
+	{
+		super(function, keySelector, windowLength, windowSlide);
+	}
+
+	@Override
+	protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
+		@SuppressWarnings("unchecked")
+		KeyedWindowFunction<IN, OUT, KEY> windowFunction = (KeyedWindowFunction<IN, OUT, KEY>) function;
+		
+		return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
new file mode 100644
index 0000000..48f4eb1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.util.Collector;
+
+
+public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes<Type, Key, Type, Type> {
+	
+	private final KeySelector<Type, Key> keySelector;
+	
+	private final ReduceFunction<Type> reducer;
+	
+	private long evaluationPass;
+
+	// ------------------------------------------------------------------------
+	
+	public AggregatingKeyedTimePanes(KeySelector<Type, Key> keySelector, ReduceFunction<Type> reducer) {
+		this.keySelector = keySelector;
+		this.reducer = reducer;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void addElementToLatestPane(Type element) throws Exception {
+		Key k = keySelector.getKey(element);
+		latestPane.putOrAggregate(k, element, reducer);
+	}
+
+	@Override
+	public void evaluateWindow(Collector<Type> out) throws Exception {
+		if (previousPanes.isEmpty()) {
+			// optimized path for single pane case
+			for (KeyMap.Entry<Key, Type> entry : latestPane) {
+				out.collect(entry.getValue());
+			}
+		}
+		else {
+			// general code path for multi-pane case
+			AggregatingTraversal<Key, Type> evaluator = new AggregatingTraversal<>(reducer, out);
+			traverseAllPanes(evaluator, evaluationPass);
+		}
+		
+		evaluationPass++;
+	}
+
+	// ------------------------------------------------------------------------
+	//  The maps traversal that performs the final aggregation
+	// ------------------------------------------------------------------------
+	
+	static final class AggregatingTraversal<Key, Type> implements KeyMap.TraversalEvaluator<Key, Type> {
+
+		private final ReduceFunction<Type> function;
+		
+		private final Collector<Type> out;
+		
+		private Type currentValue;
+
+		AggregatingTraversal(ReduceFunction<Type> function, Collector<Type> out) {
+			this.function = function;
+			this.out = out;
+		}
+
+		@Override
+		public void startNewKey(Key key) {
+			currentValue = null;
+		}
+
+		@Override
+		public void nextValue(Type value) throws Exception {
+			if (currentValue != null) {
+				currentValue = function.reduce(currentValue, value);
+			}
+			else {
+				currentValue = value;
+			}
+		}
+
+		@Override
+		public void keyDone() throws Exception {
+			out.collect(currentValue);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
new file mode 100644
index 0000000..99457bb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+
+public class AggregatingProcessingTimeWindowOperator<KEY, IN> 
+		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN>  {
+
+	private static final long serialVersionUID = 7305948082830843475L;
+
+	
+	public AggregatingProcessingTimeWindowOperator(
+			ReduceFunction<IN> function,
+			KeySelector<IN, KEY> keySelector,
+			long windowLength,
+			long windowSlide)
+	{
+		super(function, keySelector, windowLength, windowSlide);
+	}
+
+	@Override
+	protected AggregatingKeyedTimePanes<IN, KEY> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
+		@SuppressWarnings("unchecked")
+		ReduceFunction<IN> windowFunction = (ReduceFunction<IN>) function;
+		
+		return new AggregatingKeyedTimePanes<IN, KEY>(keySelector, windowFunction);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java
new file mode 100644
index 0000000..3f44c4a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java
@@ -0,0 +1,651 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.runtime.util.MathUtils;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * A special Hash Map implementation that can be traversed efficiently in sync with other
+ * hash maps.
+ * <p>
+ * The differences between this hash map and Java's "java.util.HashMap" are:
+ * <ul>
+ *     <li>A different hashing scheme. This implementation uses extensible hashing, meaning that
+ *         each hash table growth takes one more lower hash code bit into account, and values that were
+ *         formerly in the same bucket will afterwards be in the two adjacent buckets.</li>
+ *     <li>This allows an efficient traversal of multiple hash maps together, even though the maps are
+ *         of different sizes.</li>
+ *     <li>The map offers functions such as "putIfAbsent()" and "putOrAggregate()"</li>
+ *     <li>The map supports no removal/shrinking.</li>
+ * </ul>
+ */
+public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
+	
+	/** The minimum table capacity, 64 entries */
+	private static final int MIN_CAPACITY = 0x40;
+	
+	/** The maximum possible table capacity, the largest positive power of
+	 * two in the 32bit signed integer value range */
+	private static final int MAX_CAPACITY = 0x40000000;
+	
+	/** The number of bits used for table addressing when table is at max capacity */
+	private static final int FULL_BIT_RANGE = MathUtils.log2strict(MAX_CAPACITY);
+	
+	// ------------------------------------------------------------------------
+	
+	/** The hash index, as an array of entries */
+	private Entry<K, V>[] table;
+	
+	/** The number of bits by which the hash code is shifted right, to find the bucket */
+	private int shift;
+	
+	/** The number of elements in the hash table */
+	private int numElements;
+	
+	/** The number of elements above which the hash table needs to grow */
+	private int rehashThreshold;
+	
+	/** The base-2 logarithm of the table capacity */ 
+	private int log2size;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new hash table with the default initial capacity.
+	 */
+	public KeyMap() {
+		this(0);
+	}
+
+	/**
+	 * Creates a new table with a capacity tailored to the given expected number of elements.
+	 * 
+	 * @param expectedNumberOfElements The number of elements to tailor the capacity to.
+	 */
+	public KeyMap(int expectedNumberOfElements) {
+		if (expectedNumberOfElements < 0) {
+			throw new IllegalArgumentException("Invalid capacity: " + expectedNumberOfElements);
+		}
+		
+		// round up to the next power of two
+		// guard against too small capacity and integer overflows
+		int capacity = Integer.highestOneBit(expectedNumberOfElements) << 1;
+		capacity = capacity >= 0 ? Math.max(MIN_CAPACITY, capacity) : MAX_CAPACITY;
+
+		// this also acts as a sanity check
+		log2size = MathUtils.log2strict(capacity);
+		shift = FULL_BIT_RANGE - log2size;
+		table = allocateTable(capacity);
+		rehashThreshold = getRehashThreshold(capacity);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Gets and Puts
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Inserts the given value, mapped under the given key. If the table already contains a value for
+	 * the key, the value is replaced and returned. If no value is contained, yet, the function
+	 * returns null.
+	 * 
+	 * @param key The key to insert.
+	 * @param value The value to insert.
+	 * @return The previously mapped value for the key, or null, if no value was mapped for the key.
+	 * 
+	 * @throws java.lang.NullPointerException Thrown, if the key is null.
+	 */
+	public final V put(K key, V value) {
+		final int hash = hash(key);
+		final int slot = indexOf (hash);
+		
+		// search the chain from the slot
+		for (Entry<K, V> e = table[slot]; e != null; e = e.next) {
+			Object k;
+			if (e.hashCode == hash && ((k = e.key) == key || key.equals(k))) {
+				// found match
+				V old = e.value;
+				e.value = value;
+				return old;
+			}
+		}
+
+		// no match, insert a new value
+		insertNewEntry(hash, key, value, slot);
+		return null;
+	}
+
+	/**
+	 * Inserts a value for the given key, if no value is yet contained for that key. Otherwise,
+	 * returns the value currently contained for the key.
+	 * <p>
+	 * The value that is inserted in case that the key is not contained, yet, is lazily created
+	 * using the given factory.
+	 *
+	 * @param key The key to insert.
+	 * @param factory The factory that produces the value, if no value is contained, yet, for the key.
+	 * @return The value in the map after this operation (either the previously contained value, or the
+	 *         newly created value).
+	 * 
+	 * @throws java.lang.NullPointerException Thrown, if the key is null.
+	 */
+	public final V putIfAbsent(K key, LazyFactory<V> factory) {
+		final int hash = hash(key);
+		final int slot = indexOf(hash);
+
+		// search the chain from the slot
+		for (Entry<K, V> entry = table[slot]; entry != null; entry = entry.next) {
+			if (entry.hashCode == hash && entry.key.equals(key)) {
+				// found match
+				return entry.value;
+			}
+		}
+
+		// no match, insert a new value
+		V value = factory.create();
+		insertNewEntry(hash, key, value, slot);
+
+		// return the created value
+		return value;
+	}
+
+	/**
+	 * Inserts or aggregates a value into the hash map. If the hash map does not yet contain the key,
+	 * this method inserts the value. If the table already contains the key (and a value) this
+	 * method will use the given ReduceFunction to combine the existing value and the
+	 * given value to a new value, and store that value for the key. 
+	 * 
+	 * @param key The key to map the value.
+	 * @param value The new value to insert, or aggregate with the existing value.
+	 * @param aggregator The aggregator to use if a value is already contained.
+	 * 
+	 * @return The value in the map after this operation: Either the given value, or the aggregated value.
+	 * 
+	 * @throws java.lang.NullPointerException Thrown, if the key is null.
+	 * @throws Exception The method forwards exceptions from the aggregation function.
+	 */
+	public final V putOrAggregate(K key, V value, ReduceFunction<V> aggregator) throws Exception {
+		final int hash = hash(key);
+		final int slot = indexOf(hash);
+
+		// search the chain from the slot
+		for (Entry<K, V> entry = table[slot]; entry != null; entry = entry.next) {
+			if (entry.hashCode == hash && entry.key.equals(key)) {
+				// found match
+				entry.value = aggregator.reduce(entry.value, value);
+				return entry.value;
+			}
+		}
+
+		// no match, insert a new value
+		insertNewEntry(hash, key, value, slot);
+		// return the original value
+		return value;
+	}
+
+	/**
+	 * Looks up the value mapped under the given key. Returns null if no value is mapped under this key.
+	 * 
+	 * @param key The key to look up.
+	 * @return The value associated with the key, or null, if no value is found for the key.
+	 * 
+	 * @throws java.lang.NullPointerException Thrown, if the key is null.
+	 */
+	public V get(K key) {
+		final int hash = hash(key);
+		final int slot = indexOf(hash);
+		
+		// search the chain from the slot
+		for (Entry<K, V> entry = table[slot]; entry != null; entry = entry.next) {
+			if (entry.hashCode == hash && entry.key.equals(key)) {
+				return entry.value;
+			}
+		}
+		
+		// not found
+		return null;
+	}
+
+	private void insertNewEntry(int hashCode, K key, V value, int position) {
+		Entry<K,V> e = table[position];
+		table[position] = new Entry<>(key, value, hashCode, e);
+		numElements++;
+
+		// rehash if necessary
+		if (numElements > rehashThreshold) {
+			growTable();
+		}
+	}
+	
+	private int indexOf(int hashCode) {
+		return (hashCode >> shift) & (table.length - 1);
+	}
+
+	/**
+	 * Creates an iterator over the entries of this map.
+	 * 
+	 * @return An iterator over the entries of this map.
+	 */
+	@Override
+	public Iterator<Entry<K, V>> iterator() {
+		return new Iterator<Entry<K, V>>() {
+			
+			private final Entry<K, V>[] tab = KeyMap.this.table;
+			
+			private Entry<K, V> nextEntry;
+			
+			private int nextPos = 0;
+			
+			@Override
+			public boolean hasNext() {
+				if (nextEntry != null) {
+					return true;
+				}
+				else {
+					while (nextPos < tab.length) {
+						Entry<K, V> e = tab[nextPos++];
+						if (e != null) {
+							nextEntry = e;
+							return true;
+						}
+					}
+					return false;
+				}
+			}
+
+			@Override
+			public Entry<K, V> next() {
+				if (nextEntry != null || hasNext()) {
+					Entry<K, V> e = nextEntry;
+					nextEntry = nextEntry.next;
+					return e;
+				}
+				else {
+					throw new NoSuchElementException();
+				}
+			}
+
+			@Override
+			public void remove() {
+				throw new UnsupportedOperationException();
+			}
+		};
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Properties
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Gets the number of elements currently in the map.
+	 * @return The number of elements currently in the map.
+	 */
+	public int size() {
+		return numElements;
+	}
+
+	/**
+	 * Checks whether the map is empty.
+	 * @return True, if the map is empty, false otherwise.
+	 */
+	public boolean isEmpty() {
+		return numElements == 0;
+	}
+
+	/**
+	 * Gets the current table capacity, i.e., the number of slots in the hash table, without
+	 * any overflow chaining.
+	 * @return The number of slots in the hash table.
+	 */
+	public int getCurrentTableCapacity() {
+		return table.length;
+	}
+
+	/**
+	 * Gets the base-2 logarithm of the hash table capacity, as returned by
+	 * {@link #getCurrentTableCapacity()}.
+	 * 
+	 * @return The base-2 logarithm of the hash table capacity.
+	 */
+	public int getLog2TableCapacity() {
+		return log2size;
+	}
+	
+	public int getRehashThreshold() {
+		return rehashThreshold;
+	}
+	
+	public int getShift() {
+		return shift;
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+	
+	@SuppressWarnings("unchecked")
+	private Entry<K, V>[] allocateTable(int numElements) {
+		return (Entry<K, V>[]) new Entry<?, ?>[numElements];
+	}
+	
+	private void growTable() {
+		final int newSize = table.length << 1;
+				
+		// only grow if there is still space to grow the table
+		if (newSize > 0) {
+			final Entry<K, V>[] oldTable = this.table;
+			final Entry<K, V>[] newTable = allocateTable(newSize);
+
+			final int newShift = shift - 1;
+			final int newMask = newSize - 1;
+			
+			// go over all slots from the table. since we hash to adjacent positions in
+			// the new hash table, this is actually cache efficient
+			for (Entry<K, V> entry : oldTable) {
+				// traverse the chain for each slot
+				while (entry != null) {
+					final int newPos = (entry.hashCode >> newShift) & newMask;
+					Entry<K, V> nextEntry = entry.next;
+					entry.next = newTable[newPos];
+					newTable[newPos] = entry;
+					entry = nextEntry;
+				}
+			}
+			
+			this.table = newTable;
+			this.shift = newShift;
+			this.rehashThreshold = getRehashThreshold(newSize);
+			this.log2size += 1;
+		}
+	}
+	
+	private static int hash(Object key) {
+		int code = key.hashCode();
+		
+		// we need a strong hash function that generates diverse upper bits
+		// this hash function is more expensive than the "scramble" used by "java.util.HashMap",
+		// but required for this sort of hash table
+		code = (code + 0x7ed55d16) + (code << 12);
+		code = (code ^ 0xc761c23c) ^ (code >>> 19);
+		code = (code + 0x165667b1) + (code << 5);
+		code = (code + 0xd3a2646c) ^ (code << 9);
+		code = (code + 0xfd7046c5) + (code << 3);
+		return (code ^ 0xb55a4f09) ^ (code >>> 16);
+	}
+	
+	private static int getRehashThreshold(int capacity) {
+		// divide before multiply, to avoid overflow
+		return capacity / 4 * 3;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Testing Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * For testing only: Actively counts the number of entries, rather than using the
+	 * counter variable. This method has linear complexity, rather than constant.
+	 * 
+	 * @return The counted number of entries.
+	 */
+	int traverseAndCountElements() {
+		int num = 0;
+		
+		for (Entry<?, ?> entry : table) {
+			while (entry != null) {
+				num++;
+				entry = entry.next;
+			}
+		}
+		
+		return num;
+	}
+
+	/**
+	 * For testing only: Gets the length of the longest overflow chain.
+	 * This method has linear complexity.
+	 * 
+	 * @return The length of the longest overflow chain.
+	 */
+	int getLongestChainLength() {
+		int maxLen = 0;
+
+		for (Entry<?, ?> entry : table) {
+			int thisLen = 0;
+			while (entry != null) {
+				thisLen++;
+				entry = entry.next;
+			}
+			maxLen = Math.max(maxLen, thisLen);
+		}
+
+		return maxLen;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * An entry in the hash table.
+	 * 
+	 * @param <K> Type of the key.
+	 * @param <V> Type of the value.
+	 */
+	public static final class Entry<K, V> {
+		
+		final K key;
+		final int hashCode;
+		
+		V value;
+		Entry<K, V> next;
+		long touchedTag;
+
+		Entry(K key, V value, int hashCode, Entry<K, V> next) {
+			this.key = key;
+			this.value = value;
+			this.next = next;
+			this.hashCode = hashCode;
+		}
+
+		public K getKey() {
+			return key;
+		}
+
+		public V getValue() {
+			return value;
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Performs a traversal over the logical multi-map that results from the union of the
+	 * given maps. This method does not actually build a union of the map, but traverses the hash maps
+	 * together.
+	 * 
+	 * @param maps The array of maps whose union should be traversed.
+	 * @param visitor The visitor that is called for each key and all values.
+	 * @param touchedTag A tag that is used to mark elements that have been touched in this specific
+	 *                   traversal. Each successive traversal should supply a larger value for this
+	 *                   tag than the previous one.
+	 * 
+	 * @param <K> The type of the map's key.
+	 * @param <V> The type of the map's value.
+	 */
+	public static <K, V> void traverseMaps(
+					final KeyMap<K, V>[] maps,
+					final TraversalEvaluator<K, V> visitor,
+					final long touchedTag)
+		throws Exception
+	{
+		// we need to work on the maps in descending size
+		Arrays.sort(maps, CapacityDescendingComparator.INSTANCE);
+		
+		final int[] shifts = new int[maps.length];
+		final int[] lowBitsMask = new int[maps.length];
+		final int numSlots = maps[0].table.length;
+		final int numTables = maps.length;
+		
+		// figure out how much each hash table collapses the entries
+		for (int i = 0; i < numTables; i++) {
+			shifts[i] = maps[0].log2size - maps[i].log2size;
+			lowBitsMask[i] = (1 << shifts[i]) - 1;
+		}
+		
+		// go over all slots (based on the largest hash table)
+		for (int pos = 0; pos < numSlots; pos++) {
+			
+			// for each slot, go over all tables, until the table does not have that slot any more
+			// for tables where multiple slots collapse into one, we visit that one when we process the
+			// latest of all slots that collapse to that one
+			int mask;
+			for (int rootTable = 0;
+					rootTable < numTables && ((mask = lowBitsMask[rootTable]) & pos) == mask;
+					rootTable++)
+			{
+				// use that table to gather keys and start collecting keys from the following tables
+				// go over all entries of that slot in the table
+				Entry<K, V> entry = maps[rootTable].table[pos >> shifts[rootTable]];
+				while (entry != null) {
+					// take only entries that have not been collected as part of other tables
+					if (entry.touchedTag < touchedTag) {
+						entry.touchedTag = touchedTag;
+						
+						final K key = entry.key;
+						final int hashCode = entry.hashCode;
+						visitor.startNewKey(key);
+						visitor.nextValue(entry.value);
+						
+						addEntriesFromChain(entry.next, visitor, key, touchedTag, hashCode);
+						
+						// go over the other hash tables and collect their entries for the key
+						for (int followupTable = rootTable + 1; followupTable < numTables; followupTable++) {
+							Entry<K, V> followupEntry = maps[followupTable].table[pos >> shifts[followupTable]];
+							if (followupEntry != null) {
+								addEntriesFromChain(followupEntry, visitor, key, touchedTag, hashCode);
+							}
+						}
+
+						visitor.keyDone();
+					}
+					
+					entry = entry.next;
+				}
+			}
+		}
+	}
+	
+	private static <K, V> void addEntriesFromChain(
+			Entry<K, V> entry,
+			TraversalEvaluator<K, V> visitor,
+			K key,
+			long touchedTag,
+			int hashCode) throws Exception
+	{
+		while (entry != null) {
+			if (entry.touchedTag < touchedTag && entry.hashCode == hashCode && entry.key.equals(key)) {
+				entry.touchedTag = touchedTag;
+				visitor.nextValue(entry.value);
+			}
+			entry = entry.next;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Comparator that defines a descending order on maps depending on their table capacity
+	 * and number of elements.
+	 */
+	static final class CapacityDescendingComparator implements Comparator<KeyMap<?, ?>> {
+		
+		static final CapacityDescendingComparator INSTANCE = new CapacityDescendingComparator();
+		
+		private CapacityDescendingComparator() {}
+
+
+		@Override
+		public int compare(KeyMap<?, ?> o1, KeyMap<?, ?> o2) {
+			// this sorts descending
+			int cmp = o2.getLog2TableCapacity() - o1.getLog2TableCapacity();
+			if (cmp != 0) {
+				return cmp;
+			}
+			else {
+				return o2.size() - o1.size();
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A factory for lazy/on-demand instantiation of values.
+	 *
+	 * @param <V> The type created by the factory.
+	 */
+	public static interface LazyFactory<V> {
+
+		/**
+		 * The factory method; creates the value.
+		 * @return The value.
+		 */
+		V create();
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A visitor for a traversal over the union of multiple hash maps. The visitor is
+	 * called for each key in the union of the maps and all values associated with that key
+	 * (one per map, but multiple across maps). 
+	 * 
+	 * @param <K> The type of the key.
+	 * @param <V> The type of the value.
+	 */
+	public static interface TraversalEvaluator<K, V> {
+
+		/**
+		 * Called whenever the traversal starts with a new key.
+		 * 
+		 * @param key The key traversed.
+		 * @throws Exception Method forwards all exceptions.
+		 */
+		void startNewKey(K key) throws Exception;
+
+		/**
+		 * Called for each value found for the current key.
+		 * 
+		 * @param value The next value.
+		 * @throws Exception Method forwards all exceptions.
+		 */
+		void nextValue(V value) throws Exception;
+
+		/**
+		 * Called when the traversal for the current key is complete.
+		 * 
+		 * @throws Exception Method forwards all exceptions.
+		 */
+		void keyDone() throws Exception;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
new file mode 100644
index 0000000..b34d0bc
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.windowpolicy.EventTime;
+import org.apache.flink.streaming.api.windowing.windowpolicy.ProcessingTime;
+import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+
+/**
+ * This class implements the conversion from window policies to concrete operator
+ * implementations.
+ */
+public class PolicyToOperator {
+
+	/**
+	 * Entry point to create an operator for the given window policies and the window function.
+	 */
+	public static <IN, OUT, KEY> OneInputStreamOperator<IN, OUT> createOperatorForPolicies(
+			WindowPolicy window, WindowPolicy slide, Function function, KeySelector<IN, KEY> keySelector)
+	{
+		if (window == null || function == null) {
+			throw new NullPointerException();
+		}
+		
+		// -- case 1: both policies are processing time policies
+		if (window instanceof ProcessingTime && (slide == null || slide instanceof ProcessingTime)) {
+			final long windowLength = ((ProcessingTime) window).toMilliseconds();
+			final long windowSlide = slide == null ? windowLength : ((ProcessingTime) slide).toMilliseconds();
+			
+			if (function instanceof ReduceFunction) {
+				@SuppressWarnings("unchecked")
+				ReduceFunction<IN> reducer = (ReduceFunction<IN>) function;
+
+				@SuppressWarnings("unchecked")
+				OneInputStreamOperator<IN, OUT> op = (OneInputStreamOperator<IN, OUT>)
+						new AggregatingProcessingTimeWindowOperator<KEY, IN>(
+								reducer, keySelector, windowLength, windowSlide);
+				return op;
+			}
+			else if (function instanceof KeyedWindowFunction) {
+				@SuppressWarnings("unchecked")
+				KeyedWindowFunction<IN, OUT, KEY> wf = (KeyedWindowFunction<IN, OUT, KEY>) function;
+
+				return new AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>(
+								wf, keySelector, windowLength, windowSlide);
+			}
+		}
+
+		// -- case 2: both policies are event time policies
+		if (window instanceof EventTime && (slide == null || slide instanceof EventTime)) {
+			// add event time implementation
+		}
+		
+		throw new UnsupportedOperationException("The windowing mechanism does not yet support " + window.toString(slide));
+	}
+	
+	// ------------------------------------------------------------------------
+	
+	/** Don't instantiate */
+	private PolicyToOperator() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java
new file mode 100644
index 0000000..55749a1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the operators that implement the various window operations
+ * on data streams. 
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java
deleted file mode 100644
index 2e926bc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractAlignedProcessingTimeWindowOperator.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.apache.commons.math3.util.ArithmeticUtils;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.util.MathUtils;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-
-public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT> extends AbstractStreamOperator<OUT> 
-		implements OneInputStreamOperator<IN, OUT>, Triggerable {
-	
-	private static final long serialVersionUID = 3245500864882459867L;
-	
-	private static final long MIN_SLIDE_TIME = 50;
-	
-	// ----- fields for operator parametrization -----
-	
-	private final Function function;
-	private final KeySelector<IN, KEY> keySelector;
-	
-	private final long windowSize;
-	private final long windowSlide;
-	private final long paneSize;
-	private final int numPanesPerWindow;
-	
-	// ----- fields for operator functionality -----
-	
-	private transient AbstractKeyedTimePanes<IN, KEY, ?, OUT> panes;
-	
-	private transient TimestampedCollector<OUT> out;
-	
-	private transient long nextEvaluationTime;
-	private transient long nextSlideTime;
-	
-	protected AbstractAlignedProcessingTimeWindowOperator(
-			Function function,
-			KeySelector<IN, KEY> keySelector,
-			long windowLength,
-			long windowSlide)
-	{
-		if (function == null || keySelector == null) {
-			throw new NullPointerException();
-		}
-		if (windowLength < MIN_SLIDE_TIME) {
-			throw new IllegalArgumentException("Window length must be at least " + MIN_SLIDE_TIME + " msecs");
-		}
-		if (windowSlide < MIN_SLIDE_TIME) {
-			throw new IllegalArgumentException("Window slide must be at least " + MIN_SLIDE_TIME + " msecs");
-		}
-		if (windowLength < windowSlide) {
-			throw new IllegalArgumentException("The window size must be larger than the window slide");
-		}
-		
-		final long paneSlide = ArithmeticUtils.gcd(windowLength, windowSlide);
-		if (paneSlide < MIN_SLIDE_TIME) {
-			throw new IllegalArgumentException(String.format(
-					"Cannot compute window of size %d msecs sliding by %d msecs. " +
-							"The unit of grouping is too small: %d msecs", windowLength, windowSlide, paneSlide));
-		}
-		
-		this.function = function;
-		this.keySelector = keySelector;
-		this.windowSize = windowLength;
-		this.windowSlide = windowSlide;
-		this.paneSize = paneSlide;
-		this.numPanesPerWindow = MathUtils.checkedDownCast(windowLength / paneSlide);
-	}
-	
-	
-	protected abstract AbstractKeyedTimePanes<IN, KEY, ?, OUT> createPanes(
-			KeySelector<IN, KEY> keySelector, Function function);
-
-	// ------------------------------------------------------------------------
-	//  startup and shutdown
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		out = new TimestampedCollector<>(output);
-		
-		// create the panes that gather the elements per slide
-		panes = createPanes(keySelector, function);
-		
-		// decide when to first compute the window and when to slide it
-		// the values should align with the start of time (that is, the UNIX epoch, not the big bang)
-		final long now = System.currentTimeMillis();
-		nextEvaluationTime = now + windowSlide - (now % windowSlide);
-		nextSlideTime = now + paneSize - (now % paneSize);
-		
-		getRuntimeContext().registerTimer(Math.min(nextEvaluationTime, nextSlideTime), this);
-	}
-
-	@Override
-	public void close() throws Exception {
-		final long finalWindowTimestamp = nextEvaluationTime;
-
-		// early stop the triggering thread, so it does not attempt to return any more data
-		stopTriggers();
-
-		// emit the remaining data
-		computeWindow(finalWindowTimestamp);
-	}
-
-	@Override
-	public void dispose() {
-		// acquire the lock during shutdown, to prevent trigger calls at the same time
-		// fail-safe stop of the triggering thread (in case of an error)
-		stopTriggers();
-
-		// release the panes
-		panes.dispose();
-	}
-	
-	private void stopTriggers() {
-		// reset the action timestamps. this makes sure any pending triggers will not evaluate
-		nextEvaluationTime = -1L;
-		nextSlideTime = -1L;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Receiving elements and triggers
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public void processElement(StreamRecord<IN> element) throws Exception {
-		panes.addElementToLatestPane(element.getValue());
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) {
-		// this operator does not react to watermarks
-	}
-
-	@Override
-	public void trigger(long timestamp) throws Exception {
-		// first we check if we actually trigger the window function
-		if (timestamp == nextEvaluationTime) {
-			// compute and output the results
-			computeWindow(timestamp);
-
-			nextEvaluationTime += windowSlide;
-		}
-
-		// check if we slide the panes by one. this may happen in addition to the
-		// window computation, or just by itself
-		if (timestamp == nextSlideTime) {
-			panes.slidePanes(numPanesPerWindow);
-			nextSlideTime += paneSize;
-		}
-
-		long nextTriggerTime = Math.min(nextEvaluationTime, nextSlideTime);
-		getRuntimeContext().registerTimer(nextTriggerTime, this);
-	}
-	
-	private void computeWindow(long timestamp) throws Exception {
-		out.setTimestamp(timestamp);
-		panes.truncatePanes(numPanesPerWindow);
-		panes.evaluateWindow(out);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Property access (for testing)
-	// ------------------------------------------------------------------------
-
-	public long getWindowSize() {
-		return windowSize;
-	}
-
-	public long getWindowSlide() {
-		return windowSlide;
-	}
-
-	public long getPaneSize() {
-		return paneSize;
-	}
-	
-	public int getNumPanesPerWindow() {
-		return numPanesPerWindow;
-	}
-
-	public long getNextEvaluationTime() {
-		return nextEvaluationTime;
-	}
-
-	public long getNextSlideTime() {
-		return nextSlideTime;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public String toString() {
-		return "Window (processing time) (length=" + windowSize + ", slide=" + windowSlide + ')';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractKeyedTimePanes.java
deleted file mode 100644
index a49b2e6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AbstractKeyedTimePanes.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayDeque;
-
-
-public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
-	
-	protected KeyMap<Key, Aggregate> latestPane = new KeyMap<>();
-
-	protected final ArrayDeque<KeyMap<Key, Aggregate>> previousPanes = new ArrayDeque<>();
-
-	// ------------------------------------------------------------------------
-
-	public abstract void addElementToLatestPane(Type element) throws Exception;
-
-	public abstract void evaluateWindow(Collector<Result> out) throws Exception;
-	
-	
-	public void dispose() {
-		// since all is heap data, there is no need to clean up anything
-		latestPane = null;
-		previousPanes.clear();
-	}
-	
-	
-	public void slidePanes(int panesToKeep) {
-		if (panesToKeep > 1) {
-			// the current pane becomes the latest previous pane
-			previousPanes.addLast(latestPane);
-
-			// truncate the history
-			while (previousPanes.size() >= panesToKeep) {
-				previousPanes.removeFirst();
-			}
-		}
-
-		// we need a new latest pane
-		latestPane = new KeyMap<>();
-	}
-	
-	public void truncatePanes(int numToRetain) {
-		while (previousPanes.size() >= numToRetain) {
-			previousPanes.removeFirst();
-		}
-	}
-	
-	protected void traverseAllPanes(KeyMap.TraversalEvaluator<Key, Aggregate> traversal, long traversalPass) throws Exception{
-		// gather all panes in an array (faster iterations)
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		KeyMap<Key, Aggregate>[] panes = previousPanes.toArray(new KeyMap[previousPanes.size() + 1]);
-		panes[panes.length - 1] = latestPane;
-
-		// let the maps make a coordinated traversal and evaluate the window function per contained key
-		KeyMap.traverseMaps(panes, traversal, traversalPass);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
deleted file mode 100644
index 1212123..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.util.UnionIterator;
-import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-
-
-public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyedTimePanes<Type, Key, ArrayList<Type>, Result> {
-	
-	private final KeySelector<Type, Key> keySelector;
-
-	private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory();
-
-	private final KeyedWindowFunction<Type, Result, Key> function;
-	
-	private long evaluationPass;
-
-	// ------------------------------------------------------------------------
-	
-	public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, KeyedWindowFunction<Type, Result, Key> function) {
-		this.keySelector = keySelector;
-		this.function = function;
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void addElementToLatestPane(Type element) throws Exception {
-		Key k = keySelector.getKey(element);
-		ArrayList<Type> elements = latestPane.putIfAbsent(k, listFactory);
-		elements.add(element);
-	}
-
-	@Override
-	public void evaluateWindow(Collector<Result> out) throws Exception {
-		if (previousPanes.isEmpty()) {
-			// optimized path for single pane case (tumbling window)
-			for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) {
-				function.evaluate(entry.getKey(), entry.getValue(), out);
-			}
-		}
-		else {
-			// general code path for multi-pane case
-			WindowFunctionTraversal<Key, Type, Result> evaluator = new WindowFunctionTraversal<>(function, out);
-			traverseAllPanes(evaluator, evaluationPass);
-		}
-		
-		evaluationPass++;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Running a window function in a map traversal
-	// ------------------------------------------------------------------------
-	
-	static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
-
-		private final KeyedWindowFunction<Type, Result, Key> function;
-		
-		private final UnionIterator<Type> unionIterator;
-		
-		private final Collector<Result> out;
-		
-		private Key currentKey;
-
-		WindowFunctionTraversal(KeyedWindowFunction<Type, Result, Key> function, Collector<Result> out) {
-			this.function = function;
-			this.out = out;
-			this.unionIterator = new UnionIterator<>();
-		}
-
-
-		@Override
-		public void startNewKey(Key key) {
-			unionIterator.clear();
-			currentKey = key;
-		}
-
-		@Override
-		public void nextValue(ArrayList<Type> value) {
-			unionIterator.addList(value);
-		}
-
-		@Override
-		public void keyDone() throws Exception {
-			function.evaluate(currentKey, unionIterator, out);
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Lazy factory for lists (put if absent)
-	// ------------------------------------------------------------------------
-	
-	@SuppressWarnings("unchecked")
-	private static <V> KeyMap.LazyFactory<ArrayList<V>> getListFactory() {
-		return (KeyMap.LazyFactory<ArrayList<V>>) LIST_FACTORY;
-	}
-
-	private static final KeyMap.LazyFactory<?> LIST_FACTORY = new KeyMap.LazyFactory<ArrayList<?>>() {
-
-		@Override
-		public ArrayList<?> create() {
-			return new ArrayList<>(4);
-		}
-	};
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
deleted file mode 100644
index fb9d163..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
-
-
-public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> 
-		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>  {
-
-	private static final long serialVersionUID = 7305948082830843475L;
-
-	
-	public AccumulatingProcessingTimeWindowOperator(
-			KeyedWindowFunction<IN, OUT, KEY> function,
-			KeySelector<IN, KEY> keySelector,
-			long windowLength,
-			long windowSlide)
-	{
-		super(function, keySelector, windowLength, windowSlide);
-	}
-
-	@Override
-	protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
-		@SuppressWarnings("unchecked")
-		KeyedWindowFunction<IN, OUT, KEY> windowFunction = (KeyedWindowFunction<IN, OUT, KEY>) function;
-		
-		return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingKeyedTimePanes.java
deleted file mode 100644
index 730c984..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingKeyedTimePanes.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.util.Collector;
-
-
-public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes<Type, Key, Type, Type> {
-	
-	private final KeySelector<Type, Key> keySelector;
-	
-	private final ReduceFunction<Type> reducer;
-	
-	private long evaluationPass;
-
-	// ------------------------------------------------------------------------
-	
-	public AggregatingKeyedTimePanes(KeySelector<Type, Key> keySelector, ReduceFunction<Type> reducer) {
-		this.keySelector = keySelector;
-		this.reducer = reducer;
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void addElementToLatestPane(Type element) throws Exception {
-		Key k = keySelector.getKey(element);
-		latestPane.putOrAggregate(k, element, reducer);
-	}
-
-	@Override
-	public void evaluateWindow(Collector<Type> out) throws Exception {
-		if (previousPanes.isEmpty()) {
-			// optimized path for single pane case
-			for (KeyMap.Entry<Key, Type> entry : latestPane) {
-				out.collect(entry.getValue());
-			}
-		}
-		else {
-			// general code path for multi-pane case
-			AggregatingTraversal<Key, Type> evaluator = new AggregatingTraversal<>(reducer, out);
-			traverseAllPanes(evaluator, evaluationPass);
-		}
-		
-		evaluationPass++;
-	}
-
-	// ------------------------------------------------------------------------
-	//  The maps traversal that performs the final aggregation
-	// ------------------------------------------------------------------------
-	
-	static final class AggregatingTraversal<Key, Type> implements KeyMap.TraversalEvaluator<Key, Type> {
-
-		private final ReduceFunction<Type> function;
-		
-		private final Collector<Type> out;
-		
-		private Type currentValue;
-
-		AggregatingTraversal(ReduceFunction<Type> function, Collector<Type> out) {
-			this.function = function;
-			this.out = out;
-		}
-
-		@Override
-		public void startNewKey(Key key) {
-			currentValue = null;
-		}
-
-		@Override
-		public void nextValue(Type value) throws Exception {
-			if (currentValue != null) {
-				currentValue = function.reduce(currentValue, value);
-			}
-			else {
-				currentValue = value;
-			}
-		}
-
-		@Override
-		public void keyDone() throws Exception {
-			out.collect(currentValue);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingProcessingTimeWindowOperator.java
deleted file mode 100644
index 8bed749..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingProcessingTimeWindowOperator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-
-public class AggregatingProcessingTimeWindowOperator<KEY, IN> 
-		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN>  {
-
-	private static final long serialVersionUID = 7305948082830843475L;
-
-	
-	public AggregatingProcessingTimeWindowOperator(
-			ReduceFunction<IN> function,
-			KeySelector<IN, KEY> keySelector,
-			long windowLength,
-			long windowSlide)
-	{
-		super(function, keySelector, windowLength, windowSlide);
-	}
-
-	@Override
-	protected AggregatingKeyedTimePanes<IN, KEY> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
-		@SuppressWarnings("unchecked")
-		ReduceFunction<IN> windowFunction = (ReduceFunction<IN>) function;
-		
-		return new AggregatingKeyedTimePanes<IN, KEY>(keySelector, windowFunction);
-	}
-}