You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:48 UTC

[32/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/StreamingOperatorMetrics.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/StreamingOperatorMetrics.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/StreamingOperatorMetrics.java
deleted file mode 100644
index 8429889..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/StreamingOperatorMetrics.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators;
-
-/**
- * Placeholder for metrics of streaming operators.
- *
- * <p>The only hook offered so far is named for counting discarded late elements;
- * its body is empty, so calling it currently has no effect.
- */
-public class StreamingOperatorMetrics {
-
-	/**
-	 * Hook for counting a discarded late element. The current implementation does nothing.
-	 */
-	public void incrementLateElementDiscarded() {
-		// intentionally empty: no metric is recorded here
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
deleted file mode 100644
index 50d1cb6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators;
-
-/**
- * Callback interface for objects that get invoked by the timer service which is available
- * to stream operators in {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment}.
- */
-public interface Triggerable {
-
-	/**
-	 * Called when a scheduled trigger fires.
-	 *
-	 * <p>The supplied timestamp is always the time for which the trigger was originally
-	 * scheduled, even when the call itself happens late (for example because the trigger
-	 * timer was blocked, or the JVM stalled due to a garbage collection).
-	 *
-	 * @param timestamp The timestamp for which the trigger event was scheduled.
-	 * @throws Exception Implementations may signal failures with any exception.
-	 */
-	void trigger(long timestamp) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/package-info.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/package-info.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/package-info.java
deleted file mode 100644
index 5fe6873..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains the operators that perform the stream transformations.
- * One or more operators are bundled into a "chain" and executed in a stream task. 
- */
-package org.apache.flink.streaming.runtime.operators;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
deleted file mode 100644
index 3165f88..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.commons.math3.util.ArithmeticUtils;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.util.MathUtils;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-
-import static java.util.Objects.requireNonNull;
-
-public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, STATE, F extends Function> 
-		extends AbstractUdfStreamOperator<OUT, F> 
-		implements OneInputStreamOperator<IN, OUT>, Triggerable {
-	
-	private static final long serialVersionUID = 3245500864882459867L;
-	
-	private static final long MIN_SLIDE_TIME = 50;
-	
-	// ----- fields for operator parametrization -----
-	
-	private final Function function;
-	private final KeySelector<IN, KEY> keySelector;
-	
-	private final TypeSerializer<KEY> keySerializer;
-	private final TypeSerializer<STATE> stateTypeSerializer;
-	
-	private final long windowSize;
-	private final long windowSlide;
-	private final long paneSize;
-	private final int numPanesPerWindow;
-	
-	// ----- fields for operator functionality -----
-	
-	private transient AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes;
-	
-	private transient TimestampedCollector<OUT> out;
-	
-	private transient RestoredState<IN, KEY, STATE, OUT> restoredState;
-	
-	private transient long nextEvaluationTime;
-	private transient long nextSlideTime;
-	
-	/**
-	 * Creates the operator and derives the pane layout from the window length and slide.
-	 *
-	 * <p>The pane size is the greatest common divisor of the window length and the window
-	 * slide; a window consists of {@code windowLength / paneSize} panes.
-	 *
-	 * @param function The user function; passed to the superclass and later to {@code createPanes}.
-	 * @param keySelector The selector that extracts the key from the elements.
-	 * @param keySerializer The serializer for the keys, used when checkpointing the panes.
-	 * @param stateTypeSerializer The serializer for the per-key pane state, used when checkpointing the panes.
-	 * @param windowLength The window length in milliseconds, at least {@code MIN_SLIDE_TIME}.
-	 * @param windowSlide The window slide in milliseconds, at least {@code MIN_SLIDE_TIME} and
-	 *                    at most the window length (equal values are accepted).
-	 * @throws IllegalArgumentException If the length or slide is too small, the slide exceeds the
-	 *                                  length, or the resulting pane size is below {@code MIN_SLIDE_TIME}.
-	 * @throws NullPointerException If the function, the key selector, or a serializer is null.
-	 */
-	protected AbstractAlignedProcessingTimeWindowOperator(
-			F function,
-			KeySelector<IN, KEY> keySelector,
-			TypeSerializer<KEY> keySerializer,
-			TypeSerializer<STATE> stateTypeSerializer,
-			long windowLength,
-			long windowSlide)
-	{
-		super(function);
-		
-		if (windowLength < MIN_SLIDE_TIME) {
-			throw new IllegalArgumentException("Window length must be at least " + MIN_SLIDE_TIME + " msecs");
-		}
-		if (windowSlide < MIN_SLIDE_TIME) {
-			throw new IllegalArgumentException("Window slide must be at least " + MIN_SLIDE_TIME + " msecs");
-		}
-		if (windowLength < windowSlide) {
-			// a window length equal to the slide is valid, so the message must not demand "larger"
-			throw new IllegalArgumentException("The window size must be at least as large as the window slide");
-		}
-		
-		// panes are the largest unit that evenly divides both the window length and the slide
-		final long paneSlide = ArithmeticUtils.gcd(windowLength, windowSlide);
-		if (paneSlide < MIN_SLIDE_TIME) {
-			throw new IllegalArgumentException(String.format(
-					"Cannot compute window of size %d msecs sliding by %d msecs. " +
-							"The unit of grouping is too small: %d msecs", windowLength, windowSlide, paneSlide));
-		}
-		
-		this.function = requireNonNull(function);
-		this.keySelector = requireNonNull(keySelector);
-		this.keySerializer = requireNonNull(keySerializer);
-		this.stateTypeSerializer = requireNonNull(stateTypeSerializer);
-		this.windowSize = windowLength;
-		this.windowSlide = windowSlide;
-		this.paneSize = paneSlide;
-		this.numPanesPerWindow = MathUtils.checkedDownCast(windowLength / paneSlide);
-	}
-	
-	
-	/**
-	 * Creates the pane data structure used by this operator.
-	 *
-	 * <p>Called from {@code open()} when no state was restored, and from
-	 * {@code restoreState(...)} to obtain the panes into which the checkpointed data is read.
-	 *
-	 * @param keySelector The key selector that was passed to the constructor.
-	 * @param function The user function that was passed to the constructor.
-	 * @return The panes that collect elements and evaluate windows.
-	 */
-	protected abstract AbstractKeyedTimePanes<IN, KEY, STATE, OUT> createPanes(
-			KeySelector<IN, KEY> keySelector, Function function);
-
-	// ------------------------------------------------------------------------
-	//  startup and shutdown
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Sets up the output collector and the panes, replays triggers that were missed while
-	 * restored state was not running, and registers the first timer.
-	 */
-	@Override
-	public void open() throws Exception {
-		super.open();
-
-		out = new TimestampedCollector<>(output);
-
-		// the first evaluation and the first slide are aligned with multiples of the slide
-		// and the pane size, counted from the UNIX epoch
-		final long currentTime = System.currentTimeMillis();
-		nextEvaluationTime = currentTime + windowSlide - (currentTime % windowSlide);
-		nextSlideTime = currentTime + paneSize - (currentTime % paneSize);
-
-		final long firstTriggerTime = Math.min(nextEvaluationTime, nextSlideTime);
-
-		if (restoredState != null) {
-			// continue with the panes that were read from the checkpoint
-			panes = restoredState.panes;
-
-			long pastEvaluationTime = restoredState.nextEvaluationTime;
-			long pastSlideTime = restoredState.nextSlideTime;
-			int panesLeftToReplay = panes.getNumPanes();
-
-			// replay the missed triggers until either all restored panes were slid out,
-			// or the missed trigger times have caught up with the present
-			for (long pastTriggerTime = Math.min(pastEvaluationTime, pastSlideTime);
-					panesLeftToReplay > 0 && pastTriggerTime < firstTriggerTime;
-					pastTriggerTime = Math.min(pastEvaluationTime, pastSlideTime))
-			{
-				// missed window evaluation
-				if (pastTriggerTime == pastEvaluationTime) {
-					computeWindow(pastTriggerTime);
-					pastEvaluationTime += windowSlide;
-				}
-
-				// missed pane slide
-				if (pastTriggerTime == pastSlideTime) {
-					panes.slidePanes(numPanesPerWindow);
-					panesLeftToReplay--;
-					pastSlideTime += paneSize;
-				}
-			}
-		}
-		else {
-			// nothing was restored: start with fresh, empty panes
-			panes = createPanes(keySelector, function);
-		}
-
-		// schedule the first trigger, which then keeps re-registering itself
-		registerTimer(firstTriggerTime, this);
-	}
-
-	/**
-	 * Closes the operator: disables further trigger actions and emits one last window
-	 * for the data that is still held in the panes.
-	 */
-	@Override
-	public void close() throws Exception {
-		super.close();
-
-		// remember the pending evaluation time before it is reset by stopTriggers()
-		final long lastWindowTimestamp = nextEvaluationTime;
-
-		// from here on, trigger timestamps no longer match the (reset) action times
-		stopTriggers();
-
-		// flush what is left in the panes
-		computeWindow(lastWindowTimestamp);
-	}
-
-	/**
-	 * Releases the operator's resources: resets the trigger times and disposes the panes.
-	 */
-	@Override
-	public void dispose() {
-		super.dispose();
-
-		// fail-safe: reset the action times again, in case close() was never reached
-		stopTriggers();
-
-		// open() may have failed before the panes were created
-		final AbstractKeyedTimePanes<IN, KEY, STATE, OUT> currentPanes = panes;
-		if (currentPanes == null) {
-			return;
-		}
-		currentPanes.dispose();
-	}
-	
-	/**
-	 * Resets both action timestamps to -1, so that the timestamp of a pending trigger
-	 * no longer matches the evaluation time or the slide time.
-	 */
-	private void stopTriggers() {
-		nextSlideTime = -1L;
-		nextEvaluationTime = -1L;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Receiving elements and triggers
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Adds the value of the incoming record to the latest pane.
-	 *
-	 * @param element The incoming stream record; only its value is used.
-	 */
-	@Override
-	public void processElement(StreamRecord<IN> element) throws Exception {
-		final IN value = element.getValue();
-		panes.addElementToLatestPane(value);
-	}
-
-	/**
-	 * Ignores the watermark; this method has no effect.
-	 *
-	 * @param mark The received watermark (unused).
-	 */
-	@Override
-	public void processWatermark(Watermark mark) {
-		// intentionally empty: watermarks are not used by this operator
-	}
-
-	/**
-	 * Handles a timer callback: evaluates the window and/or slides the panes if the timestamp
-	 * matches the respective scheduled time, then registers the timer for the next action.
-	 *
-	 * <p>NOTE(review): after {@code stopTriggers()} both action times are -1, so a late call
-	 * matches neither branch but still registers a timer for time -1 - confirm that the
-	 * timer service tolerates this.
-	 *
-	 * @param timestamp The timestamp for which this trigger was scheduled.
-	 */
-	@Override
-	public void trigger(long timestamp) throws Exception {
-		// window evaluation is due: emit the results and schedule the next evaluation
-		if (nextEvaluationTime == timestamp) {
-			computeWindow(timestamp);
-			nextEvaluationTime += windowSlide;
-		}
-
-		// pane slide is due; this can coincide with a window evaluation or happen on its own
-		if (nextSlideTime == timestamp) {
-			panes.slidePanes(numPanesPerWindow);
-			nextSlideTime += paneSize;
-		}
-
-		// whichever action comes first determines the next timer
-		registerTimer(Math.min(nextEvaluationTime, nextSlideTime), this);
-	}
-	
-	private void computeWindow(long timestamp) throws Exception {
-		out.setTimestamp(timestamp);
-		panes.truncatePanes(numPanesPerWindow);
-		panes.evaluateWindow(out, new TimeWindow(timestamp, timestamp + windowSize));
-	}
-
-	// ------------------------------------------------------------------------
-	//  Checkpointing
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Snapshots the operator: writes the next evaluation time, the next slide time, and the
-	 * contents of all panes into a checkpoint stream and attaches the resulting handle to the
-	 * task state produced by the superclass.
-	 *
-	 * @param checkpointId The ID of the checkpoint.
-	 * @param timestamp The timestamp of the checkpoint.
-	 * @return The task state of the superclass, with this operator's state handle set.
-	 */
-	@Override
-	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
-		final StreamTaskState snapshot = super.snapshotOperatorState(checkpointId, timestamp);
-
-		final StateBackend.CheckpointStateOutputView stateOut =
-				getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
-
-		// layout: the two pending action times first, then the panes with their key/value maps
-		stateOut.writeLong(nextEvaluationTime);
-		stateOut.writeLong(nextSlideTime);
-		panes.writeToOutput(stateOut, keySerializer, stateTypeSerializer);
-
-		snapshot.setOperatorState(stateOut.closeAndGetHandle());
-		return snapshot;
-	}
-
-	/**
-	 * Reads back what {@code snapshotOperatorState} wrote: the two pending action times and
-	 * the panes. The result is only stored here; {@code open()} picks it up afterwards.
-	 *
-	 * @param taskState The task state whose operator state holds the checkpointed data.
-	 */
-	@Override
-	public void restoreState(StreamTaskState taskState) throws Exception {
-		super.restoreState(taskState);
-
-		@SuppressWarnings("unchecked")
-		final StateHandle<DataInputView> handle = (StateHandle<DataInputView>) taskState.getOperatorState();
-		final DataInputView stateIn = handle.getState(getUserCodeClassloader());
-
-		// same order as written: evaluation time, slide time, panes
-		final long restoredEvaluationTime = stateIn.readLong();
-		final long restoredSlideTime = stateIn.readLong();
-
-		final AbstractKeyedTimePanes<IN, KEY, STATE, OUT> restoredPanes = createPanes(keySelector, function);
-		restoredPanes.readFromInput(stateIn, keySerializer, stateTypeSerializer);
-
-		restoredState = new RestoredState<>(restoredPanes, restoredEvaluationTime, restoredSlideTime);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Property access (for testing)
-	// ------------------------------------------------------------------------
-
-	/** @return The window length, as passed to the constructor. */
-	public long getWindowSize() {
-		return this.windowSize;
-	}
-
-	/** @return The window slide, as passed to the constructor. */
-	public long getWindowSlide() {
-		return this.windowSlide;
-	}
-
-	/** @return The pane size, i.e. the greatest common divisor of window length and slide. */
-	public long getPaneSize() {
-		return this.paneSize;
-	}
-
-	/** @return The number of panes that make up one window. */
-	public int getNumPanesPerWindow() {
-		return this.numPanesPerWindow;
-	}
-
-	/** @return The time of the next window evaluation, or -1 after the triggers were stopped. */
-	public long getNextEvaluationTime() {
-		return this.nextEvaluationTime;
-	}
-
-	/** @return The time of the next pane slide, or -1 after the triggers were stopped. */
-	public long getNextSlideTime() {
-		return this.nextSlideTime;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * @return A description of the operator containing the window length and slide.
-	 */
-	@Override
-	public String toString() {
-		return new StringBuilder("Window (processing time) (length=")
-				.append(windowSize)
-				.append(", slide=")
-				.append(windowSlide)
-				.append(')')
-				.toString();
-	}
-
-	// ------------------------------------------------------------------------
-	// ------------------------------------------------------------------------
-	
-	private static final class RestoredState<IN, KEY, STATE, OUT> {
-
-		final AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes;
-		final long nextEvaluationTime;
-		final long nextSlideTime;
-
-		RestoredState(AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes, long nextEvaluationTime, long nextSlideTime) {
-			this.panes = panes;
-			this.nextEvaluationTime = nextEvaluationTime;
-			this.nextSlideTime = nextSlideTime;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
deleted file mode 100644
index d1cea20..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Iterator;
-
-/**
- * Base class for multiple key/value maps organized in panes.
- */
-public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
-	
-	private static final int BEGIN_OF_STATE_MAGIC_NUMBER = 0x0FF1CE42;
-
-	private static final int BEGIN_OF_PANE_MAGIC_NUMBER = 0xBADF00D5;
-	
-	/** The latest time pane */
-	protected KeyMap<Key, Aggregate> latestPane = new KeyMap<>();
-
-	/** The previous time panes, ordered by time (early to late) */
-	protected final ArrayDeque<KeyMap<Key, Aggregate>> previousPanes = new ArrayDeque<>();
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Adds the given element to the latest pane. How the element is stored under its key
-	 * is up to the implementation.
-	 *
-	 * @param element The element to add.
-	 * @throws Exception Implementations may forward exceptions, for example from key extraction.
-	 */
-	public abstract void addElementToLatestPane(Type element) throws Exception;
-
-	/**
-	 * Evaluates the window over the panes currently held and emits the results.
-	 *
-	 * @param out The collector that receives the results.
-	 * @param window The time window that is being evaluated.
-	 * @throws Exception Implementations may forward exceptions from the evaluation.
-	 */
-	public abstract void evaluateWindow(Collector<Result> out, TimeWindow window) throws Exception;
-	
-	
-	/**
-	 * Drops the references to all panes. The panes only hold heap data, so nothing
-	 * beyond releasing the references is needed.
-	 */
-	public void dispose() {
-		previousPanes.clear();
-		latestPane = null;
-	}
-	
-	/**
-	 * @return The number of panes: all previous panes plus the latest pane.
-	 */
-	public int getNumPanes() {
-		final int numPrevious = previousPanes.size();
-		return numPrevious + 1;
-	}
-	
-	
-	/**
-	 * Starts a new latest pane. If more than one pane is kept, the current latest pane is
-	 * appended to the history first, and the oldest panes are dropped until fewer than
-	 * {@code panesToKeep} previous panes remain.
-	 *
-	 * @param panesToKeep The number of panes to keep, counting the new latest pane.
-	 */
-	public void slidePanes(int panesToKeep) {
-		final boolean keepsHistory = panesToKeep > 1;
-
-		if (keepsHistory) {
-			// the pane that was filled so far becomes the youngest entry of the history
-			previousPanes.addLast(latestPane);
-
-			// evict from the old end until there is room for the new latest pane
-			while (panesToKeep <= previousPanes.size()) {
-				previousPanes.removeFirst();
-			}
-		}
-
-		// in every case, subsequent elements go into a fresh pane
-		latestPane = new KeyMap<>();
-	}
-	
-	/**
-	 * Drops the oldest previous panes until fewer than {@code numToRetain} of them remain,
-	 * so that at most {@code numToRetain} panes are left when the latest pane is counted.
-	 *
-	 * @param numToRetain The number of panes to retain, counting the latest pane.
-	 */
-	public void truncatePanes(int numToRetain) {
-		while (numToRetain <= previousPanes.size()) {
-			previousPanes.removeFirst();
-		}
-	}
-	
-	protected void traverseAllPanes(KeyMap.TraversalEvaluator<Key, Aggregate> traversal, long traversalPass) throws Exception{
-		// gather all panes in an array (faster iterations)
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		KeyMap<Key, Aggregate>[] panes = previousPanes.toArray(new KeyMap[previousPanes.size() + 1]);
-		panes[panes.length - 1] = latestPane;
-
-		// let the maps make a coordinated traversal and evaluate the window function per contained key
-		KeyMap.traverseMaps(panes, traversal, traversalPass);
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Serialization and de-serialization
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Serializes all panes into the given output view.
-	 *
-	 * <p>Layout: state magic number, number of panes, then per pane (oldest first, latest
-	 * pane last) the pane magic number, the number of entries, and all key/value pairs.
-	 *
-	 * @param output The view to write to.
-	 * @param keySerializer The serializer for the keys.
-	 * @param aggSerializer The serializer for the values.
-	 * @throws IOException Forwarded from the output view or the serializers.
-	 */
-	public void writeToOutput(
-			final DataOutputView output,
-			final TypeSerializer<Key> keySerializer,
-			final TypeSerializer<Aggregate> aggSerializer) throws IOException
-	{
-		output.writeInt(BEGIN_OF_STATE_MAGIC_NUMBER);
-
-		final int numPanes = getNumPanes();
-		output.writeInt(numPanes);
-
-		// oldest panes first; the very last pane written is the latest pane
-		final Iterator<KeyMap<Key, Aggregate>> olderPanes = previousPanes.iterator();
-		for (int remaining = numPanes; remaining > 0; remaining--) {
-			output.writeInt(BEGIN_OF_PANE_MAGIC_NUMBER);
-
-			final KeyMap<Key, Aggregate> pane;
-			if (remaining == 1) {
-				pane = latestPane;
-			} else {
-				pane = olderPanes.next();
-			}
-
-			output.writeInt(pane.size());
-			for (KeyMap.Entry<Key, Aggregate> keyAndValue : pane) {
-				keySerializer.serialize(keyAndValue.getKey(), output);
-				aggSerializer.serialize(keyAndValue.getValue(), output);
-			}
-		}
-	}
-	
-	/**
-	 * Reads panes from the given input view, in the layout produced by {@code writeToOutput}.
-	 *
-	 * <p>All panes except the last one are appended to the previous panes; the entries of
-	 * the last pane are put into the current latest pane.
-	 *
-	 * @param input The view to read from.
-	 * @param keySerializer The serializer for the keys.
-	 * @param aggSerializer The serializer for the values.
-	 * @throws IOException If a magic number does not match, if a pane count or entry count is
-	 *                     negative, or if reading from the input view fails.
-	 */
-	public void readFromInput(
-			final DataInputView input,
-			final TypeSerializer<Key> keySerializer,
-			final TypeSerializer<Aggregate> aggSerializer) throws IOException
-	{
-		validateMagicNumber(BEGIN_OF_STATE_MAGIC_NUMBER, input.readInt());
-
-		final int numPanes = input.readInt();
-		if (numPanes < 0) {
-			// a negative count can only come from a corrupt stream; do not silently read nothing
-			throw new IOException("Corrupt state stream - negative number of panes: " + numPanes);
-		}
-		
-		// read from the past towards the present
-		for (int remaining = numPanes; remaining > 0; remaining--) {
-			validateMagicNumber(BEGIN_OF_PANE_MAGIC_NUMBER, input.readInt());
-
-			// the last pane in the stream is the latest pane, all others are history
-			final boolean isLatestPane = (remaining == 1);
-			final KeyMap<Key, Aggregate> pane = isLatestPane ? latestPane : new KeyMap<Key, Aggregate>();
-			
-			final int numElementsInPane = input.readInt();
-			if (numElementsInPane < 0) {
-				throw new IOException(
-						"Corrupt state stream - negative number of elements in pane: " + numElementsInPane);
-			}
-
-			for (int i = 0; i < numElementsInPane; i++) {
-				Key k = keySerializer.deserialize(input);
-				Aggregate a = aggSerializer.deserialize(input);
-				pane.put(k, a);
-			}
-			
-			if (!isLatestPane) {
-				previousPanes.addLast(pane);
-			}
-		}
-	}
-	
-	/**
-	 * Checks that the magic number read from the stream is the expected one.
-	 *
-	 * @param expected The magic number that should be at this position.
-	 * @param found The number that was actually read.
-	 * @throws IOException If the two numbers differ.
-	 */
-	private static void validateMagicNumber(int expected, int found) throws IOException {
-		if (found == expected) {
-			return;
-		}
-
-		throw new IOException("Corrupt state stream - wrong magic number. " +
-			"Expected '" + Integer.toHexString(expected) +
-			"', found '" + Integer.toHexString(found) + '\'');
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
deleted file mode 100644
index c854e6c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.util.UnionIterator;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-
-
-public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyedTimePanes<Type, Key, ArrayList<Type>, Result> {
-	
-	private final KeySelector<Type, Key> keySelector;
-
-	private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory();
-
-	private final WindowFunction<Type, Result, Key, Window> function;
-
-	/**
-	 * IMPORTANT: This value needs to start at one, so that it is fresher than the value
-	 * that new entries have (zero).
-	 */
-	private long evaluationPass = 1L;   
-
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Creates panes that accumulate the elements per key.
-	 *
-	 * @param keySelector Extracts the key under which an element is accumulated.
-	 * @param function The window function that is applied per key when a window is evaluated.
-	 */
-	public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, WindowFunction<Type, Result, Key, Window> function) {
-		this.function = function;
-		this.keySelector = keySelector;
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Appends the element to the list stored under its key in the latest pane; the list is
-	 * created through the list factory if the key has no entry yet.
-	 *
-	 * @param element The element to add.
-	 */
-	@Override
-	public void addElementToLatestPane(Type element) throws Exception {
-		final Key key = keySelector.getKey(element);
-		latestPane.putIfAbsent(key, listFactory).add(element);
-	}
-
-	/**
-	 * Applies the window function once per key to the elements accumulated for that key,
-	 * and advances the evaluation pass counter afterwards.
-	 *
-	 * @param out The collector that receives the function's results.
-	 * @param window The window that is handed to the function.
-	 */
-	@Override
-	public void evaluateWindow(Collector<Result> out, TimeWindow window) throws Exception {
-		if (!previousPanes.isEmpty()) {
-			// several panes: merge the per-key lists of all panes in a coordinated traversal
-			WindowFunctionTraversal<Key, Type, Result> traversal = new WindowFunctionTraversal<>(function, window, out);
-			traverseAllPanes(traversal, evaluationPass);
-		}
-		else {
-			// only the latest pane exists (tumbling window): its lists can be used directly
-			for (KeyMap.Entry<Key, ArrayList<Type>> keyAndElements : latestPane) {
-				function.apply(keyAndElements.getKey(), window, keyAndElements.getValue(), out);
-			}
-		}
-
-		evaluationPass++;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Running a window function in a map traversal
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Traversal evaluator that gathers the element lists of one key from all panes into a
-	 * union iterator and applies the window function to it once the key is complete.
-	 */
-	static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
-
-		/** The function that is applied per key. */
-		private final WindowFunction<Type, Result, Key, Window> function;
-
-		/** The window that is handed to the function. */
-		private final TimeWindow window;
-
-		/** The collector that receives the function's results. */
-		private final Collector<Result> out;
-
-		/** Concatenates the lists of the current key; cleared whenever a new key starts. */
-		private final UnionIterator<Type> unionIterator;
-
-		/** The key whose values are currently being gathered. */
-		private Key currentKey;
-
-		WindowFunctionTraversal(WindowFunction<Type, Result, Key, Window> function, TimeWindow window, Collector<Result> out) {
-			this.function = function;
-			this.window = window;
-			this.out = out;
-			this.unionIterator = new UnionIterator<>();
-		}
-
-		@Override
-		public void startNewKey(Key key) {
-			// forget the lists of the previous key before gathering those of the next one
-			unionIterator.clear();
-			currentKey = key;
-		}
-
-		@Override
-		public void nextValue(ArrayList<Type> value) {
-			unionIterator.addList(value);
-		}
-
-		@Override
-		public void keyDone() throws Exception {
-			// all lists of the current key are gathered: evaluate the function over their union
-			function.apply(currentKey, window, unionIterator, out);
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Lazy factory for lists (put if absent)
-	// ------------------------------------------------------------------------
-	
-	/** Shared factory instance; it creates a new list with initial capacity 4 on every call. */
-	private static final KeyMap.LazyFactory<?> LIST_FACTORY = new KeyMap.LazyFactory<ArrayList<?>>() {
-
-		@Override
-		public ArrayList<?> create() {
-			return new ArrayList<>(4);
-		}
-	};
-
-	/**
-	 * Returns the shared list factory, cast to the requested element type.
-	 *
-	 * @param <V> The element type of the lists.
-	 * @return The shared factory instance.
-	 */
-	@SuppressWarnings("unchecked")
-	private static <V> KeyMap.LazyFactory<ArrayList<V>> getListFactory() {
-		return (KeyMap.LazyFactory<ArrayList<V>>) LIST_FACTORY;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
deleted file mode 100644
index 7a7d04c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-
-public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> 
-		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, WindowFunction<IN, OUT, KEY, TimeWindow>> {
-
-	private static final long serialVersionUID = 7305948082830843475L;
-
-	
-	public AccumulatingProcessingTimeWindowOperator(
-			WindowFunction<IN, OUT, KEY, TimeWindow> function,
-			KeySelector<IN, KEY> keySelector,
-			TypeSerializer<KEY> keySerializer,
-			TypeSerializer<IN> valueSerializer,
-			long windowLength,
-			long windowSlide)
-	{
-		super(function, keySelector, keySerializer,
-				new ArrayListSerializer<IN>(valueSerializer), windowLength, windowSlide);
-	}
-
-	@Override
-	protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
-		@SuppressWarnings("unchecked")
-		WindowFunction<IN, OUT, KEY, Window> windowFunction = (WindowFunction<IN, OUT, KEY, Window>) function;
-		
-		return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction);
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Utility Serializer for Lists of Elements
-	// ------------------------------------------------------------------------
-	
-	@SuppressWarnings("ForLoopReplaceableByForEach")
-	private static final class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
-
-		private static final long serialVersionUID = 1119562170939152304L;
-		
-		private final TypeSerializer<T> elementSerializer;
-
-		ArrayListSerializer(TypeSerializer<T> elementSerializer) {
-			this.elementSerializer = elementSerializer;
-		}
-
-		@Override
-		public boolean isImmutableType() {
-			return false;
-		}
-
-		@Override
-		public TypeSerializer<ArrayList<T>> duplicate() {
-			TypeSerializer<T> duplicateElement = elementSerializer.duplicate();
-			return duplicateElement == elementSerializer ? this : new ArrayListSerializer<T>(duplicateElement);
-		}
-
-		@Override
-		public ArrayList<T> createInstance() {
-			return new ArrayList<>();
-		}
-
-		@Override
-		public ArrayList<T> copy(ArrayList<T> from) {
-			ArrayList<T> newList = new ArrayList<>(from.size());
-			for (int i = 0; i < from.size(); i++) {
-				newList.add(elementSerializer.copy(from.get(i)));
-			}
-			return newList;
-		}
-
-		@Override
-		public ArrayList<T> copy(ArrayList<T> from, ArrayList<T> reuse) {
-			return copy(from);
-		}
-
-		@Override
-		public int getLength() {
-			return -1; // var length
-		}
-
-		@Override
-		public void serialize(ArrayList<T> list, DataOutputView target) throws IOException {
-			final int size = list.size();
-			target.writeInt(size);
-			for (int i = 0; i < size; i++) {
-				elementSerializer.serialize(list.get(i), target);
-			}
-		}
-
-		@Override
-		public ArrayList<T> deserialize(DataInputView source) throws IOException {
-			final int size = source.readInt();
-			final ArrayList<T> list = new ArrayList<>(size);
-			for (int i = 0; i < size; i++) {
-				list.add(elementSerializer.deserialize(source));
-			}
-			return list;
-		}
-
-		@Override
-		public ArrayList<T> deserialize(ArrayList<T> reuse, DataInputView source) throws IOException {
-			return deserialize(source);
-		}
-
-		@Override
-		public void copy(DataInputView source, DataOutputView target) throws IOException {
-			// copy number of elements
-			final int num = source.readInt();
-			target.writeInt(num);
-			for (int i = 0; i < num; i++) {
-				elementSerializer.copy(source, target);
-			}
-		}
-
-		// --------------------------------------------------------------------
-		
-		@Override
-		public boolean equals(Object obj) {
-			return obj == this || 
-					(obj != null && obj.getClass() == getClass() && 
-						elementSerializer.equals(((ArrayListSerializer<?>) obj).elementSerializer));
-		}
-
-		@Override
-		public boolean canEqual(Object obj) {
-			return true;
-		}
-
-		@Override
-		public int hashCode() {
-			return elementSerializer.hashCode();
-		}
-	} 
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
deleted file mode 100644
index d395b2a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-
-
-public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes<Type, Key, Type, Type> {
-	
-	private final KeySelector<Type, Key> keySelector;
-	
-	private final ReduceFunction<Type> reducer;
-
-	/**
-	 * IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries have (zero) */
-	private long evaluationPass = 1L;
-
-	// ------------------------------------------------------------------------
-	
-	public AggregatingKeyedTimePanes(KeySelector<Type, Key> keySelector, ReduceFunction<Type> reducer) {
-		this.keySelector = keySelector;
-		this.reducer = reducer;
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void addElementToLatestPane(Type element) throws Exception {
-		Key k = keySelector.getKey(element);
-		latestPane.putOrAggregate(k, element, reducer);
-	}
-
-	@Override
-	public void evaluateWindow(Collector<Type> out, TimeWindow window) throws Exception {
-		if (previousPanes.isEmpty()) {
-			// optimized path for single pane case
-			for (KeyMap.Entry<Key, Type> entry : latestPane) {
-				out.collect(entry.getValue());
-			}
-		}
-		else {
-			// general code path for multi-pane case
-			AggregatingTraversal<Key, Type> evaluator = new AggregatingTraversal<>(reducer, out);
-			traverseAllPanes(evaluator, evaluationPass);
-		}
-		
-		evaluationPass++;
-	}
-
-	// ------------------------------------------------------------------------
-	//  The maps traversal that performs the final aggregation
-	// ------------------------------------------------------------------------
-	
-	static final class AggregatingTraversal<Key, Type> implements KeyMap.TraversalEvaluator<Key, Type> {
-
-		private final ReduceFunction<Type> function;
-		
-		private final Collector<Type> out;
-		
-		private Type currentValue;
-
-		AggregatingTraversal(ReduceFunction<Type> function, Collector<Type> out) {
-			this.function = function;
-			this.out = out;
-		}
-
-		@Override
-		public void startNewKey(Key key) {
-			currentValue = null;
-		}
-
-		@Override
-		public void nextValue(Type value) throws Exception {
-			if (currentValue != null) {
-				currentValue = function.reduce(currentValue, value);
-			}
-			else {
-				currentValue = value;
-			}
-		}
-
-		@Override
-		public void keyDone() throws Exception {
-			out.collect(currentValue);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
deleted file mode 100644
index 0e07cea..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-
-public class AggregatingProcessingTimeWindowOperator<KEY, IN> 
-		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN, IN, ReduceFunction<IN>> {
-
-	private static final long serialVersionUID = 7305948082830843475L;
-
-	
-	public AggregatingProcessingTimeWindowOperator(
-			ReduceFunction<IN> function,
-			KeySelector<IN, KEY> keySelector,
-			TypeSerializer<KEY> keySerializer,
-			TypeSerializer<IN> aggregateSerializer,
-			long windowLength,
-			long windowSlide)
-	{
-		super(function, keySelector, keySerializer, aggregateSerializer, windowLength, windowSlide);
-	}
-
-	@Override
-	protected AggregatingKeyedTimePanes<IN, KEY> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
-		@SuppressWarnings("unchecked")
-		ReduceFunction<IN> windowFunction = (ReduceFunction<IN>) function;
-		
-		return new AggregatingKeyedTimePanes<IN, KEY>(keySelector, windowFunction);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
deleted file mode 100644
index 1bb451a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.evictors.Evictor;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Evicting window operator for non-keyed windows.
- *
- * @see org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator
- *
- * @param <IN> The type of the incoming elements.
- * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
- * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
- */
-public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends NonKeyedWindowOperator<IN, OUT, W> {
-
-	private static final long serialVersionUID = 1L;
-
-	private final Evictor<? super IN, ? super W> evictor;
-
-	public EvictingNonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
-			TypeSerializer<W> windowSerializer,
-			WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory,
-			AllWindowFunction<IN, OUT, W> windowFunction,
-			Trigger<? super IN, ? super W> trigger,
-			Evictor<? super IN, ? super W> evictor) {
-		super(windowAssigner, windowSerializer, windowBufferFactory, windowFunction, trigger);
-		this.evictor = requireNonNull(evictor);
-	}
-
-	@Override
-	@SuppressWarnings("unchecked, rawtypes")
-	protected void emitWindow(Context context) throws Exception {
-		timestampedCollector.setTimestamp(context.window.maxTimestamp());
-		EvictingWindowBuffer<IN> windowBuffer = (EvictingWindowBuffer<IN>) context.windowBuffer;
-
-		int toEvict = 0;
-		if (windowBuffer.size() > 0) {
-			// need some type trickery here...
-			toEvict = evictor.evict((Iterable) windowBuffer.getElements(), windowBuffer.size(), context.window);
-		}
-
-		windowBuffer.removeElements(toEvict);
-
-		userFunction.apply(
-				context.window,
-				context.windowBuffer.getUnpackedElements(),
-				timestampedCollector);
-	}
-
-	@Override
-	public EvictingNonKeyedWindowOperator<IN, OUT, W> enableSetProcessingTime(boolean setProcessingTime) {
-		super.enableSetProcessingTime(setProcessingTime);
-		return this;
-	}
-
-
-	// ------------------------------------------------------------------------
-	// Getters for testing
-	// ------------------------------------------------------------------------
-
-	@VisibleForTesting
-	public Evictor<? super IN, ? super W> getEvictor() {
-		return evictor;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
deleted file mode 100644
index ad43812..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
-import org.apache.flink.streaming.api.windowing.evictors.Evictor;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A {@link WindowOperator} that also allows an {@link Evictor} to be used.
- *
- * <p>
- * The {@code Evictor} is used to evict elements from panes before processing a window and after
- * a {@link Trigger} has fired.
- *
- * @param <K> The type of key returned by the {@code KeySelector}.
- * @param <IN> The type of the incoming elements.
- * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
- * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
- */
-public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends WindowOperator<K, IN, OUT, W> {
-
-	private static final long serialVersionUID = 1L;
-
-	private final Evictor<? super IN, ? super W> evictor;
-
-	public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
-			TypeSerializer<W> windowSerializer,
-			KeySelector<IN, K> keySelector,
-			TypeSerializer<K> keySerializer,
-			WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory,
-			WindowFunction<IN, OUT, K, W> windowFunction,
-			Trigger<? super IN, ? super W> trigger,
-			Evictor<? super IN, ? super W> evictor) {
-		super(windowAssigner, windowSerializer, keySelector, keySerializer, windowBufferFactory, windowFunction, trigger);
-		this.evictor = requireNonNull(evictor);
-	}
-
-	@Override
-	@SuppressWarnings("unchecked, rawtypes")
-	protected void emitWindow(Context context) throws Exception {
-		timestampedCollector.setTimestamp(context.window.maxTimestamp());
-		EvictingWindowBuffer<IN> windowBuffer = (EvictingWindowBuffer<IN>) context.windowBuffer;
-
-		int toEvict = 0;
-		if (windowBuffer.size() > 0) {
-			// need some type trickery here...
-			toEvict = evictor.evict((Iterable) windowBuffer.getElements(), windowBuffer.size(), context.window);
-		}
-
-		windowBuffer.removeElements(toEvict);
-
-		userFunction.apply(context.key,
-				context.window,
-				context.windowBuffer.getUnpackedElements(),
-				timestampedCollector);
-	}
-
-	@Override
-	public EvictingWindowOperator<K, IN, OUT, W> enableSetProcessingTime(boolean setProcessingTime) {
-		super.enableSetProcessingTime(setProcessingTime);
-		return this;
-	}
-
-
-	// ------------------------------------------------------------------------
-	// Getters for testing
-	// ------------------------------------------------------------------------
-
-	@VisibleForTesting
-	public Evictor<? super IN, ? super W> getEvictor() {
-		return evictor;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java
deleted file mode 100644
index 3f44c4a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java
+++ /dev/null
@@ -1,651 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.runtime.util.MathUtils;
-
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-/**
- * A special Hash Map implementation that can be traversed efficiently in sync with other
- * hash maps.
- * <p>
- * The differences between this hash map and Java's "java.util.HashMap" are:
- * <ul>
- *     <li>A different hashing scheme. This implementation uses extensible hashing, meaning that
- *         each hash table growth takes one more lower hash code bit into account, and values that were
- *         formerly in the same bucket will afterwards be in the two adjacent buckets.</li>
- *     <li>This allows an efficient traversal of multiple hash maps together, even though the maps are
- *         of different sizes.</li>
- *     <li>The map offers functions such as "putIfAbsent()" and "putOrAggregate()"</li>
- *     <li>The map supports no removal/shrinking.</li>
- * </ul>
- */
-public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
-	
-	/** The minimum table capacity, 64 entries */
-	private static final int MIN_CAPACITY = 0x40;
-	
-	/** The maximum possible table capacity, the largest positive power of
-	 * two in the 32bit signed integer value range */
-	private static final int MAX_CAPACITY = 0x40000000;
-	
-	/** The number of bits used for table addressing when table is at max capacity */
-	private static final int FULL_BIT_RANGE = MathUtils.log2strict(MAX_CAPACITY);
-	
-	// ------------------------------------------------------------------------
-	
-	/** The hash index, as an array of entries */
-	private Entry<K, V>[] table;
-	
-	/** The number of bits by which the hash code is shifted right, to find the bucket */
-	private int shift;
-	
-	/** The number of elements in the hash table */
-	private int numElements;
-	
-	/** The number of elements above which the hash table needs to grow */
-	private int rehashThreshold;
-	
-	/** The base-2 logarithm of the table capacity */ 
-	private int log2size;
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a new hash table with the default initial capacity.
-	 */
-	public KeyMap() {
-		this(0);
-	}
-
-	/**
-	 * Creates a new table with a capacity tailored to the given expected number of elements.
-	 * 
-	 * @param expectedNumberOfElements The number of elements to tailor the capacity to.
-	 */
-	public KeyMap(int expectedNumberOfElements) {
-		if (expectedNumberOfElements < 0) {
-			throw new IllegalArgumentException("Invalid capacity: " + expectedNumberOfElements);
-		}
-		
-		// round up to the next power of two
-		// guard against too small capacity and integer overflows
-		int capacity = Integer.highestOneBit(expectedNumberOfElements) << 1;
-		capacity = capacity >= 0 ? Math.max(MIN_CAPACITY, capacity) : MAX_CAPACITY;
-
-		// this also acts as a sanity check
-		log2size = MathUtils.log2strict(capacity);
-		shift = FULL_BIT_RANGE - log2size;
-		table = allocateTable(capacity);
-		rehashThreshold = getRehashThreshold(capacity);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Gets and Puts
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Inserts the given value, mapped under the given key. If the table already contains a value for
-	 * the key, the value is replaced and returned. If no value is contained, yet, the function
-	 * returns null.
-	 * 
-	 * @param key The key to insert.
-	 * @param value The value to insert.
-	 * @return The previously mapped value for the key, or null, if no value was mapped for the key.
-	 * 
-	 * @throws java.lang.NullPointerException Thrown, if the key is null.
-	 */
-	public final V put(K key, V value) {
-		final int hash = hash(key);
-		final int slot = indexOf (hash);
-		
-		// search the chain from the slot
-		for (Entry<K, V> e = table[slot]; e != null; e = e.next) {
-			Object k;
-			if (e.hashCode == hash && ((k = e.key) == key || key.equals(k))) {
-				// found match
-				V old = e.value;
-				e.value = value;
-				return old;
-			}
-		}
-
-		// no match, insert a new value
-		insertNewEntry(hash, key, value, slot);
-		return null;
-	}
-
-	/**
-	 * Inserts a value for the given key, if no value is yet contained for that key. Otherwise,
-	 * returns the value currently contained for the key.
-	 * <p>
-	 * The value that is inserted in case that the key is not contained, yet, is lazily created
-	 * using the given factory.
-	 *
-	 * @param key The key to insert.
-	 * @param factory The factory that produces the value, if no value is contained, yet, for the key.
-	 * @return The value in the map after this operation (either the previously contained value, or the
-	 *         newly created value).
-	 * 
-	 * @throws java.lang.NullPointerException Thrown, if the key is null.
-	 */
-	public final V putIfAbsent(K key, LazyFactory<V> factory) {
-		final int hash = hash(key);
-		final int slot = indexOf(hash);
-
-		// search the chain from the slot
-		for (Entry<K, V> entry = table[slot]; entry != null; entry = entry.next) {
-			if (entry.hashCode == hash && entry.key.equals(key)) {
-				// found match
-				return entry.value;
-			}
-		}
-
-		// no match, insert a new value
-		V value = factory.create();
-		insertNewEntry(hash, key, value, slot);
-
-		// return the created value
-		return value;
-	}
-
-	/**
-	 * Inserts or aggregates a value into the hash map. If the hash map does not yet contain the key,
-	 * this method inserts the value. If the table already contains the key (and a value) this
-	 * method will use the given ReduceFunction to combine the existing value and the
-	 * given value to a new value, and store that value for the key. 
-	 * 
-	 * @param key The key to map the value.
-	 * @param value The new value to insert, or aggregate with the existing value.
-	 * @param aggregator The aggregator to use if a value is already contained.
-	 * 
-	 * @return The value in the map after this operation: Either the given value, or the aggregated value.
-	 * 
-	 * @throws java.lang.NullPointerException Thrown, if the key is null.
-	 * @throws Exception The method forwards exceptions from the aggregation function.
-	 */
-	public final V putOrAggregate(K key, V value, ReduceFunction<V> aggregator) throws Exception {
-		final int hash = hash(key);
-		final int slot = indexOf(hash);
-
-		// search the chain from the slot
-		for (Entry<K, V> entry = table[slot]; entry != null; entry = entry.next) {
-			if (entry.hashCode == hash && entry.key.equals(key)) {
-				// found match
-				entry.value = aggregator.reduce(entry.value, value);
-				return entry.value;
-			}
-		}
-
-		// no match, insert a new value
-		insertNewEntry(hash, key, value, slot);
-		// return the original value
-		return value;
-	}
-
-	/**
-	 * Looks up the value mapped under the given key. Returns null if no value is mapped under this key.
-	 * 
-	 * @param key The key to look up.
-	 * @return The value associated with the key, or null, if no value is found for the key.
-	 * 
-	 * @throws java.lang.NullPointerException Thrown, if the key is null.
-	 */
-	public V get(K key) {
-		final int hash = hash(key);
-		final int slot = indexOf(hash);
-		
-		// search the chain from the slot
-		for (Entry<K, V> entry = table[slot]; entry != null; entry = entry.next) {
-			if (entry.hashCode == hash && entry.key.equals(key)) {
-				return entry.value;
-			}
-		}
-		
-		// not found
-		return null;
-	}
-
-	private void insertNewEntry(int hashCode, K key, V value, int position) {
-		Entry<K,V> e = table[position];
-		table[position] = new Entry<>(key, value, hashCode, e);
-		numElements++;
-
-		// rehash if necessary
-		if (numElements > rehashThreshold) {
-			growTable();
-		}
-	}
-	
-	private int indexOf(int hashCode) {
-		return (hashCode >> shift) & (table.length - 1);
-	}
-
-	/**
-	 * Creates an iterator over the entries of this map. The iterator does not support
-	 * the removal of entries.
-	 * 
-	 * @return An iterator over the entries of this map.
-	 */
-	@Override
-	public Iterator<Entry<K, V>> iterator() {
-		return new Iterator<Entry<K, V>>() {
-
-			// the table as referenced by the map when the iterator was created
-			private final Entry<K, V>[] tab = KeyMap.this.table;
-
-			// the entry handed out by the following call to next(), null if not yet determined
-			private Entry<K, V> nextEntry;
-
-			// the first slot that has not been looked at, yet
-			private int nextPos = 0;
-
-			@Override
-			public boolean hasNext() {
-				// unless an entry is pending, advance to the head of the next non-empty slot
-				while (nextEntry == null && nextPos < tab.length) {
-					nextEntry = tab[nextPos++];
-				}
-				return nextEntry != null;
-			}
-
-			@Override
-			public Entry<K, V> next() {
-				if (!hasNext()) {
-					throw new NoSuchElementException();
-				}
-
-				// hand out the pending entry and continue with its chain successor
-				final Entry<K, V> result = nextEntry;
-				nextEntry = result.next;
-				return result;
-			}
-
-			@Override
-			public void remove() {
-				throw new UnsupportedOperationException();
-			}
-		};
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Properties
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Gets the number of entries that this map currently holds.
-	 * @return The number of elements currently in the map.
-	 */
-	public int size() {
-		return this.numElements;
-	}
-
-	/**
-	 * Checks whether this map holds no entries.
-	 * @return True, if the map contains no elements, false otherwise.
-	 */
-	public boolean isEmpty() {
-		return this.numElements == 0;
-	}
-
-	/**
-	 * Gets the current table capacity, i.e., the number of slots in the hash table, without
-	 * any overflow chaining.
-	 * @return The number of slots in the hash table.
-	 */
-	public int getCurrentTableCapacity() {
-		return this.table.length;
-	}
-
-	/**
-	 * Gets the base-2 logarithm of the number of slots in the hash table, i.e., of the
-	 * value returned by {@link #getCurrentTableCapacity()}.
-	 * 
-	 * @return The base-2 logarithm of the hash table capacity.
-	 */
-	public int getLog2TableCapacity() {
-		return this.log2size;
-	}
-	
-	/**
-	 * Gets the current rehash threshold. The table is grown when an insert makes the number
-	 * of elements exceed this value.
-	 * 
-	 * @return The current rehash threshold.
-	 */
-	public int getRehashThreshold() {
-		return this.rehashThreshold;
-	}
-	
-	/**
-	 * Gets the number of bits by which hash codes are currently shifted to the right
-	 * when the table slot is computed.
-	 * 
-	 * @return The current shift.
-	 */
-	public int getShift() {
-		return this.shift;
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Creates a new entry array of the given length. The array is allocated with wildcard
-	 * type arguments and then cast (unchecked) to the map's entry type.
-	 */
-	@SuppressWarnings("unchecked")
-	private Entry<K, V>[] allocateTable(int numElements) {
-		final Entry<?, ?>[] untyped = new Entry<?, ?>[numElements];
-		return (Entry<K, V>[]) untyped;
-	}
-	
-	/**
-	 * Doubles the number of slots of the hash table and moves all entries into the new
-	 * table. The existing entry objects are re-linked, not copied. Nothing happens if the
-	 * doubled table length is not a positive number any more.
-	 */
-	private void growTable() {
-		final int doubledSize = table.length << 1;
-
-		// only grow if there is still space to grow the table
-		if (doubledSize <= 0) {
-			return;
-		}
-
-		final Entry<K, V>[] previous = this.table;
-		final Entry<K, V>[] grown = allocateTable(doubledSize);
-
-		// shifting by one bit less and masking with the doubled size uses one more hash bit
-		final int grownShift = shift - 1;
-		final int grownMask = doubledSize - 1;
-
-		// go over all slots from the table. since we hash to adjacent positions in
-		// the new hash table, this is actually cache efficient
-		for (Entry<K, V> head : previous) {
-			// each entry of the chain becomes the head of its chain in the grown table
-			Entry<K, V> current = head;
-			while (current != null) {
-				final Entry<K, V> successor = current.next;
-				final int target = (current.hashCode >> grownShift) & grownMask;
-				current.next = grown[target];
-				grown[target] = current;
-				current = successor;
-			}
-		}
-
-		this.table = grown;
-		this.shift = grownShift;
-		this.rehashThreshold = getRehashThreshold(doubledSize);
-		this.log2size += 1;
-	}
-	
-	/**
-	 * Computes the hash code used by this table: the key's own hash code, scrambled
-	 * through a fixed sequence of add, xor, and shift steps.
-	 * 
-	 * @throws java.lang.NullPointerException Thrown, if the key is null.
-	 */
-	private static int hash(Object key) {
-		int h = key.hashCode();
-
-		// we need a strong hash function that generates diverse upper bits
-		// this hash function is more expensive than the "scramble" used by "java.util.HashMap",
-		// but required for this sort of hash table
-		h = (h + 0x7ed55d16) + (h << 12);
-		h = (h ^ 0xc761c23c) ^ (h >>> 19);
-		h = (h + 0x165667b1) + (h << 5);
-		h = (h + 0xd3a2646c) ^ (h << 9);
-		h = (h + 0xfd7046c5) + (h << 3);
-		h = (h ^ 0xb55a4f09) ^ (h >>> 16);
-		return h;
-	}
-	
-	/**
-	 * Computes the rehash threshold for a table capacity: three times a quarter of the
-	 * capacity (using integer division).
-	 */
-	private static int getRehashThreshold(int capacity) {
-		// divide before multiply, to avoid overflow
-		final int quarter = capacity / 4;
-		return quarter * 3;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Testing Utilities
-	// ------------------------------------------------------------------------
-
-	/**
-	 * For testing only: Counts the entries by walking all chains of the table, rather than
-	 * reading the counter variable. This method has linear complexity, rather than constant.
-	 * 
-	 * @return The counted number of entries.
-	 */
-	int traverseAndCountElements() {
-		int count = 0;
-
-		for (Entry<?, ?> head : table) {
-			for (Entry<?, ?> current = head; current != null; current = current.next) {
-				count++;
-			}
-		}
-
-		return count;
-	}
-
-	/**
-	 * For testing only: Determines the number of entries in the longest chain of the table.
-	 * This method has linear complexity.
-	 * 
-	 * @return The length of the longest overflow chain.
-	 */
-	int getLongestChainLength() {
-		int longest = 0;
-
-		for (Entry<?, ?> head : table) {
-			// measure the chain that starts in this slot
-			int length = 0;
-			for (Entry<?, ?> current = head; current != null; current = current.next) {
-				length++;
-			}
-
-			if (length > longest) {
-				longest = length;
-			}
-		}
-
-		return longest;
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * A key/value entry of the hash table. Entries that fall into the same slot are
-	 * linked into a chain via their next reference.
-	 * 
-	 * @param <K> Type of the key.
-	 * @param <V> Type of the value.
-	 */
-	public static final class Entry<K, V> {
-
-		/** The key of the entry. */
-		final K key;
-
-		/** The hash code that was supplied when the entry was created. */
-		final int hashCode;
-
-		/** The value currently stored for the key. */
-		V value;
-
-		/** The following entry in the chain, or null, if this is the last one. */
-		Entry<K, V> next;
-
-		/** The tag of the latest traversal that touched this entry. */
-		long touchedTag;
-
-		Entry(K key, V value, int hashCode, Entry<K, V> next) {
-			this.key = key;
-			this.hashCode = hashCode;
-			this.value = value;
-			this.next = next;
-		}
-
-		/**
-		 * Gets the key of this entry.
-		 * @return The key.
-		 */
-		public K getKey() {
-			return this.key;
-		}
-
-		/**
-		 * Gets the value currently stored in this entry.
-		 * @return The value.
-		 */
-		public V getValue() {
-			return this.value;
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Performs a traversal over the logical multi-map that results from the union of the
-	 * given maps. This method does not actually build a union of the maps, but traverses the
-	 * hash maps together.
-	 * 
-	 * <p>As part of the traversal, the given array is sorted in place, in descending order of
-	 * the maps' table capacity. If the array is empty, the method returns without calling the
-	 * visitor.
-	 * 
-	 * @param maps The array of maps whose union should be traversed.
-	 * @param visitor The visitor that is called for each key and all values.
-	 * @param touchedTag A tag that is used to mark elements that have been touched in this specific
-	 *                   traversal. Each successive traversal should supply a larger value for this
-	 *                   tag than the previous one.
-	 * 
-	 * @param <K> The type of the map's key.
-	 * @param <V> The type of the map's value.
-	 * 
-	 * @throws Exception The method forwards exceptions from the visitor.
-	 */
-	public static <K, V> void traverseMaps(
-					final KeyMap<K, V>[] maps,
-					final TraversalEvaluator<K, V> visitor,
-					final long touchedTag)
-		throws Exception
-	{
-		// the union of no maps has no keys; this also guards the access to maps[0] below
-		if (maps.length == 0) {
-			return;
-		}
-
-		// we need to work on the maps in descending size
-		Arrays.sort(maps, CapacityDescendingComparator.INSTANCE);
-		
-		final int[] shifts = new int[maps.length];
-		final int[] lowBitsMask = new int[maps.length];
-		final int numSlots = maps[0].table.length;
-		final int numTables = maps.length;
-		
-		// figure out how much each hash table collapses the entries
-		for (int i = 0; i < numTables; i++) {
-			shifts[i] = maps[0].log2size - maps[i].log2size;
-			lowBitsMask[i] = (1 << shifts[i]) - 1;
-		}
-		
-		// go over all slots (based on the largest hash table)
-		for (int pos = 0; pos < numSlots; pos++) {
-			
-			// for each slot, go over all tables, until the table does not have that slot any more
-			// for tables where multiple slots collapse into one, we visit that one when we process the
-			// latest of all slots that collapse to that one
-			int mask;
-			for (int rootTable = 0;
-					rootTable < numTables && ((mask = lowBitsMask[rootTable]) & pos) == mask;
-					rootTable++)
-			{
-				// use that table to gather keys and start collecting keys from the following tables
-				// go over all entries of that slot in the table
-				Entry<K, V> entry = maps[rootTable].table[pos >> shifts[rootTable]];
-				while (entry != null) {
-					// take only entries that have not been collected as part of other tables
-					if (entry.touchedTag < touchedTag) {
-						entry.touchedTag = touchedTag;
-						
-						final K key = entry.key;
-						final int hashCode = entry.hashCode;
-						visitor.startNewKey(key);
-						visitor.nextValue(entry.value);
-						
-						// collect further entries for the key from the remainder of this chain
-						addEntriesFromChain(entry.next, visitor, key, touchedTag, hashCode);
-						
-						// go over the other hash tables and collect their entries for the key
-						for (int followupTable = rootTable + 1; followupTable < numTables; followupTable++) {
-							Entry<K, V> followupEntry = maps[followupTable].table[pos >> shifts[followupTable]];
-							if (followupEntry != null) {
-								addEntriesFromChain(followupEntry, visitor, key, touchedTag, hashCode);
-							}
-						}
-
-						visitor.keyDone();
-					}
-					
-					entry = entry.next;
-				}
-			}
-		}
-	}
-	
-	/**
-	 * Walks the chain that starts at the given entry and passes the value of every entry to
-	 * the visitor that has not been touched in this traversal, yet, and that matches the
-	 * given hash code and key. Each such entry is marked with the traversal's tag.
-	 */
-	private static <K, V> void addEntriesFromChain(
-			Entry<K, V> entry,
-			TraversalEvaluator<K, V> visitor,
-			K key,
-			long touchedTag,
-			int hashCode) throws Exception
-	{
-		for (Entry<K, V> current = entry; current != null; current = current.next) {
-			final boolean untouched = current.touchedTag < touchedTag;
-			if (untouched && current.hashCode == hashCode && current.key.equals(key)) {
-				current.touchedTag = touchedTag;
-				visitor.nextValue(current.value);
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Comparator that defines a descending order on maps, primarily by their table capacity
-	 * and, for maps of equal table capacity, by their number of elements.
-	 */
-	static final class CapacityDescendingComparator implements Comparator<KeyMap<?, ?>> {
-		
-		static final CapacityDescendingComparator INSTANCE = new CapacityDescendingComparator();
-		
-		private CapacityDescendingComparator() {}
-
-
-		@Override
-		public int compare(KeyMap<?, ?> o1, KeyMap<?, ?> o2) {
-			// the arguments are swapped in both comparisons, which makes the order descending
-			final int byCapacity = Integer.compare(o2.getLog2TableCapacity(), o1.getLog2TableCapacity());
-			if (byCapacity != 0) {
-				return byCapacity;
-			}
-			return Integer.compare(o2.size(), o1.size());
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * A factory that creates values lazily, i.e., only when they are requested.
-	 *
-	 * @param <V> The type created by the factory.
-	 */
-	public interface LazyFactory<V> {
-
-		/**
-		 * Creates the value.
-		 * @return The created value.
-		 */
-		V create();
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * A visitor for a traversal over the union of multiple hash maps. For each key in the
-	 * union of the maps, the visitor is notified of the key, then of all values associated
-	 * with that key (one per map, but multiple across maps), and finally of the end of the key.
-	 * 
-	 * @param <K> The type of the key.
-	 * @param <V> The type of the value.
-	 */
-	public interface TraversalEvaluator<K, V> {
-
-		/**
-		 * Called when the traversal begins with a new key.
-		 * 
-		 * @param key The key traversed.
-		 * @throws Exception Method forwards all exceptions.
-		 */
-		void startNewKey(K key) throws Exception;
-
-		/**
-		 * Called once for every value that is found for the current key.
-		 * 
-		 * @param value The next value.
-		 * @throws Exception Method forwards all exceptions.
-		 */
-		void nextValue(V value) throws Exception;
-
-		/**
-		 * Called after all values for the current key have been passed to the visitor.
-		 * 
-		 * @throws Exception Method forwards all exceptions.
-		 */
-		void keyDone() throws Exception;
-	}
-}