You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/09 12:16:44 UTC

[08/10] flink git commit: [FLINK-2780] Remove Old Windowing Logic and API

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArray.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArray.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArray.java
new file mode 100644
index 0000000..b1c080e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArray.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+import java.lang.reflect.Array;
+
+/**
+ * Extracts multiple fields from an array and puts them into a new array of the
+ * specified type.
+ *
+ * @param <OUT>
+ *            The type of the output array. If out is set to String, the output
+ *            of the extractor will be a String[]. If it is set to String[] the
+ *            output will be String[][].
+ */
+public class FieldsFromArray<OUT> implements Extractor<Object, OUT[]> {
+
+	/**
+	 * Auto-generated version id
+	 */
+	private static final long serialVersionUID = 8075055384516397670L;
+	private int[] order;
+	private Class<OUT> clazz;
+
+	/**
+	 * Extracts multiple fields from an array and puts them in the given order
+	 * into a new array of the specified type.
+	 * 
+	 * @param clazz
+	 *            the Class object representing the component type of the new
+	 *            array
+	 * @param indexes
+	 *            The indexes of the fields to be extracted. Any order is
+	 *            possible, but not more than 255 fields due to limitations in
+	 *            {@link Array#newInstance(Class, int...)}.
+	 */
+	public FieldsFromArray(Class<OUT> clazz, int... indexes) {
+		this.order = indexes;
+		this.clazz = clazz;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public OUT[] extract(Object in) {
+		OUT[] output = (OUT[]) Array.newInstance(clazz, order.length);
+		for (int i = 0; i < order.length; i++) {
+			output[i] = (OUT) Array.get(in, this.order[i]);
+		}
+		return output;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTuple.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTuple.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTuple.java
new file mode 100644
index 0000000..fc7f3ab
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTuple.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+/**
+ * Extracts one or more fields of the type Double from a tuple and puts them
+ * into a new double[]
+ */
+public class FieldsFromTuple implements Extractor<Tuple, double[]> {
+
+	/**
+	 * auto generated version id
+	 */
+	private static final long serialVersionUID = -2554079091050273761L;
+	int[] indexes;
+
+	/**
+	 * Extracts one or more fields of the the type Double from a tuple and puts
+	 * them into a new double[] (in the specified order).
+	 * 
+	 * @param indexes
+	 *            The indexes of the fields to be extracted.
+	 */
+	public FieldsFromTuple(int... indexes) {
+		this.indexes = indexes;
+	}
+
+	@Override
+	public double[] extract(Tuple in) {
+		double[] out = new double[indexes.length];
+		for (int i = 0; i < indexes.length; i++) {
+			out[i] = (Double) in.getField(indexes[i]);
+		}
+		return out;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index cfa6d93..2ca82b1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -575,8 +575,6 @@ public class StreamGraph extends StreamingPlan {
 
 		setJobName(jobGraphName);
 
-		WindowingOptimizer.optimizeGraph(this);
-
 		StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this);
 
 		return jobgraphGenerator.createJobGraph(jobGraphName);
@@ -585,8 +583,6 @@ public class StreamGraph extends StreamingPlan {
 	@Override
 	public String getStreamingPlanAsJSON() {
 
-		WindowingOptimizer.optimizeGraph(this);
-
 		try {
 			return new JSONGenerator(this).getJSON();
 		} catch (JSONException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
deleted file mode 100644
index cbd2a40..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.graph;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer;
-import org.apache.flink.streaming.api.operators.windowing.WindowFlattener;
-import org.apache.flink.streaming.api.operators.windowing.WindowMerger;
-
-public class WindowingOptimizer {
-
-	public static void optimizeGraph(StreamGraph streamGraph) {
-
-		// Share common discrtizers
-		setDiscretizerReuse(streamGraph);
-
-		// Remove unnecessary merges before flatten operators
-		removeMergeBeforeFlatten(streamGraph);
-	}
-
-	@SuppressWarnings("rawtypes")
-	private static void removeMergeBeforeFlatten(StreamGraph streamGraph) {
-		Set<Tuple2<Integer, StreamOperator<?>>> operators = streamGraph.getOperators();
-		List<Integer> flatteners = new ArrayList<Integer>();
-
-		for (Tuple2<Integer, StreamOperator<?>> entry : operators) {
-			if (entry.f1 instanceof WindowFlattener) {
-				flatteners.add(entry.f0);
-			}
-		}
-
-		for (Integer flattenerId : flatteners) {
-			// Flatteners should have exactly one input
-			StreamNode input = streamGraph.getStreamNode(flattenerId).getInEdges().get(0)
-					.getSourceVertex();
-
-			// Check whether the flatten is applied after a merge
-			if (input.getOperator() instanceof WindowMerger) {
-
-				// Mergers should have exactly one input
-				StreamNode mergeInput = input.getInEdges().get(0).getSourceVertex();
-
-				// We connect the merge input to the flattener directly
-				streamGraph.addEdge(mergeInput.getId(), flattenerId, 0);
-
-				// If the merger is only connected to the flattener we delete it
-				// completely, otherwise we only remove the edge
-				if (input.getOutEdges().size() > 1) {
-					streamGraph.removeEdge(streamGraph.getStreamEdge(input.getId(), flattenerId));
-				} else {
-					streamGraph.removeVertex(input);
-				}
-
-				streamGraph.setParallelism(flattenerId, mergeInput.getParallelism());
-			}
-		}
-
-	}
-
-	private static void setDiscretizerReuse(StreamGraph streamGraph) {
-
-		Collection<StreamNode> nodes = streamGraph.getStreamNodes();
-		List<StreamNode> discretizers = new ArrayList<StreamNode>();
-
-		for (StreamNode node : nodes) {
-			if (node.getOperator() instanceof StreamDiscretizer) {
-				discretizers.add(node);
-			}
-		}
-
-		List<Tuple2<StreamDiscretizer<?>, List<StreamNode>>> matchingDiscretizers = new ArrayList<Tuple2<StreamDiscretizer<?>, List<StreamNode>>>();
-
-		for (StreamNode discretizer : discretizers) {
-			boolean matchedAny = false;
-			for (Tuple2<StreamDiscretizer<?>, List<StreamNode>> candidate : matchingDiscretizers) {
-
-				Set<Integer> discretizerInEdges = new HashSet<Integer>(
-						discretizer.getInEdgeIndices());
-				Set<Integer> toMatchInEdges = new HashSet<Integer>(candidate.f1.get(0)
-						.getInEdgeIndices());
-
-				boolean partitionersMatch = true;
-
-				for (StreamEdge edge1 : discretizer.getInEdges()) {
-					for (StreamEdge edge2 : candidate.f1.get(0).getInEdges()) {
-						if (edge1.getPartitioner().getClass() != edge2.getPartitioner().getClass()) {
-							partitionersMatch = false;
-						}
-					}
-				}
-
-				if (partitionersMatch
-						&& discretizer.getParallelism() == candidate.f1.get(0).getParallelism()
-						&& discretizer.getOperator().equals(candidate.f0)
-						&& discretizerInEdges.equals(toMatchInEdges)) {
-
-					candidate.f1.add(discretizer);
-					matchedAny = true;
-					break;
-				}
-			}
-			if (!matchedAny) {
-				List<StreamNode> matchingNodes = new ArrayList<StreamNode>();
-				matchingNodes.add(discretizer);
-				matchingDiscretizers.add(new Tuple2<StreamDiscretizer<?>, List<StreamNode>>(
-						(StreamDiscretizer<?>) discretizer.getOperator(), matchingNodes));
-			}
-		}
-
-		for (Tuple2<StreamDiscretizer<?>, List<StreamNode>> matching : matchingDiscretizers) {
-			List<StreamNode> matchList = matching.f1;
-			if (matchList.size() > 1) {
-				StreamNode first = matchList.get(0);
-				for (int i = 1; i < matchList.size(); i++) {
-					replaceDiscretizer(streamGraph, matchList.get(i).getId(), first.getId());
-				}
-			}
-		}
-	}
-
-	private static void replaceDiscretizer(StreamGraph streamGraph, Integer toReplaceID,
-			Integer replaceWithId) {
-		// Convert to array to create a copy
-		List<StreamEdge> outEdges = new ArrayList<StreamEdge>(streamGraph
-				.getStreamNode(toReplaceID).getOutEdges());
-
-		int numOutputs = outEdges.size();
-
-		// Reconnect outputs
-		for (int i = 0; i < numOutputs; i++) {
-			StreamEdge outEdge = outEdges.get(i);
-
-			streamGraph.addEdge(replaceWithId, outEdge.getTargetId(), 0);
-		}
-
-		// Remove the other discretizer
-		streamGraph.removeVertex(streamGraph.getStreamNode(toReplaceID));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/EmptyWindowFilter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/EmptyWindowFilter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/EmptyWindowFilter.java
deleted file mode 100644
index e0fbd89..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/EmptyWindowFilter.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-public class EmptyWindowFilter<OUT> implements FilterFunction<StreamWindow<OUT>> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public boolean filter(StreamWindow<OUT> value) throws Exception {
-		return !value.isEmpty();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
deleted file mode 100644
index 5141598..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.windowing.policy.CentralActiveTrigger;
-import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class GroupedActiveDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(GroupedActiveDiscretizer.class);
-
-	private volatile IN last;
-	private Thread centralThread;
-	private CentralCheck centralCheck;
-
-	public GroupedActiveDiscretizer(KeySelector<IN, ?> keySelector,
-			CentralActiveTrigger<IN> triggerPolicy, CloneableEvictionPolicy<IN> evictionPolicy) {
-		super(keySelector, triggerPolicy, evictionPolicy);
-	}
-
-	@Override
-	protected StreamDiscretizer<IN> makeNewGroup(Object key) throws Exception {
-
-		StreamDiscretizer<IN> groupDiscretizer = new StreamDiscretizer<IN>(triggerPolicy.clone(),
-				evictionPolicy.clone());
-
-		groupDiscretizer.setup(this.output, this.runtimeContext);
-		// We omit the groupDiscretizer.open(...) call here to avoid starting
-		// new active threads
-		return groupDiscretizer;
-	}
-
-	@Override
-	public void processElement(StreamRecord<IN> element) throws Exception {
-
-//			last = copy(element);
-			last = element.getValue();
-			Object key = keySelector.getKey(element.getValue());
-
-			synchronized (groupedDiscretizers) {
-				StreamDiscretizer<IN> groupDiscretizer = groupedDiscretizers.get(key);
-
-				if (groupDiscretizer == null) {
-					groupDiscretizer = makeNewGroup(key);
-					groupedDiscretizers.put(key, groupDiscretizer);
-				}
-
-				groupDiscretizer.processRealElement(element);
-			}
-	}
-
-	@Override
-	public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
-		super.open(parameters);
-		centralCheck = new CentralCheck();
-		centralThread = new Thread(centralCheck);
-		centralThread.start();
-	}
-
-	@Override
-	public void dispose() {
-		try {
-			centralCheck.running = false;
-			centralThread.interrupt();
-			centralThread.join();
-		} catch (InterruptedException e) {
-			LOG.info("GroupedActiveDiscretizer got interruped while joining with central thread: {}", e);
-		}
-	}
-
-	private class CentralCheck implements Runnable {
-
-		volatile boolean running = true;
-
-		@Override
-		public void run() {
-			while (running) {
-				// wait for the specified granularity
-				try {
-					Thread.sleep(2000);
-				} catch (InterruptedException e) {
-					// ignore it...
-				}
-
-				try {
-					if (last != null) {
-						synchronized (groupedDiscretizers) {
-							for (StreamDiscretizer<IN> group : groupedDiscretizers.values()) {
-
-								CentralActiveTrigger<IN> groupTrigger = (CentralActiveTrigger<IN>) group.triggerPolicy;
-								Object[] fakes = groupTrigger.notifyOnLastGlobalElement(last);
-								if (fakes != null) {
-									for (Object fake : fakes) {
-										group.triggerOnFakeElement(fake);
-									}
-								}
-							}
-						}
-
-					}
-				} catch (Exception e) {
-					throw new RuntimeException(e);
-				}
-
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
deleted file mode 100644
index e3cab5c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * This operator represents the grouped discretization step of a window
- * transformation. The user supplied eviction and trigger policies are applied
- * on a per group basis to create the {@link StreamWindow} that will be further
- * transformed in the next stages. </p> To allow pre-aggregations supply an
- * appropriate {@link WindowBuffer}.
- */
-public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> {
-
-	private static final long serialVersionUID = 1L;
-
-	protected KeySelector<IN, ?> keySelector;
-	protected Configuration parameters;
-	protected CloneableTriggerPolicy<IN> triggerPolicy;
-	protected CloneableEvictionPolicy<IN> evictionPolicy;
-
-	protected Map<Object, StreamDiscretizer<IN>> groupedDiscretizers;
-
-	public GroupedStreamDiscretizer(KeySelector<IN, ?> keySelector,
-			CloneableTriggerPolicy<IN> triggerPolicy, CloneableEvictionPolicy<IN> evictionPolicy) {
-
-		super(triggerPolicy, evictionPolicy);
-
-		this.keySelector = keySelector;
-
-		this.triggerPolicy = triggerPolicy;
-		this.evictionPolicy = evictionPolicy;
-
-		this.groupedDiscretizers = new HashMap<Object, StreamDiscretizer<IN>>();
-	}
-
-	@Override
-	public void close() throws Exception {
-		super.close();
-		for (StreamDiscretizer<IN> group : groupedDiscretizers.values()) {
-			group.emitWindow();
-		}
-	}
-
-	@Override
-	public void processElement(StreamRecord<IN> element) throws Exception {
-
-
-			Object key = keySelector.getKey(element.getValue());
-
-			StreamDiscretizer<IN> groupDiscretizer = groupedDiscretizers.get(key);
-
-			if (groupDiscretizer == null) {
-				groupDiscretizer = makeNewGroup(key);
-				groupedDiscretizers.put(key, groupDiscretizer);
-			}
-
-			groupDiscretizer.processRealElement(element);
-
-	}
-
-	/**
-	 * This method creates a new group. The method gets called in case an
-	 * element arrives which has a key which was not seen before. The method
-	 * created a nested {@link StreamDiscretizer} and therefore created clones
-	 * of all distributed trigger and eviction policies.
-	 * 
-	 * @param key
-	 *            The key of the new group.
-	 */
-	protected StreamDiscretizer<IN> makeNewGroup(Object key) throws Exception {
-
-		StreamDiscretizer<IN> groupDiscretizer = new StreamDiscretizer<IN>(triggerPolicy.clone(),
-				evictionPolicy.clone());
-
-		// TODO: this seems very hacky, maybe we can get around this
-		groupDiscretizer.setup(this.output, this.runtimeContext);
-		groupDiscretizer.open(this.parameters);
-
-		return groupDiscretizer;
-	}
-
-	@Override
-	public boolean equals(Object other) {
-		if (other == null || !(other instanceof GroupedStreamDiscretizer)) {
-			return false;
-		} else {
-			try {
-				@SuppressWarnings("unchecked")
-				GroupedStreamDiscretizer<IN> otherDiscretizer = (GroupedStreamDiscretizer<IN>) other;
-
-				return triggerPolicy.equals(otherDiscretizer.triggerPolicy)
-						&& evictionPolicy.equals(otherDiscretizer.evictionPolicy)
-						&& keySelector.equals(otherDiscretizer.keySelector);
-
-			} catch (ClassCastException e) {
-				return false;
-			}
-		}
-	}
-
-	@Override
-	public String toString() {
-		return "GroupedDiscretizer(Key: " + keySelector.getClass().getSimpleName() + ", Trigger: "
-				+ triggerPolicy.toString() + ", Eviction: " + evictionPolicy.toString() + ")";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java
deleted file mode 100644
index c74b96e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.WindowEvent;
-import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * This operator flattens the results of the window transformations by
- * outputing the elements of the {@link StreamWindow} one-by-one
- */
-public class GroupedWindowBuffer<T> extends StreamWindowBuffer<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	private KeySelector<T, ?> keySelector;
-
-	private transient Map<Object, WindowBuffer<T>> windowMap;
-
-	public GroupedWindowBuffer(WindowBuffer<T> buffer, KeySelector<T, ?> keySelector) {
-		super(buffer);
-		this.keySelector = keySelector;
-		this.windowMap = new HashMap<Object, WindowBuffer<T>>();
-	}
-
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-		this.windowMap = new HashMap<Object, WindowBuffer<T>>();
-	}
-
-	@Override
-	public void processElement(StreamRecord<WindowEvent<T>> event) throws Exception {
-		if (event.getValue().getElement() != null) {
-			Object key = keySelector.getKey(event.getValue().getElement());
-			WindowBuffer<T> currentWindow = windowMap.get(key);
-
-			if (currentWindow == null) {
-				currentWindow = buffer.clone();
-				windowMap.put(key, currentWindow);
-			}
-
-			handleWindowEvent(event.getValue(), currentWindow);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelGroupedMerge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelGroupedMerge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelGroupedMerge.java
deleted file mode 100644
index de947eb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelGroupedMerge.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * The version of the ParallelMerge CoFlatMap that does not reduce the incoming
- * elements only appends them to the current window. This is necessary for
- * grouped reduces.
- */
-public class ParallelGroupedMerge<OUT> extends ParallelMerge<OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	public ParallelGroupedMerge() {
-		super(null);
-	}
-
-	@Override
-	protected void updateCurrent(StreamWindow<OUT> current, StreamWindow<OUT> nextWindow)
-			throws Exception {
-		current.addAll(nextWindow);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java
deleted file mode 100644
index ce7d887..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.util.Collector;
-
-/**
- * Class that encapsulates the functionality necessary to merge windows created
- * in parallel. This CoFlatMap uses the information received on the number of
- * parts for each window to merge the different parts. It waits until it
- * receives an indication on the number of parts from all the discretizers
- * before producing any output.
- */
-public class ParallelMerge<OUT> extends
-		RichCoFlatMapFunction<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>> {
-
-	private static final long serialVersionUID = 1L;
-
-	protected Integer numberOfDiscretizers;
-	private ReduceFunction<OUT> reducer;
-
-	private Map<Integer, Integer> availableNumberOfParts = new HashMap<Integer, Integer>();
-	private Map<Integer, Tuple2<StreamWindow<OUT>, Integer>> receivedWindows = new HashMap<Integer, Tuple2<StreamWindow<OUT>, Integer>>();
-	private Map<Integer, Tuple2<Integer, Integer>> receivedNumberOfParts = new HashMap<Integer, Tuple2<Integer, Integer>>();
-
-	public ParallelMerge(ReduceFunction<OUT> reducer) {
-		this.reducer = reducer;
-	}
-
-	@Override
-	public void flatMap1(StreamWindow<OUT> nextWindow, Collector<StreamWindow<OUT>> out)
-			throws Exception {
-
-		Integer id = nextWindow.windowID;
-
-		Tuple2<StreamWindow<OUT>, Integer> current = receivedWindows.get(id);
-
-		if (current == null) {
-			current = new Tuple2<StreamWindow<OUT>, Integer>(nextWindow, 1);
-		} else {
-			updateCurrent(current.f0, nextWindow);
-			current.f1++;
-		}
-
-		Integer count = current.f1;
-
-		if (availableNumberOfParts.containsKey(id) && availableNumberOfParts.get(id) <= count) {
-			out.collect(current.f0);
-			receivedWindows.remove(id);
-			availableNumberOfParts.remove(id);
-
-			checkOld(id);
-
-		} else {
-			receivedWindows.put(id, (Tuple2<StreamWindow<OUT>, Integer>) current);
-		}
-	}
-
-	private void checkOld(Integer id) {
-		// In case we have remaining partial windows (which indicates errors in
-		// processing), output and log them
-		if (receivedWindows.containsKey(id - 1)) {
-			throw new RuntimeException("Error in processing logic, window with id " + id
-					+ " should have already been processed");
-		}
-
-	}
-
-	@Override
-	public void flatMap2(Tuple2<Integer, Integer> partInfo, Collector<StreamWindow<OUT>> out)
-			throws Exception {
-
-		Integer id = partInfo.f0;
-		Integer numOfParts = partInfo.f1;
-
-		Tuple2<Integer, Integer> currentPartInfo = receivedNumberOfParts.get(id);
-		if (currentPartInfo != null) {
-			currentPartInfo.f0 += numOfParts;
-			currentPartInfo.f1++;
-		} else {
-			currentPartInfo = new Tuple2<Integer, Integer>(numOfParts, 1);
-			receivedNumberOfParts.put(id, currentPartInfo);
-		}
-
-		if (currentPartInfo.f1 >= numberOfDiscretizers) {
-			receivedNumberOfParts.remove(id);
-
-			Tuple2<StreamWindow<OUT>, Integer> current = receivedWindows.get(id);
-
-			Integer count = current != null ? current.f1 : -1;
-
-			if (count >= currentPartInfo.f0) {
-				out.collect(current.f0);
-				receivedWindows.remove(id);
-				checkOld(id);
-			} else if (currentPartInfo.f0 > 0) {
-				availableNumberOfParts.put(id, currentPartInfo.f1);
-			}
-		}
-
-	}
-
-	protected void updateCurrent(StreamWindow<OUT> current, StreamWindow<OUT> nextWindow)
-			throws Exception {
-		if (current.size() != 1 || nextWindow.size() != 1) {
-			throw new RuntimeException(
-					"Error in parallel merge logic. Current window should contain only one element.");
-		}
-		OUT currentReduced = current.remove(0);
-		currentReduced = reducer.reduce(currentReduced, nextWindow.get(0));
-		current.add(currentReduced);
-	}
-
-	@Override
-	public void open(Configuration conf) {
-		this.numberOfDiscretizers = getRuntimeContext().getNumberOfParallelSubtasks();
-	}
-
-	Map<Integer, Tuple2<StreamWindow<OUT>, Integer>> getReceivedWindows() {
-		return receivedWindows;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeOperator.java
deleted file mode 100644
index 74df3ad..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeOperator.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-public class ParallelMergeOperator<OUT> extends CoStreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>> {
-
-	private ParallelMerge<OUT> parallelMerge;
-
-	public ParallelMergeOperator(ParallelMerge<OUT> parallelMerge) {
-		super(parallelMerge);
-		this.parallelMerge = parallelMerge;
-	}
-
-	@Override
-	public void close() throws Exception {
-		// emit remaining (partial) windows
-
-		for (Tuple2<StreamWindow<OUT>, Integer> receivedWindow : parallelMerge.getReceivedWindows().values()) {
-			getCollector().collect(receivedWindow.f0);
-		}
-
-		super.close();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
deleted file mode 100644
index a5e00aa..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.WindowEvent;
-import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
-import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * This operator represents the discretization step of a window transformation.
- * The user supplied eviction and trigger policies are applied to create the
- * {@link StreamWindow} that will be further transformed in the next stages.
- */
-public class StreamDiscretizer<IN>
-		extends AbstractStreamOperator<WindowEvent<IN>>
-		implements OneInputStreamOperator<IN, WindowEvent<IN>> {
-
-	private static final long serialVersionUID = 1L;
-
-	protected TriggerPolicy<IN> triggerPolicy;
-	protected EvictionPolicy<IN> evictionPolicy;
-	private boolean isActiveTrigger;
-	private boolean isActiveEviction;
-	private int bufferSize = 0;
-
-	private transient Thread activePolicyThread;
-
-	protected WindowEvent<IN> windowEvent = new WindowEvent<IN>();
-
-	public StreamDiscretizer(TriggerPolicy<IN> triggerPolicy, EvictionPolicy<IN> evictionPolicy) {
-		this.triggerPolicy = triggerPolicy;
-		this.evictionPolicy = evictionPolicy;
-
-		this.isActiveTrigger = triggerPolicy instanceof ActiveTriggerPolicy;
-		this.isActiveEviction = evictionPolicy instanceof ActiveEvictionPolicy;
-
-		this.chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
-	}
-
-	public TriggerPolicy<IN> getTrigger() {
-		return triggerPolicy;
-	}
-
-	public EvictionPolicy<IN> getEviction() {
-		return evictionPolicy;
-	}
-
-	@Override
-	public void processElement(StreamRecord<IN> element) throws Exception {
-		processRealElement(element);
-	}
-
-	/**
-	 * This method processed an arrived real element The method is synchronized
-	 * to ensure that it cannot interleave with
-	 * {@link StreamDiscretizer#triggerOnFakeElement(Object)}
-	 * 
-	 * @param input
-	 *            a real input element
-	 * @throws Exception
-	 */
-	protected synchronized void processRealElement(StreamRecord<IN> input) throws Exception {
-
-		// Setting the input element in order to avoid NullFieldException when triggering on fake element
-		windowEvent.setElement(input.getValue());
-		if (isActiveTrigger) {
-			ActiveTriggerPolicy<IN> trigger = (ActiveTriggerPolicy<IN>) triggerPolicy;
-			Object[] result = trigger.preNotifyTrigger(input.getValue());
-			for (Object in : result) {
-				triggerOnFakeElement(in);
-			}
-		}
-
-		boolean isTriggered = false;
-
-		if (triggerPolicy.notifyTrigger(input.getValue())) {
-			emitWindow();
-			isTriggered = true;
-		}
-
-		evict(input.getValue(), isTriggered);
-
-		output.collect(input.replace(windowEvent.setElement(input.getValue())));
-		bufferSize++;
-
-	}
-
-	/**
-	 * This method triggers on an arrived fake element The method is
-	 * synchronized to ensure that it cannot interleave with
-	 * {@link StreamDiscretizer#processRealElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)}
-	 * 
-	 * @param input
-	 *            a fake input element
-	 */
-	@SuppressWarnings("unchecked")
-	protected synchronized void triggerOnFakeElement(Object input) {
-		if (isActiveEviction) {
-			activeEvict(input);
-			emitWindow();
-		} else {
-			emitWindow();
-			evict((IN) input, true);
-		}
-	}
-
-	/**
-	 * This method emits the content of the buffer as a new {@link StreamWindow}
-	 * if not empty
-	 */
-	protected void emitWindow() {
-		output.collect(new StreamRecord<WindowEvent<IN>>(windowEvent.setTrigger()));
-	}
-
-	private void activeEvict(Object input) {
-		int numToEvict = 0;
-
-		if (isActiveEviction) {
-			ActiveEvictionPolicy<IN> ep = (ActiveEvictionPolicy<IN>) evictionPolicy;
-			numToEvict = ep.notifyEvictionWithFakeElement(input, bufferSize);
-		}
-
-		if (numToEvict > 0) {
-			output.collect(new StreamRecord<WindowEvent<IN>>(windowEvent.setEviction(numToEvict)));
-			bufferSize -= numToEvict;
-			bufferSize = bufferSize >= 0 ? bufferSize : 0;
-		}
-	}
-
-	private void evict(IN input, boolean isTriggered) {
-		int numToEvict = evictionPolicy.notifyEviction(input, isTriggered, bufferSize);
-
-		if (numToEvict > 0) {
-			output.collect(new StreamRecord<WindowEvent<IN>>(windowEvent.setEviction(numToEvict)));
-			bufferSize -= numToEvict;
-			bufferSize = bufferSize >= 0 ? bufferSize : 0;
-		}
-	}
-
-	@Override
-	public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
-		super.open(parameters);
-
-		if (isActiveTrigger) {
-			ActiveTriggerPolicy<IN> tp = (ActiveTriggerPolicy<IN>) triggerPolicy;
-
-			Runnable runnable = tp.createActiveTriggerRunnable(new WindowingCallback());
-			if (runnable != null) {
-				activePolicyThread = new Thread(runnable);
-				activePolicyThread.start();
-			}
-		}
-	}
-
-	@Override
-	public void close() throws Exception {
-		super.close();
-		if (activePolicyThread != null) {
-			activePolicyThread.interrupt();
-		}
-
-		emitWindow();
-	}
-
-	@Override
-	public void dispose() {
-		if (activePolicyThread != null) {
-			activePolicyThread.interrupt();
-		}
-	}
-
-	/**
-	 * This class allows the active trigger thread to call back and push fake
-	 * elements at any time.
-	 */
-	private class WindowingCallback implements ActiveTriggerCallback {
-
-		@Override
-		public void sendFakeElement(Object datapoint) {
-			triggerOnFakeElement(datapoint);
-		}
-
-	}
-
-	@Override
-	public boolean equals(Object other) {
-		if (other == null || !(other instanceof StreamDiscretizer)
-				|| (other instanceof GroupedStreamDiscretizer)) {
-			return false;
-		} else {
-			try {
-				@SuppressWarnings("unchecked")
-				StreamDiscretizer<IN> otherDiscretizer = (StreamDiscretizer<IN>) other;
-
-				return triggerPolicy.equals(otherDiscretizer.triggerPolicy)
-						&& evictionPolicy.equals(otherDiscretizer.evictionPolicy);
-
-			} catch (ClassCastException e) {
-				return false;
-			}
-		}
-	}
-
-	@Override
-	public String toString() {
-		return "Discretizer(Trigger: " + triggerPolicy.toString() + ", Eviction: "
-				+ evictionPolicy.toString() + ")";
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
deleted file mode 100644
index c057f91..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.WindowEvent;
-import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * This operator manages the window buffers attached to the discretizers.
- */
-public class StreamWindowBuffer<T>
-		extends AbstractStreamOperator<StreamWindow<T>>
-		implements OneInputStreamOperator<WindowEvent<T>, StreamWindow<T>> {
-
-	private static final long serialVersionUID = 1L;
-
-	protected WindowBuffer<T> buffer;
-
-	public StreamWindowBuffer(WindowBuffer<T> buffer) {
-		this.buffer = buffer;
-		setChainingStrategy(ChainingStrategy.FORCE_ALWAYS);
-		disableInputCopy();
-	}
-
-	@Override
-	public void processElement(StreamRecord<WindowEvent<T>> windowEvent) throws Exception {
-		handleWindowEvent(windowEvent.getValue());
-	}
-
-	protected void handleWindowEvent(WindowEvent<T> windowEvent, WindowBuffer<T> buffer)
-			throws Exception {
-		if (windowEvent.isElement()) {
-			buffer.store(windowEvent.getElement());
-		} else if (windowEvent.isEviction()) {
-			buffer.evict(windowEvent.getEviction());
-		} else if (windowEvent.isTrigger()) {
-			buffer.emitWindow(output);
-		}
-	}
-
-	private void handleWindowEvent(WindowEvent<T> windowEvent) throws Exception {
-		handleWindowEvent(windowEvent, buffer);
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
deleted file mode 100644
index fa7696a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * This operator flattens the results of the window transformations by
- * outputing the elements of the {@link StreamWindow} one-by-one
- */
-public class WindowFlattener<T> extends AbstractStreamOperator<T>
-		implements OneInputStreamOperator<StreamWindow<T>, T> {
-
-	private static final long serialVersionUID = 1L;
-
-	public WindowFlattener() {
-		chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
-		disableInputCopy();
-	}
-
-	@Override
-	public void processElement(StreamRecord<StreamWindow<T>> window) throws Exception {
-		for (T element : window.getValue()) {
-			output.collect(new StreamRecord<T>(element));
-		}
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
deleted file mode 100644
index cdfc35b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.WindowedDataStream;
-import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * This operator is used to apply foldWindow transformations on
- * {@link WindowedDataStream}s.
- */
-public class WindowFolder<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWindow<OUT>> {
-
-	private static final long serialVersionUID = 1L;
-
-	FoldFunction<IN, OUT> folder;
-
-	public WindowFolder(FoldFunction<IN, OUT> folder, OUT initialValue) {
-		super(new WindowFoldFunction<IN, OUT>(folder, initialValue));
-		this.folder = folder;
-		disableInputCopy();
-	}
-
-	private static class WindowFoldFunction<IN, OUT> extends AbstractRichFunction implements
-			MapFunction<StreamWindow<IN>, StreamWindow<OUT>> {
-
-		private static final long serialVersionUID = 1L;
-		private OUT initialValue;
-		FoldFunction<IN, OUT> folder;
-
-		public WindowFoldFunction(FoldFunction<IN, OUT> folder, OUT initialValue) {
-			this.folder = folder;
-			this.initialValue = initialValue;
-		}
-
-		@Override
-		public StreamWindow<OUT> map(StreamWindow<IN> window) throws Exception {
-			StreamWindow<OUT> outputWindow = new StreamWindow<OUT>(window.windowID);
-			outputWindow.numberOfParts = window.numberOfParts;
-
-			if (!window.isEmpty()) {
-				OUT accumulator = initialValue;
-				for (int i = 0; i < window.size(); i++) {
-					accumulator = folder.fold(accumulator, window.get(i));
-				}
-				outputWindow.add(accumulator);
-			}
-			return outputWindow;
-		}
-
-		// --------------------------------------------------------------------------------------------
-		//  Forwarding calls to the wrapped folder
-		// --------------------------------------------------------------------------------------------
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			FunctionUtils.openFunction(folder, parameters);
-		}
-
-		@Override
-		public void close() throws Exception {
-			FunctionUtils.closeFunction(folder);
-		}
-
-		@Override
-		public void setRuntimeContext(RuntimeContext t) {
-			FunctionUtils.setFunctionRuntimeContext(folder, t);
-		}
-
-		@Override
-		public RuntimeContext getRuntimeContext() {
-			return FunctionUtils.getFunctionRuntimeContext(folder, getRuntimeContext());
-		}
-
-		// streaming does not use iteration runtime context, so that is omitted
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
deleted file mode 100644
index ec4309d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.WindowedDataStream;
-import org.apache.flink.streaming.api.functions.WindowMapFunction;
-import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * This operator is used to apply mapWindow transformations on
- * {@link WindowedDataStream}s.
- */
-public class WindowMapper<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWindow<OUT>> {
-
-	private static final long serialVersionUID = 1L;
-
-	WindowMapFunction<IN, OUT> mapper;
-
-	public WindowMapper(WindowMapFunction<IN, OUT> mapper) {
-		super(new WindowMap<IN, OUT>(mapper));
-		this.mapper = mapper;
-		disableInputCopy();
-	}
-
-	private static class WindowMap<T, R> extends AbstractRichFunction
-			implements MapFunction<StreamWindow<T>, StreamWindow<R>> {
-
-		private static final long serialVersionUID = 1L;
-		WindowMapFunction<T, R> mapper;
-
-		public WindowMap(WindowMapFunction<T, R> mapper) {
-			this.mapper = mapper;
-		}
-
-		@Override
-		public StreamWindow<R> map(StreamWindow<T> window) throws Exception {
-			StreamWindow<R> outputWindow = new StreamWindow<R>(window.windowID);
-
-			outputWindow.numberOfParts = window.numberOfParts;
-
-			mapper.mapWindow(window, outputWindow);
-
-			return outputWindow;
-		}
-
-		// --------------------------------------------------------------------------------------------
-		//  Forwarding calls to the wrapped mapper
-		// --------------------------------------------------------------------------------------------
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			FunctionUtils.openFunction(mapper, parameters);
-		}
-
-		@Override
-		public void close() throws Exception {
-			FunctionUtils.closeFunction(mapper);
-		}
-
-		@Override
-		public void setRuntimeContext(RuntimeContext t) {
-			FunctionUtils.setFunctionRuntimeContext(mapper, t);
-		}
-
-		@Override
-		public RuntimeContext getRuntimeContext() {
-			return FunctionUtils.getFunctionRuntimeContext(mapper, getRuntimeContext());
-		}
-
-		// streaming does not use iteration runtime context, so that is omitted
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
deleted file mode 100644
index 9ed5e82..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * This operator merges together the different partitions of the
- * {@link StreamWindow}s used to merge the results of parallel transformations
- * that belong in the same window.
- */
-public class WindowMerger<T> extends AbstractStreamOperator<StreamWindow<T>>
-		implements OneInputStreamOperator<StreamWindow<T>, StreamWindow<T>> {
-
-	private static final long serialVersionUID = 1L;
-
-	private Map<Integer, StreamWindow<T>> windows;
-
-	public WindowMerger() {
-		this.windows = new HashMap<Integer, StreamWindow<T>>();
-		chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
-		disableInputCopy();
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public void processElement(StreamRecord<StreamWindow<T>> nextWindowRecord) throws Exception {
-		StreamWindow<T> nextWindow = nextWindowRecord.getValue();
-
-		StreamWindow<T> current = windows.get(nextWindow.windowID);
-
-		if (current == null) {
-			current = nextWindow;
-		} else {
-			current = StreamWindow.merge(current, nextWindow);
-		}
-
-		if (current.numberOfParts == 1) {
-			nextWindowRecord.replace(current);
-			output.collect(nextWindowRecord);
-			windows.remove(nextWindow.windowID);
-		} else {
-			windows.put(nextWindow.windowID, current);
-		}
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartExtractor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartExtractor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartExtractor.java
deleted file mode 100644
index 50b4e7d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartExtractor.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.util.Collector;
-
-/**
- * This FlatMapFunction is used to send the number of parts for each window ID
- * (for each parallel discretizer) to the parallel merger that will use is to
- * merge parallel discretized windows
- */
-public class WindowPartExtractor<OUT> implements FlatMapFunction<StreamWindow<OUT>, Tuple2<Integer, Integer>> {
-
-	private static final long serialVersionUID = 1L;
-
-	Integer lastIndex = -1;
-
-	@Override
-	public void flatMap(StreamWindow<OUT> value, Collector<Tuple2<Integer, Integer>> out)
-			throws Exception {
-
-		// We dont emit new values for the same index, this avoids sending the
-		// same information for the same partitioned window multiple times
-		if (value.windowID != lastIndex) {
-			
-			// For empty windows we send 0 since these windows will be filtered
-			// out
-			if (value.isEmpty()) {
-				out.collect(new Tuple2<Integer, Integer>(value.windowID, 0));
-			} else {
-				out.collect(new Tuple2<Integer, Integer>(value.windowID, value.numberOfParts));
-			}
-			lastIndex = value.windowID;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
deleted file mode 100644
index 9f31fa0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * This operator applies either split or key partitioning depending on the
- * transformation.
- */
-public class WindowPartitioner<T> extends AbstractStreamOperator<StreamWindow<T>>
-		implements OneInputStreamOperator<StreamWindow<T>, StreamWindow<T>> {
-
-	private static final long serialVersionUID = 1L;
-
-	private KeySelector<T, ?> keySelector;
-	private int numberOfSplits;
-
-	public WindowPartitioner(KeySelector<T, ?> keySelector) {
-		this.keySelector = keySelector;
-
-		chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
-		disableInputCopy();
-	}
-
-	public WindowPartitioner(int numberOfSplits) {
-		this.numberOfSplits = numberOfSplits;
-
-		chainingStrategy = ChainingStrategy.ALWAYS;
-	}
-
-	@Override
-	public void processElement(StreamRecord<StreamWindow<T>> currentWindow) throws Exception {
-
-		if (keySelector == null) {
-			if (numberOfSplits <= 1) {
-				output.collect(currentWindow);
-			} else {
-				StreamWindow<T> unpackedWindow = currentWindow.getValue();
-				for (StreamWindow<T> window : StreamWindow.split(unpackedWindow, numberOfSplits)) {
-					currentWindow.replace(window);
-					output.collect(currentWindow);
-				}
-			}
-		} else {
-
-			for (StreamWindow<T> window : StreamWindow
-					.partitionBy(currentWindow.getValue(), keySelector, true)) {
-				output.collect(new StreamRecord<StreamWindow<T>>(window));
-			}
-
-		}
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
deleted file mode 100644
index a43405e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.WindowedDataStream;
-import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * This operator is used to apply reduceWindow transformations on
- * {@link WindowedDataStream}s.
- */
-public class WindowReducer<IN> extends StreamMap<StreamWindow<IN>, StreamWindow<IN>> {
-
-	private static final long serialVersionUID = 1L;
-
-	ReduceFunction<IN> reducer;
-
-	public WindowReducer(ReduceFunction<IN> reducer) {
-		super(new WindowReduceFunction<IN>(reducer));
-		this.reducer = reducer;
-		disableInputCopy();
-	}
-
-	private static class WindowReduceFunction<T> extends AbstractRichFunction implements
-			MapFunction<StreamWindow<T>, StreamWindow<T>> {
-
-		private static final long serialVersionUID = 1L;
-		ReduceFunction<T> reducer;
-
-		public WindowReduceFunction(ReduceFunction<T> reducer) {
-			this.reducer = reducer;
-		}
-
-		@Override
-		public StreamWindow<T> map(StreamWindow<T> window) throws Exception {
-			StreamWindow<T> outputWindow = new StreamWindow<T>(window.windowID);
-			outputWindow.numberOfParts = window.numberOfParts;
-
-			if (!window.isEmpty()) {
-				T reduced = window.get(0);
-				for (int i = 1; i < window.size(); i++) {
-					reduced = reducer.reduce(reduced, window.get(i));
-				}
-				outputWindow.add(reduced);
-			}
-			return outputWindow;
-		}
-
-		// --------------------------------------------------------------------------------------------
-		//  Forwarding calls to the wrapped reducer
-		// --------------------------------------------------------------------------------------------
-
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			FunctionUtils.openFunction(reducer, parameters);
-		}
-
-		@Override
-		public void close() throws Exception {
-			FunctionUtils.closeFunction(reducer);
-		}
-
-		@Override
-		public void setRuntimeContext(RuntimeContext t) {
-			FunctionUtils.setFunctionRuntimeContext(reducer, t);
-		}
-
-		@Override
-		public RuntimeContext getRuntimeContext() {
-			return FunctionUtils.getFunctionRuntimeContext(reducer, getRuntimeContext());
-		}
-
-		// streaming does not use iteration runtime context, so that is omitted
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
deleted file mode 100644
index 5a63940..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.datastream.WindowedDataStream;
-import org.apache.flink.util.Collector;
-
-/**
- * Core abstraction for representing windows for {@link WindowedDataStream}s.
- * The user can apply transformations on these windows with the appropriate
- * {@link WindowedDataStream} methods. </p> Each stream window consists of a
- * random ID, a number representing the number of partitions for this specific
- * window (ID) and the elements itself. The ID and number of parts will be used
- * to merge the subwindows after distributed transformations.
- */
-public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
-
-	private static final long serialVersionUID = -5150196421193988403L;
-	private static Random rnd = new Random();
-
-	public int windowID;
-	public int numberOfParts;
-
-	/**
-	 * Creates a new window with a random id
-	 */
-	public StreamWindow() {
-		this(rnd.nextInt(), 1);
-	}
-
-	/**
-	 * Creates a new window with the specific id
-	 * 
-	 * @param windowID
-	 *            ID of the window
-	 */
-	public StreamWindow(int windowID) {
-		this(windowID, 1);
-	}
-
-	/**
-	 * Creates a new window with the given id and number of parts
-	 * 
-	 * @param windowID
-	 * @param numberOfParts
-	 */
-	public StreamWindow(int windowID, int numberOfParts) {
-		super();
-		this.windowID = windowID;
-		this.numberOfParts = numberOfParts;
-	}
-
-	/**
-	 * Creates a shallow copy of the window
-	 * 
-	 * @param window
-	 *            The window to be copied
-	 */
-	public StreamWindow(StreamWindow<T> window) {
-		this(window.windowID, window.numberOfParts);
-		addAll(window);
-	}
-
-	/**
-	 * Creates a deep copy of the window using the given serializer
-	 * 
-	 * @param window
-	 *            The window to be copied
-	 * @param serializer
-	 *            The serializer used for copying the records.
-	 */
-	public StreamWindow(StreamWindow<T> window, TypeSerializer<T> serializer) {
-		this(window.windowID, window.numberOfParts);
-		for (T element : window) {
-			add(serializer.copy(element));
-		}
-	}
-
-	/**
-	 * Partitions the window using the given keyselector. A subwindow will be
-	 * created for each key.
-	 * 
-	 * @param streamWindow
-	 *            StreamWindow instance to partition
-	 * @param keySelector
-	 *            The keyselector used for extracting keys.
-	 * @param withKey
-	 *            Flag to decide whether the key object should be included in
-	 *            the created window
-	 * @return A list of the subwindows
-	 */
-	public static <X> List<StreamWindow<X>> partitionBy(StreamWindow<X> streamWindow,
-			KeySelector<X, ?> keySelector, boolean withKey) throws Exception {
-		Map<Object, StreamWindow<X>> partitions = new HashMap<Object, StreamWindow<X>>();
-
-		for (X value : streamWindow) {
-			Object key = keySelector.getKey(value);
-			StreamWindow<X> window = partitions.get(key);
-			if (window == null) {
-				window = new StreamWindow<X>(streamWindow.windowID, 0);
-				partitions.put(key, window);
-			}
-			window.add(value);
-		}
-
-		List<StreamWindow<X>> output = new ArrayList<StreamWindow<X>>();
-		int numkeys = partitions.size();
-
-		for (StreamWindow<X> window : partitions.values()) {
-			output.add(window.setNumberOfParts(numkeys));
-		}
-
-		return output;
-	}
-
-	/**
-	 * Splits the window into n equal (if possible) sizes.
-	 * 
-	 * @param window
-	 *            Window to split
-	 * @param n
-	 *            Number of desired partitions
-	 * @return The list of subwindows.
-	 */
-	public static <X> List<StreamWindow<X>> split(StreamWindow<X> window, int n) {
-		int numElements = window.size();
-		if (n == 0) {
-			return new ArrayList<StreamWindow<X>>();
-		}
-		if (n > numElements) {
-			return split(window, numElements);
-		} else {
-			List<StreamWindow<X>> splitsList = new ArrayList<StreamWindow<X>>();
-			int splitSize = numElements / n;
-
-			int index = -1;
-
-			StreamWindow<X> currentSubWindow = new StreamWindow<X>(window.windowID, n);
-			splitsList.add(currentSubWindow);
-
-			for (X element : window) {
-				index++;
-				if (index == splitSize && splitsList.size() < n) {
-					currentSubWindow = new StreamWindow<X>(window.windowID, n);
-					splitsList.add(currentSubWindow);
-					index = 0;
-				}
-				currentSubWindow.add(element);
-			}
-			return splitsList;
-		}
-	}
-
-	public StreamWindow<T> setNumberOfParts(int n) {
-		this.numberOfParts = n;
-		return this;
-	}
-
-	public void setID(int id) {
-		this.windowID = id;
-	}
-
-	/**
-	 * Checks whether this window can be merged with the given one.
-	 * 
-	 * @param otherWindow
-	 *            The window to test
-	 * @return Window compatibility
-	 */
-	public boolean compatibleWith(StreamWindow<T> otherWindow) {
-		return this.windowID == otherWindow.windowID && this.numberOfParts > 1;
-	}
-
-	/**
-	 * Merges compatible windows together.
-	 * 
-	 * @param windows
-	 *            Windows to merge
-	 * @return Merged window
-	 */
-	public static <R> StreamWindow<R> merge(StreamWindow<R>... windows) {
-		StreamWindow<R> window = new StreamWindow<R>(windows[0]);
-		for (int i = 1; i < windows.length; i++) {
-			StreamWindow<R> next = windows[i];
-			if (window.compatibleWith(next)) {
-				window.addAll(next);
-				window.numberOfParts--;
-			} else {
-				throw new RuntimeException("Can only merge compatible windows");
-			}
-		}
-		return window;
-	}
-
-	/**
-	 * Merges compatible windows together.
-	 * 
-	 * @param windows
-	 *            Windows to merge
-	 * @return Merged window
-	 */
-	public static <R> StreamWindow<R> merge(List<StreamWindow<R>> windows) {
-		if (windows.isEmpty()) {
-			throw new RuntimeException("Need at least one window to merge");
-		} else {
-			StreamWindow<R> window = new StreamWindow<R>(windows.get(0));
-			for (int i = 1; i < windows.size(); i++) {
-				StreamWindow<R> next = windows.get(i);
-				if (window.compatibleWith(next)) {
-					window.addAll(next);
-					window.numberOfParts--;
-				} else {
-					throw new RuntimeException("Can only merge compatible windows");
-				}
-			}
-			return window;
-		}
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		return super.equals(o);
-	}
-
-	@Override
-	public void collect(T record) {
-		add(record);
-	}
-
-	@Override
-	public void close() {
-	}
-
-	@Override
-	public String toString() {
-		return super.toString();
-	}
-
-	/**
-	 * Creates a new {@link StreamWindow} with random id from the given elements
-	 * 
-	 * @param elements
-	 *            The elements contained in the resulting window
-	 * @return The window
-	 */
-	public static <R> StreamWindow<R> fromElements(R... elements) {
-		StreamWindow<R> window = new StreamWindow<R>();
-		for (R element : elements) {
-			window.add(element);
-		}
-		return window;
-	}
-}