You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/15 11:38:54 UTC

[13/19] flink git commit: [streaming] Major internal renaming and restructure

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
new file mode 100644
index 0000000..b688ea4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer;
+import org.apache.flink.streaming.api.operators.windowing.WindowFlattener;
+import org.apache.flink.streaming.api.operators.windowing.WindowMerger;
+import org.apache.flink.streaming.runtime.partitioner.DistributePartitioner;
+
+public class WindowingOptimizer {
+
+	public static void optimizeGraph(StreamGraph streamGraph) {
+
+		// Share common discrtizers
+		setDiscretizerReuse(streamGraph);
+
+		// Remove unnecessary merges before flatten operators
+		removeMergeBeforeFlatten(streamGraph);
+	}
+
+	@SuppressWarnings("rawtypes")
+	private static void removeMergeBeforeFlatten(StreamGraph streamGraph) {
+		Set<Tuple2<Integer, StreamOperator<?, ?>>> operators = streamGraph.getOperators();
+		List<Integer> flatteners = new ArrayList<Integer>();
+
+		for (Tuple2<Integer, StreamOperator<?, ?>> entry : operators) {
+			if (entry.f1 instanceof WindowFlattener) {
+				flatteners.add(entry.f0);
+			}
+		}
+
+		for (Integer flattenerID : flatteners) {
+			// Flatteners should have exactly one input
+			StreamNode input = streamGraph.getVertex(flattenerID).getInEdges().get(0)
+					.getSourceVertex();
+
+			// Check whether the flatten is applied after a merge
+			if (input.getOperator() instanceof WindowMerger) {
+
+				// Mergers should have exactly one input
+				StreamNode mergeInput = input.getInEdges().get(0).getSourceVertex();
+
+				// We connect the merge input to the flattener directly
+				streamGraph.addEdge(mergeInput.getID(), flattenerID,
+						new DistributePartitioner(true), 0, new ArrayList<String>());
+
+				// If the merger is only connected to the flattener we delete it
+				// completely, otherwise we only remove the edge
+				if (input.getOutEdges().size() > 1) {
+					streamGraph.removeEdge(streamGraph.getEdge(input.getID(), flattenerID));
+				} else {
+					streamGraph.removeVertex(input);
+				}
+
+				streamGraph.setParallelism(flattenerID, mergeInput.getParallelism());
+			}
+		}
+
+	}
+
+	private static void setDiscretizerReuse(StreamGraph streamGraph) {
+
+		Set<Tuple2<Integer, StreamOperator<?, ?>>> operators = streamGraph.getOperators();
+		List<Tuple2<Integer, StreamDiscretizer<?>>> discretizers = new ArrayList<Tuple2<Integer, StreamDiscretizer<?>>>();
+
+		// Get the discretizers
+		for (Tuple2<Integer, StreamOperator<?, ?>> entry : operators) {
+			if (entry.f1 instanceof StreamDiscretizer) {
+				discretizers.add(new Tuple2<Integer, StreamDiscretizer<?>>(entry.f0,
+						(StreamDiscretizer<?>) entry.f1));
+			}
+		}
+
+		List<Tuple2<StreamDiscretizer<?>, List<Integer>>> matchingDiscretizers = new ArrayList<Tuple2<StreamDiscretizer<?>, List<Integer>>>();
+
+		for (Tuple2<Integer, StreamDiscretizer<?>> discretizer : discretizers) {
+			boolean inMatching = false;
+			for (Tuple2<StreamDiscretizer<?>, List<Integer>> matching : matchingDiscretizers) {
+				Set<Integer> discretizerInEdges = new HashSet<Integer>(streamGraph.getVertex(
+						discretizer.f0).getInEdgeIndices());
+				Set<Integer> matchingInEdges = new HashSet<Integer>(streamGraph.getVertex(
+						matching.f1.get(0)).getInEdgeIndices());
+
+				if (discretizer.f1.equals(matching.f0)
+						&& discretizerInEdges.equals(matchingInEdges)) {
+					matching.f1.add(discretizer.f0);
+					inMatching = true;
+					break;
+				}
+			}
+			if (!inMatching) {
+				List<Integer> matchingNames = new ArrayList<Integer>();
+				matchingNames.add(discretizer.f0);
+				matchingDiscretizers.add(new Tuple2<StreamDiscretizer<?>, List<Integer>>(
+						discretizer.f1, matchingNames));
+			}
+		}
+
+		for (Tuple2<StreamDiscretizer<?>, List<Integer>> matching : matchingDiscretizers) {
+			List<Integer> matchList = matching.f1;
+			if (matchList.size() > 1) {
+				Integer first = matchList.get(0);
+				for (int i = 1; i < matchList.size(); i++) {
+					replaceDiscretizer(streamGraph, matchList.get(i), first);
+				}
+			}
+		}
+	}
+
+	private static void replaceDiscretizer(StreamGraph streamGraph, Integer toReplaceID,
+			Integer replaceWithID) {
+		// Convert to array to create a copy
+		List<StreamEdge> outEdges = new ArrayList<StreamEdge>(streamGraph.getVertex(toReplaceID)
+				.getOutEdges());
+
+		int numOutputs = outEdges.size();
+
+		// Reconnect outputs
+		for (int i = 0; i < numOutputs; i++) {
+			StreamEdge outEdge = outEdges.get(i);
+
+			streamGraph.addEdge(replaceWithID, outEdge.getTargetID(), outEdge.getPartitioner(), 0,
+					new ArrayList<String>());
+		}
+
+		// Remove the other discretizer
+		streamGraph.removeVertex(streamGraph.getVertex(toReplaceID));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
deleted file mode 100644
index 4e09a98..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.util.Collector;
-
-public abstract class ChainableInvokable<IN, OUT> extends StreamInvokable<IN, OUT> implements
-		Collector<IN> {
-
-	private static final long serialVersionUID = 1L;
-	private boolean copyInput = true;
-
-	public ChainableInvokable(Function userFunction) {
-		super(userFunction);
-		setChainingStrategy(ChainingStrategy.ALWAYS);
-	}
-
-	public void setup(Collector<OUT> collector, StreamRecordSerializer<IN> inSerializer) {
-		this.collector = collector;
-		this.inSerializer = inSerializer;
-		this.objectSerializer = inSerializer.getObjectSerializer();
-	}
-
-	public ChainableInvokable<IN, OUT> withoutInputCopy() {
-		copyInput = false;
-		return this;
-	}
-
-	protected IN copyInput(IN input) {
-		return copyInput ? copy(input) : input;
-	}
-
-	@Override
-	public void collect(IN record) {
-		if (isRunning) {
-			nextObject = copyInput(record);
-			callUserFunctionAndLogException();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
deleted file mode 100644
index 29d2ed2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-
-public class SinkInvokable<IN> extends ChainableInvokable<IN, IN> {
-	private static final long serialVersionUID = 1L;
-
-	private SinkFunction<IN> sinkFunction;
-
-	public SinkInvokable(SinkFunction<IN> sinkFunction) {
-		super(sinkFunction);
-		this.sinkFunction = sinkFunction;
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		while (isRunning && readNext() != null) {
-			callUserFunctionAndLogException();
-		}
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-		sinkFunction.invoke(nextObject);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
deleted file mode 100644
index c3f25a0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import java.io.Serializable;
-
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-
-public class SourceInvokable<OUT> extends StreamInvokable<OUT, OUT> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	private SourceFunction<OUT> sourceFunction;
-
-	public SourceInvokable(SourceFunction<OUT> sourceFunction) {
-		super(sourceFunction);
-		this.sourceFunction = sourceFunction;
-	}
-
-	@Override
-	public void invoke() {
-		callUserFunctionAndLogException();
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-		sourceFunction.run(collector);
-	}
-
-	@Override
-	public void cancel() {
-		super.cancel();
-		sourceFunction.cancel();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
deleted file mode 100644
index 6281de3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
-import org.apache.flink.streaming.io.IndexedReaderIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The StreamInvokable represents the base class for all invokables in the
- * streaming topology.
- * 
- * @param <OUT>
- *            The output type of the invokable
- */
-public abstract class StreamInvokable<IN, OUT> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-	private static final Logger LOG = LoggerFactory.getLogger(StreamInvokable.class);
-
-	protected StreamTaskContext<OUT> taskContext;
-
-	protected ExecutionConfig executionConfig = null;
-
-	protected IndexedReaderIterator<StreamRecord<IN>> recordIterator;
-	protected StreamRecordSerializer<IN> inSerializer;
-	protected TypeSerializer<IN> objectSerializer;
-	protected StreamRecord<IN> nextRecord;
-	protected IN nextObject;
-	protected boolean isMutable;
-
-	public Collector<OUT> collector;
-	protected Function userFunction;
-	protected volatile boolean isRunning;
-
-	private ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
-
-	public StreamInvokable(Function userFunction) {
-		this.userFunction = userFunction;
-	}
-
-	/**
-	 * Initializes the {@link StreamInvokable} for input and output handling
-	 * 
-	 * @param taskContext
-	 *            StreamTaskContext representing the vertex
-	 */
-	public void setup(StreamTaskContext<OUT> taskContext) {
-		this.collector = taskContext.getOutputCollector();
-		this.recordIterator = taskContext.getIndexedInput(0);
-		this.inSerializer = taskContext.getInputSerializer(0);
-		if (this.inSerializer != null) {
-			this.nextRecord = inSerializer.createInstance();
-			this.objectSerializer = inSerializer.getObjectSerializer();
-		}
-		this.taskContext = taskContext;
-		this.executionConfig = taskContext.getExecutionConfig();
-	}
-
-	/**
-	 * Method that will be called when the operator starts, should encode the
-	 * processing logic
-	 */
-	public abstract void invoke() throws Exception;
-
-	/*
-	 * Reads the next record from the reader iterator and stores it in the
-	 * nextRecord variable
-	 */
-	protected StreamRecord<IN> readNext() throws IOException {
-		this.nextRecord = inSerializer.createInstance();
-		try {
-			nextRecord = recordIterator.next(nextRecord);
-			try {
-				nextObject = nextRecord.getObject();
-			} catch (NullPointerException e) {
-				// end of stream
-			}
-			return nextRecord;
-		} catch (IOException e) {
-			if (isRunning) {
-				throw new RuntimeException("Could not read next record due to: "
-						+ StringUtils.stringifyException(e));
-			} else {
-				// Task already cancelled do nothing
-				return null;
-			}
-		}  catch (IllegalStateException e) {
-			if (isRunning) {
-				throw new RuntimeException("Could not read next record due to: "
-						+ StringUtils.stringifyException(e));
-			} else {
-				// Task already cancelled do nothing
-				return null;
-			}
-		}
-	}
-
-	/**
-	 * The call of the user implemented function should be implemented here
-	 */
-	protected void callUserFunction() throws Exception {
-	}
-
-	/**
-	 * Method for logging exceptions thrown during the user function call
-	 */
-	protected void callUserFunctionAndLogException() {
-		try {
-			callUserFunction();
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Calling user function failed due to: {}",
-						StringUtils.stringifyException(e));
-			}
-			throw new RuntimeException(e);
-		}
-	}
-
-	/**
-	 * Open method to be used if the user defined function extends the
-	 * RichFunction class
-	 * 
-	 * @param parameters
-	 *            The configuration parameters for the operator
-	 */
-	public void open(Configuration parameters) throws Exception {
-		isRunning = true;
-		FunctionUtils.openFunction(userFunction, parameters);
-	}
-
-	/**
-	 * Close method to be used if the user defined function extends the
-	 * RichFunction class
-	 * 
-	 */
-	public void close() {
-		isRunning = false;
-		collector.close();
-		try {
-			FunctionUtils.closeFunction(userFunction);
-		} catch (Exception e) {
-			throw new RuntimeException("Error when closing the function: " + e.getMessage());
-		}
-	}
-
-	public void cancel() {
-		isRunning = false;
-	}
-
-	public void setRuntimeContext(RuntimeContext t) {
-		FunctionUtils.setFunctionRuntimeContext(userFunction, t);
-	}
-
-	protected IN copy(IN record) {
-		return objectSerializer.copy(record);
-	}
-
-	public void setChainingStrategy(ChainingStrategy strategy) {
-		if (strategy == ChainingStrategy.ALWAYS) {
-			if (!(this instanceof ChainableInvokable)) {
-				throw new RuntimeException(
-						"Invokable needs to extend ChainableInvokable to be chained");
-			}
-		}
-		this.chainingStrategy = strategy;
-	}
-
-	public ChainingStrategy getChainingStrategy() {
-		return chainingStrategy;
-	}
-
-	public static enum ChainingStrategy {
-		ALWAYS, NEVER, HEAD;
-	}
-
-	public Function getUserFunction() {
-		return userFunction;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
deleted file mode 100644
index 8bb546c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-
-public class CounterInvokable<IN> extends ChainableInvokable<IN, Long> {
-	private static final long serialVersionUID = 1L;
-
-	Long count = 0L;
-
-	public CounterInvokable() {
-		super(null);
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		while (isRunning && readNext() != null) {
-			collector.collect(++count);
-		}
-	}
-
-	@Override
-	public void collect(IN record) {
-		if (isRunning) {
-			collector.collect(++count);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
deleted file mode 100644
index 00d432b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-
-public class FilterInvokable<IN> extends ChainableInvokable<IN, IN> {
-
-	private static final long serialVersionUID = 1L;
-
-	FilterFunction<IN> filterFunction;
-	private boolean collect;
-
-	public FilterInvokable(FilterFunction<IN> filterFunction) {
-		super(filterFunction);
-		this.filterFunction = filterFunction;
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		while (isRunning && readNext() != null) {
-			callUserFunctionAndLogException();
-		}
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-		collect = filterFunction.filter(nextObject);
-		if (collect) {
-			collector.collect(nextObject);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
deleted file mode 100644
index dfead14..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-
-public class FlatMapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
-	private static final long serialVersionUID = 1L;
-
-	private FlatMapFunction<IN, OUT> flatMapper;
-
-	public FlatMapInvokable(FlatMapFunction<IN, OUT> flatMapper) {
-		super(flatMapper);
-		this.flatMapper = flatMapper;
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		while (isRunning && readNext() != null) {
-			callUserFunctionAndLogException();
-		}
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-		flatMapper.flatMap(nextObject, collector);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
deleted file mode 100644
index 4a0f4f5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-
-public class GroupedFoldInvokable<IN, OUT> extends StreamFoldInvokable<IN, OUT> {
-	private static final long serialVersionUID = 1L;
-
-	private KeySelector<IN, ?> keySelector;
-	private Map<Object, OUT> values;
-	private OUT initialValue;
-
-	public GroupedFoldInvokable(FoldFunction<IN, OUT> folder, KeySelector<IN, ?> keySelector,
-			OUT initialValue, TypeInformation<OUT> outTypeInformation) {
-		super(folder, initialValue, outTypeInformation);
-		this.keySelector = keySelector;
-		this.initialValue = initialValue;
-		values = new HashMap<Object, OUT>();
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-		Object key = nextRecord.getKey(keySelector);
-		OUT accumulator = values.get(key);
-		if (accumulator != null) {
-			OUT folded = folder.fold(outTypeSerializer.copy(accumulator), nextObject);
-			values.put(key, folded);
-			collector.collect(folded);
-		} else {
-			OUT first = folder.fold(outTypeSerializer.copy(initialValue), nextObject);
-			values.put(key, first);
-			collector.collect(first);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
deleted file mode 100644
index 72f52ea..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-
-public class GroupedReduceInvokable<IN> extends StreamReduceInvokable<IN> {
-	private static final long serialVersionUID = 1L;
-
-	private KeySelector<IN, ?> keySelector;
-	private Map<Object, IN> values;
-
-	public GroupedReduceInvokable(ReduceFunction<IN> reducer, KeySelector<IN, ?> keySelector) {
-		super(reducer);
-		this.keySelector = keySelector;
-		values = new HashMap<Object, IN>();
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-		Object key = keySelector.getKey(nextObject);
-		IN currentValue = values.get(key);
-		if (currentValue != null) {
-			IN reduced = reducer.reduce(copy(currentValue), nextObject);
-			values.put(key, reduced);
-			collector.collect(reduced);
-		} else {
-			values.put(key, nextObject);
-			collector.collect(nextObject);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
deleted file mode 100644
index 53cb825..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-
-public class MapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
-	private static final long serialVersionUID = 1L;
-
-	private MapFunction<IN, OUT> mapper;
-
-	public MapInvokable(MapFunction<IN, OUT> mapper) {
-		super(mapper);
-		this.mapper = mapper;
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		while (isRunning && readNext() != null) {
-			callUserFunctionAndLogException();
-		}
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-		collector.collect(mapper.map(nextObject));
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
deleted file mode 100644
index bc58188..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-
-public class ProjectInvokable<IN, OUT extends Tuple> extends ChainableInvokable<IN, OUT> {
-	private static final long serialVersionUID = 1L;
-
-	transient OUT outTuple;
-	TypeSerializer<OUT> outTypeSerializer;
-	TypeInformation<OUT> outTypeInformation;
-	int[] fields;
-	int numFields;
-
-	public ProjectInvokable(int[] fields, TypeInformation<OUT> outTypeInformation) {
-		super(null);
-		this.fields = fields;
-		this.numFields = this.fields.length;
-		this.outTypeInformation = outTypeInformation;
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		while (isRunning && readNext() != null) {
-			callUserFunctionAndLogException();
-		}
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-		for (int i = 0; i < this.numFields; i++) {
-			outTuple.setField(((Tuple)nextObject).getField(fields[i]), i);
-		}
-		collector.collect(outTuple);
-	}
-
-	@Override
-	public void open(Configuration config) throws Exception {
-		super.open(config);
-		this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig);
-		outTuple = outTypeSerializer.createInstance();
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
deleted file mode 100644
index 1353c01..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-
-public class StreamFoldInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
-	private static final long serialVersionUID = 1L;
-
-	protected FoldFunction<IN, OUT> folder;
-	private OUT accumulator;
-	protected TypeSerializer<OUT> outTypeSerializer;
-
-	public StreamFoldInvokable(FoldFunction<IN, OUT> folder, OUT initialValue,
-			TypeInformation<OUT> outTypeInformation) {
-		super(folder);
-		this.folder = folder;
-		this.accumulator = initialValue;
-		this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig);
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		while (isRunning && readNext() != null) {
-			callUserFunctionAndLogException();
-		}
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-
-		accumulator = folder.fold(outTypeSerializer.copy(accumulator), nextObject);
-		collector.collect(accumulator);
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
deleted file mode 100644
index f0f378d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-
-public class StreamReduceInvokable<IN> extends ChainableInvokable<IN, IN> {
-	private static final long serialVersionUID = 1L;
-
-	protected ReduceFunction<IN> reducer;
-	private IN currentValue;
-
-	public StreamReduceInvokable(ReduceFunction<IN> reducer) {
-		super(reducer);
-		this.reducer = reducer;
-		currentValue = null;
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		while (isRunning && readNext() != null) {
-			callUserFunctionAndLogException();
-		}
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-
-		if (currentValue != null) {
-			currentValue = reducer.reduce(copy(currentValue), nextObject);
-		} else {
-			currentValue = nextObject;
-
-		}
-		collector.collect(currentValue);
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
deleted file mode 100644
index 4cbaebb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
-
-public class CoFlatMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
-	private static final long serialVersionUID = 1L;
-
-	private CoFlatMapFunction<IN1, IN2, OUT> flatMapper;
-
-	public CoFlatMapInvokable(CoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
-		super(flatMapper);
-		this.flatMapper = flatMapper;
-	}
-
-	@Override
-	public void handleStream1() throws Exception {
-		callUserFunctionAndLogException1();
-	}
-
-	@Override
-	public void handleStream2() throws Exception {
-		callUserFunctionAndLogException2();
-	}
-
-	@Override
-	protected void callUserFunction1() throws Exception {
-		flatMapper.flatMap1(reuse1.getObject(), collector);
-
-	}
-
-	@Override
-	protected void callUserFunction2() throws Exception {
-		flatMapper.flatMap2(reuse2.getObject(), collector);
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceInvokable.java
deleted file mode 100644
index 4907ac5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceInvokable.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-
-public class CoGroupedReduceInvokable<IN1, IN2, OUT> extends CoReduceInvokable<IN1, IN2, OUT> {
-	private static final long serialVersionUID = 1L;
-
-	protected KeySelector<IN1, ?> keySelector1;
-	protected KeySelector<IN2, ?> keySelector2;
-	private Map<Object, IN1> values1;
-	private Map<Object, IN2> values2;
-	IN1 reduced1;
-	IN2 reduced2;
-
-	public CoGroupedReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer,
-			KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
-		super(coReducer);
-		this.coReducer = coReducer;
-		this.keySelector1 = keySelector1;
-		this.keySelector2 = keySelector2;
-		values1 = new HashMap<Object, IN1>();
-		values2 = new HashMap<Object, IN2>();
-	}
-
-	@Override
-	public void handleStream1() throws Exception {
-		Object key = reuse1.getKey(keySelector1);
-		currentValue1 = values1.get(key);
-		nextValue1 = reuse1.getObject();
-		if (currentValue1 != null) {
-			callUserFunctionAndLogException1();
-			values1.put(key, reduced1);
-			collector.collect(coReducer.map1(reduced1));
-		} else {
-			values1.put(key, nextValue1);
-			collector.collect(coReducer.map1(nextValue1));
-		}
-	}
-
-	@Override
-	public void handleStream2() throws Exception {
-		Object key = reuse2.getKey(keySelector2);
-		currentValue2 = values2.get(key);
-		nextValue2 = reuse2.getObject();
-		if (currentValue2 != null) {
-			callUserFunctionAndLogException2();
-			values2.put(key, reduced2);
-			collector.collect(coReducer.map2(reduced2));
-		} else {
-			values2.put(key, nextValue2);
-			collector.collect(coReducer.map2(nextValue2));
-		}
-	}
-
-	@Override
-	protected void callUserFunction1() throws Exception {
-		reduced1 = coReducer.reduce1(currentValue1, nextValue1);
-
-	}
-
-	@Override
-	protected void callUserFunction2() throws Exception {
-		reduced2 = coReducer.reduce2(currentValue2, nextValue2);
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
deleted file mode 100644
index f727a32..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
-import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OUT> {
-
-	public CoInvokable(Function userFunction) {
-		super(userFunction);
-	}
-
-	private static final long serialVersionUID = 1L;
-	private static final Logger LOG = LoggerFactory.getLogger(CoInvokable.class);
-
-	protected CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator;
-	protected StreamRecord<IN1> reuse1;
-	protected StreamRecord<IN2> reuse2;
-	protected StreamRecordSerializer<IN1> srSerializer1;
-	protected StreamRecordSerializer<IN2> srSerializer2;
-	protected TypeSerializer<IN1> serializer1;
-	protected TypeSerializer<IN2> serializer2;
-
-	@Override
-	public void setup(StreamTaskContext<OUT> taskContext) {
-		this.collector = taskContext.getOutputCollector();
-
-		this.recordIterator = taskContext.getCoReader();
-
-		this.srSerializer1 = taskContext.getInputSerializer(0);
-		this.srSerializer2 = taskContext.getInputSerializer(1);
-
-		this.reuse1 = srSerializer1.createInstance();
-		this.reuse2 = srSerializer2.createInstance();
-
-		this.serializer1 = srSerializer1.getObjectSerializer();
-		this.serializer2 = srSerializer2.getObjectSerializer();
-	}
-
-	protected void resetReuseAll() {
-		this.reuse1 = srSerializer1.createInstance();
-		this.reuse2 = srSerializer2.createInstance();
-	}
-
-	protected void resetReuse1() {
-		this.reuse1 = srSerializer1.createInstance();
-	}
-
-	protected void resetReuse2() {
-		this.reuse2 = srSerializer2.createInstance();
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		while (isRunning) {
-			int next;
-			try {
-				next = recordIterator.next(reuse1, reuse2);
-			} catch (IOException e) {
-				if (isRunning) {
-					throw new RuntimeException("Could not read next record.", e);
-				} else {
-					// Task already cancelled do nothing
-					next = 0;
-				}
-			} catch (IllegalStateException e) {
-				if (isRunning) {
-					throw new RuntimeException("Could not read next record.", e);
-				} else {
-					// Task already cancelled do nothing
-					next = 0;
-				}
-			}
-
-			if (next == 0) {
-				break;
-			} else if (next == 1) {
-				initialize1();
-				handleStream1();
-				resetReuse1();
-			} else {
-				initialize2();
-				handleStream2();
-				resetReuse2();
-			}
-		}
-	}
-
-	protected abstract void handleStream1() throws Exception;
-
-	protected abstract void handleStream2() throws Exception;
-
-	protected abstract void callUserFunction1() throws Exception;
-
-	protected abstract void callUserFunction2() throws Exception;
-
-	protected void initialize1() {
-
-	};
-
-	protected void initialize2() {
-
-	};
-
-	protected void callUserFunctionAndLogException1() {
-		try {
-			callUserFunction1();
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Calling user function failed due to: {}",
-						StringUtils.stringifyException(e));
-			}
-			throw new RuntimeException(e);
-		}
-	}
-
-	protected void callUserFunctionAndLogException2() {
-		try {
-			callUserFunction2();
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Calling user function failed due to: {}",
-						StringUtils.stringifyException(e));
-			}
-			throw new RuntimeException(e);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
deleted file mode 100644
index 5499dba..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import org.apache.flink.streaming.api.function.co.CoMapFunction;
-
-public class CoMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
-	private static final long serialVersionUID = 1L;
-
-	private CoMapFunction<IN1, IN2, OUT> mapper;
-
-	public CoMapInvokable(CoMapFunction<IN1, IN2, OUT> mapper) {
-		super(mapper);
-		this.mapper = mapper;
-	}
-
-	@Override
-	public void handleStream1() throws Exception {
-		callUserFunctionAndLogException1();
-	}
-
-	@Override
-	public void handleStream2() throws Exception {
-		callUserFunctionAndLogException2();
-	}
-
-	@Override
-	protected void callUserFunction1() throws Exception {
-		collector.collect(mapper.map1(reuse1.getObject()));
-
-	}
-
-	@Override
-	protected void callUserFunction2() throws Exception {
-		collector.collect(mapper.map2(reuse2.getObject()));
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
deleted file mode 100644
index 057dfce..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-
-public class CoReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
-	private static final long serialVersionUID = 1L;
-
-	protected CoReduceFunction<IN1, IN2, OUT> coReducer;
-	protected IN1 currentValue1 = null;
-	protected IN2 currentValue2 = null;
-	protected IN1 nextValue1 = null;
-	protected IN2 nextValue2 = null;
-
-	public CoReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer) {
-		super(coReducer);
-		this.coReducer = coReducer;
-		currentValue1 = null;
-		currentValue2 = null;
-	}
-
-	@Override
-	public void handleStream1() throws Exception {
-		nextValue1 = reuse1.getObject();
-		callUserFunctionAndLogException1();
-	}
-
-	@Override
-	public void handleStream2() throws Exception {
-		nextValue2 = reuse2.getObject();
-		callUserFunctionAndLogException2();
-	}
-
-	@Override
-	protected void callUserFunction1() throws Exception {
-		if (currentValue1 != null) {
-			currentValue1 = coReducer.reduce1(currentValue1, nextValue1);
-		} else {
-			currentValue1 = nextValue1;
-		}
-		collector.collect(coReducer.map1(currentValue1));
-	}
-
-	@Override
-	protected void callUserFunction2() throws Exception {
-		if (currentValue2 != null) {
-			currentValue2 = coReducer.reduce2(currentValue2, nextValue2);
-		} else {
-			currentValue2 = nextValue2;
-		}
-		collector.collect(coReducer.map2(currentValue2));
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
deleted file mode 100644
index 93f597f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.math.util.MathUtils;
-import org.apache.flink.streaming.api.function.co.CoWindowFunction;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.state.CircularFifoList;
-
-public class CoWindowInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
-	private static final long serialVersionUID = 1L;
-
-	protected CoWindowFunction<IN1, IN2, OUT> coWindowFunction;
-	protected long windowSize;
-	protected long slideSize;
-	protected CircularFifoList<StreamRecord<IN1>> circularList1;
-	protected CircularFifoList<StreamRecord<IN2>> circularList2;
-	protected TimestampWrapper<IN1> timeStamp1;
-	protected TimestampWrapper<IN2> timeStamp2;
-
-	protected StreamWindow window;
-
-	protected long startTime;
-	protected long nextRecordTime;
-
-	public CoWindowInvokable(CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize,
-			long slideInterval, TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) {
-		super(coWindowFunction);
-		this.coWindowFunction = coWindowFunction;
-		this.windowSize = windowSize;
-		this.slideSize = slideInterval;
-		this.circularList1 = new CircularFifoList<StreamRecord<IN1>>();
-		this.circularList2 = new CircularFifoList<StreamRecord<IN2>>();
-		this.timeStamp1 = timeStamp1;
-		this.timeStamp2 = timeStamp2;
-		this.startTime = timeStamp1.getStartTime();
-
-		this.window = new StreamWindow();
-	}
-
-	@Override
-	protected void handleStream1() throws Exception {
-		window.addToBuffer1(reuse1.getObject());
-	}
-
-	@Override
-	protected void handleStream2() throws Exception {
-		window.addToBuffer2(reuse2.getObject());
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-
-		List<IN1> first = new ArrayList<IN1>();
-		List<IN2> second = new ArrayList<IN2>();
-
-		for (IN1 element : window.circularList1.getElements()) {
-			first.add(serializer1.copy(element));
-		}
-		for (IN2 element : window.circularList2.getElements()) {
-			second.add(serializer2.copy(element));
-		}
-
-		if (!window.circularList1.isEmpty() || !window.circularList2.isEmpty()) {
-			coWindowFunction.coWindow(first, second, collector);
-		}
-	}
-
-	protected class StreamWindow implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		protected int granularity;
-		protected int batchPerSlide;
-		protected long numberOfBatches;
-
-		protected long minibatchCounter;
-
-		protected CircularFifoList<IN1> circularList1;
-		protected CircularFifoList<IN2> circularList2;
-
-		public StreamWindow() {
-			this.granularity = (int) MathUtils.gcd(windowSize, slideSize);
-			this.batchPerSlide = (int) (slideSize / granularity);
-			this.numberOfBatches = windowSize / granularity;
-			this.circularList1 = new CircularFifoList<IN1>();
-			this.circularList2 = new CircularFifoList<IN2>();
-			this.minibatchCounter = 0;
-		}
-
-		public void addToBuffer1(IN1 nextValue) throws Exception {
-			checkWindowEnd(timeStamp1.getTimestamp(nextValue));
-			if (minibatchCounter >= 0) {
-				circularList1.add(nextValue);
-			}
-		}
-
-		public void addToBuffer2(IN2 nextValue) throws Exception {
-			checkWindowEnd(timeStamp2.getTimestamp(nextValue));
-			if (minibatchCounter >= 0) {
-				circularList2.add(nextValue);
-			}
-		}
-
-		protected synchronized void checkWindowEnd(long timeStamp) {
-			nextRecordTime = timeStamp;
-
-			while (miniBatchEnd()) {
-				circularList1.newSlide();
-				circularList2.newSlide();
-				minibatchCounter++;
-				if (windowEnd()) {
-					callUserFunctionAndLogException();
-					circularList1.shiftWindow(batchPerSlide);
-					circularList2.shiftWindow(batchPerSlide);
-				}
-			}
-		}
-
-		protected boolean miniBatchEnd() {
-			if (nextRecordTime < startTime + granularity) {
-				return false;
-			} else {
-				startTime += granularity;
-				return true;
-			}
-		}
-
-		public boolean windowEnd() {
-			if (minibatchCounter == numberOfBatches) {
-				minibatchCounter -= batchPerSlide;
-				return true;
-			}
-			return false;
-		}
-
-		public void reduceLastBatch() {
-			if (!miniBatchEnd()) {
-				callUserFunctionAndLogException();
-			}
-		}
-
-		public Iterable<IN1> getIterable1() {
-			return circularList1.getIterable();
-		}
-
-		public Iterable<IN2> getIterable2() {
-			return circularList2.getIterable();
-		}
-
-		@Override
-		public String toString() {
-			return circularList1.toString();
-		}
-
-	}
-
-	@Override
-	public void close() {
-		if (!window.miniBatchEnd()) {
-			callUserFunctionAndLogException();
-		}
-		super.close();
-	}
-
-	@Override
-	protected void callUserFunction1() throws Exception {
-	}
-
-	@Override
-	protected void callUserFunction2() throws Exception {
-	}
-
-	public void setSlideSize(long slideSize) {
-		this.slideSize = slideSize;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/EmptyWindowFilter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/EmptyWindowFilter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/EmptyWindowFilter.java
deleted file mode 100644
index 0f2ee31..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/EmptyWindowFilter.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-public class EmptyWindowFilter<OUT> implements FilterFunction<StreamWindow<OUT>> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public boolean filter(StreamWindow<OUT> value) throws Exception {
-		return !value.isEmpty();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedActiveDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedActiveDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedActiveDiscretizer.java
deleted file mode 100644
index 35f466f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedActiveDiscretizer.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.windowing.policy.CentralActiveTrigger;
-import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
-
-public class GroupedActiveDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {
-
-	private static final long serialVersionUID = -3469545957144404137L;
-
-	private volatile IN last;
-	private Thread centralThread;
-
-	public GroupedActiveDiscretizer(KeySelector<IN, ?> keySelector,
-			CentralActiveTrigger<IN> triggerPolicy, CloneableEvictionPolicy<IN> evictionPolicy) {
-		super(keySelector, triggerPolicy, evictionPolicy);
-	}
-
-	@Override
-	protected StreamDiscretizer<IN> makeNewGroup(Object key) throws Exception {
-
-		StreamDiscretizer<IN> groupDiscretizer = new StreamDiscretizer<IN>(triggerPolicy.clone(),
-				evictionPolicy.clone());
-
-		groupDiscretizer.collector = taskContext.getOutputCollector();
-		// We omit the groupDiscretizer.open(...) call here to avoid starting
-		// new active threads
-		return groupDiscretizer;
-	}
-
-	@Override
-	public void invoke() throws Exception {
-
-		while (isRunning && readNext() != null) {
-			last = copy(nextObject);
-			Object key = keySelector.getKey(nextObject);
-
-			synchronized (groupedDiscretizers) {
-				StreamDiscretizer<IN> groupDiscretizer = groupedDiscretizers.get(key);
-
-				if (groupDiscretizer == null) {
-					groupDiscretizer = makeNewGroup(key);
-					groupedDiscretizers.put(key, groupDiscretizer);
-				}
-
-				groupDiscretizer.processRealElement(nextObject);
-			}
-
-		}
-
-		for (StreamDiscretizer<IN> group : groupedDiscretizers.values()) {
-			group.emitWindow();
-		}
-
-	}
-
-	@Override
-	public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
-		super.open(parameters);
-		centralThread = new Thread(new CentralCheck());
-		centralThread.start();
-	}
-
-	private class CentralCheck implements Runnable {
-
-		@Override
-		public void run() {
-			while (isRunning) {
-				// wait for the specified granularity
-				try {
-					Thread.sleep(2000);
-				} catch (InterruptedException e) {
-					// ignore it...
-				}
-
-				try {
-					if (last != null) {
-						synchronized (groupedDiscretizers) {
-							for (StreamDiscretizer<IN> group : groupedDiscretizers.values()) {
-
-								CentralActiveTrigger<IN> groupTrigger = (CentralActiveTrigger<IN>) group.triggerPolicy;
-								Object[] fakes = groupTrigger.notifyOnLastGlobalElement(last);
-								if (fakes != null) {
-									for (Object fake : fakes) {
-										group.triggerOnFakeElement(fake);
-									}
-								}
-							}
-						}
-
-					}
-				} catch (Exception e) {
-					throw new RuntimeException(e);
-				}
-
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
deleted file mode 100644
index f14a6ae..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
-
-/**
- * This invokable represents the grouped discretization step of a window
- * transformation. The user supplied eviction and trigger policies are applied
- * on a per group basis to create the {@link StreamWindow} that will be further
- * transformed in the next stages. </p> To allow pre-aggregations supply an
- * appropriate {@link WindowBuffer}
- */
-public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> {
-
-	private static final long serialVersionUID = -3469545957144404137L;
-
-	protected KeySelector<IN, ?> keySelector;
-	protected Configuration parameters;
-	protected CloneableTriggerPolicy<IN> triggerPolicy;
-	protected CloneableEvictionPolicy<IN> evictionPolicy;
-
-	protected Map<Object, StreamDiscretizer<IN>> groupedDiscretizers;
-
-	public GroupedStreamDiscretizer(KeySelector<IN, ?> keySelector,
-			CloneableTriggerPolicy<IN> triggerPolicy, CloneableEvictionPolicy<IN> evictionPolicy) {
-
-		super(triggerPolicy, evictionPolicy);
-
-		this.keySelector = keySelector;
-
-		this.triggerPolicy = triggerPolicy;
-		this.evictionPolicy = evictionPolicy;
-
-		this.groupedDiscretizers = new HashMap<Object, StreamDiscretizer<IN>>();
-	}
-
-	@Override
-	public void invoke() throws Exception {
-
-		while (isRunning && readNext() != null) {
-
-			Object key = keySelector.getKey(nextObject);
-
-			StreamDiscretizer<IN> groupDiscretizer = groupedDiscretizers.get(key);
-
-			if (groupDiscretizer == null) {
-				groupDiscretizer = makeNewGroup(key);
-				groupedDiscretizers.put(key, groupDiscretizer);
-			}
-
-			groupDiscretizer.processRealElement(nextObject);
-		}
-
-		for (StreamDiscretizer<IN> group : groupedDiscretizers.values()) {
-			group.emitWindow();
-		}
-
-	}
-
-	/**
-	 * This method creates a new group. The method gets called in case an
-	 * element arrives which has a key which was not seen before. The method
-	 * created a nested {@link StreamDiscretizer} and therefore created clones
-	 * of all distributed trigger and eviction policies.
-	 * 
-	 * @param key
-	 *            The key of the new group.
-	 */
-	protected StreamDiscretizer<IN> makeNewGroup(Object key) throws Exception {
-
-		StreamDiscretizer<IN> groupDiscretizer = new StreamDiscretizer<IN>(triggerPolicy.clone(),
-				evictionPolicy.clone());
-
-		groupDiscretizer.collector = taskContext.getOutputCollector();
-		groupDiscretizer.open(this.parameters);
-
-		return groupDiscretizer;
-	}
-
-	@Override
-	public boolean equals(Object other) {
-		if (other == null || !(other instanceof GroupedStreamDiscretizer)) {
-			return false;
-		} else {
-			try {
-				@SuppressWarnings("unchecked")
-				GroupedStreamDiscretizer<IN> otherDiscretizer = (GroupedStreamDiscretizer<IN>) other;
-
-				return triggerPolicy.equals(otherDiscretizer.triggerPolicy)
-						&& evictionPolicy.equals(otherDiscretizer.evictionPolicy)
-						&& keySelector.equals(otherDiscretizer.keySelector);
-
-			} catch (ClassCastException e) {
-				return false;
-			}
-		}
-	}
-
-	@Override
-	public String toString() {
-		return "GroupedDiscretizer(Key: " + keySelector.getClass().getSimpleName() + ", Trigger: "
-				+ triggerPolicy.toString() + ", Eviction: " + evictionPolicy.toString() + ")";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java
deleted file mode 100644
index 2c3bd75..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.WindowEvent;
-import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
-
-/**
- * This invokable flattens the results of the window transformations by
- * outputing the elements of the {@link StreamWindow} one-by-one
- */
-public class GroupedWindowBufferInvokable<T> extends WindowBufferInvokable<T> {
-
-	private static final long serialVersionUID = 1L;
-	private Map<Object, WindowBuffer<T>> windowMap = new HashMap<Object, WindowBuffer<T>>();
-	private KeySelector<T, ?> keySelector;
-
-	public GroupedWindowBufferInvokable(WindowBuffer<T> buffer, KeySelector<T, ?> keySelector) {
-		super(buffer);
-		this.keySelector = keySelector;
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		while (isRunning && readNext() != null) {
-			callUserFunctionAndLogException();
-		}
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-		if (nextObject.getElement() != null) {
-			Object key = keySelector.getKey(nextObject.getElement());
-			WindowBuffer<T> currentWindow = windowMap.get(key);
-
-			if (currentWindow == null) {
-				currentWindow = buffer.clone();
-				windowMap.put(key, currentWindow);
-			}
-
-			handleWindowEvent(nextObject, currentWindow);
-		}
-	}
-
-	@Override
-	public void collect(WindowEvent<T> record) {
-		if (isRunning) {
-			nextObject = record;
-			callUserFunctionAndLogException();
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelGroupedMerge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelGroupedMerge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelGroupedMerge.java
deleted file mode 100644
index 737485f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelGroupedMerge.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * The version of the ParallelMerge CoFlatMap that does not reduce the incoming
- * elements only appends them to the current window. This is necessary for
- * grouped reduces.
- */
-public class ParallelGroupedMerge<OUT> extends ParallelMerge<OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	public ParallelGroupedMerge() {
-		super(null);
-	}
-
-	@Override
-	protected void updateCurrent(StreamWindow<OUT> current, StreamWindow<OUT> nextWindow)
-			throws Exception {
-		current.addAll(nextWindow);
-	}
-
-}