You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/18 23:22:48 UTC

[1/5] incubator-flink git commit: [streaming] Make windowed data stream aware of time based trigger/eviction in tumbling window situations.

Repository: incubator-flink
Updated Branches:
  refs/heads/master 88e64fc78 -> 227e40fe1


[streaming] Make windowed data stream aware of time based trigger/eviction in tumbling window situations.

[streaming] Changed TimeEvictionPolicy to keep timestamps in the buffer instead of data-items


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/227e40fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/227e40fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/227e40fe

Branch: refs/heads/master
Commit: 227e40fe11d5794a41433fb48efa887ab8bb91d2
Parents: 6884a0f
Author: Jonas Traub (powibol) <jo...@s-traub.com>
Authored: Thu Dec 18 16:11:16 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Thu Dec 18 20:07:27 2014 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/WindowedDataStream.java   | 13 ++++++++++++-
 .../api/windowing/policy/TimeEvictionPolicy.java       | 11 +++++++----
 2 files changed, 19 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/227e40fe/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 09b2678..788f28d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -36,6 +36,7 @@ import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.operator.GroupedWindowInvokable;
 import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
+import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
@@ -490,7 +491,17 @@ public class WindowedDataStream<OUT> {
 			}
 		} else {
 			if (userEvicters == null) {
-				evicters.add(new TumblingEvictionPolicy<OUT>());
+				boolean notOnlyTime=false;
+				for (WindowingHelper<OUT> helper : triggerHelpers){
+					if (helper instanceof Time<?>){
+						evicters.add(helper.toEvict());
+					} else {
+						notOnlyTime=true;
+					}
+				}
+				if (notOnlyTime){
+					evicters.add(new TumblingEvictionPolicy<OUT>());
+				}
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/227e40fe/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
index 99116d0..aca1dee 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
@@ -41,7 +41,7 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
 
 	private long granularity;
 	private TimeStamp<DATA> timestamp;
-	private LinkedList<DATA> buffer = new LinkedList<DATA>();
+	private LinkedList<Long> buffer = new LinkedList<Long>();
 
 	/**
 	 * This eviction policy evicts all elements which are older than a specified
@@ -91,12 +91,15 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
 
 		checkForDeleted(bufferSize);
 
+		//remember timestamp
+		long time=timestamp.getTimestamp(datapoint);
+		
 		// delete and count expired tuples
-		long threshold = timestamp.getTimestamp(datapoint) - granularity;
+		long threshold = time - granularity;
 		int counter = deleteAndCountExpired(threshold);
 
 		// Add current element to buffer
-		buffer.add(datapoint);
+		buffer.add(time);
 
 		// return result
 		return counter;
@@ -114,7 +117,7 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
 		int counter = 0;
 		while (!buffer.isEmpty()) {
 
-			if (timestamp.getTimestamp(buffer.getFirst()) <= threshold) {
+			if (buffer.getFirst() <= threshold) {
 				buffer.removeFirst();
 				counter++;
 			} else {


[3/5] incubator-flink git commit: [streaming] Added immutability for window and filter operators

Posted by mb...@apache.org.
[streaming] Added immutability for window and filter operators


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6884a0ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6884a0ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6884a0ff

Branch: refs/heads/master
Commit: 6884a0ff59ce018f32169969b40116cbde3e27bc
Parents: ff05df9
Author: Gyula Fora <gy...@apache.org>
Authored: Thu Dec 18 14:52:15 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Thu Dec 18 20:07:27 2014 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/invokable/StreamInvokable.java    |  7 +++++++
 .../streaming/api/invokable/operator/FilterInvokable.java |  2 +-
 .../invokable/operator/WindowGroupReduceInvokable.java    | 10 +++++++++-
 .../api/invokable/operator/WindowReduceInvokable.java     |  2 +-
 4 files changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6884a0ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index d19d7ad..87ad4e0 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
@@ -49,6 +50,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
 
 	protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
 	protected StreamRecordSerializer<IN> inSerializer;
+	protected TypeSerializer<IN> objectSerializer;
 	protected StreamRecord<IN> nextRecord;
 	protected boolean isMutable;
 
@@ -72,6 +74,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
 		this.inSerializer = taskContext.getInputSerializer(0);
 		if (this.inSerializer != null) {
 			this.nextRecord = inSerializer.createInstance();
+			this.objectSerializer = inSerializer.getObjectSerializer();
 		}
 		this.taskContext = taskContext;
 	}
@@ -141,4 +144,8 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
 	public void setRuntimeContext(RuntimeContext t) {
 		FunctionUtils.setFunctionRuntimeContext(userFunction, t);
 	}
+
+	protected IN copy(IN record) {
+		return objectSerializer.copy(record);
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6884a0ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index 796196d..48b8ad0 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -44,6 +44,6 @@ public class FilterInvokable<IN> extends StreamInvokable<IN, IN> {
 
 	@Override
 	protected void callUserFunction() throws Exception {
-		collect = filterFunction.filter(nextRecord.getObject());
+		collect = filterFunction.filter(copy(nextRecord.getObject()));
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6884a0ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
index 9d0b584..b3fdfe8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
@@ -37,7 +37,15 @@ public class WindowGroupReduceInvokable<IN, OUT> extends WindowInvokable<IN, OUT
 
 	@Override
 	protected void callUserFunction() throws Exception {
-		reducer.reduce(buffer, collector);
+		reducer.reduce(copyBuffer(), collector);
+	}
+
+	public LinkedList<IN> copyBuffer() {
+		LinkedList<IN> copy = new LinkedList<IN>();
+		for (IN element : buffer) {
+			copy.add(copy(element));
+		}
+		return copy;
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6884a0ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
index b6456e1..ed246c8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
@@ -49,7 +49,7 @@ public class WindowReduceInvokable<IN> extends WindowInvokable<IN, IN> {
 		while (reducedIterator.hasNext()) {
 			IN next = reducedIterator.next();
 			if (next != null) {
-				reduced = reducer.reduce(reduced, next);
+				reduced = reducer.reduce(copy(reduced), copy(next));
 			}
 		}
 		if (reduced != null) {


[5/5] incubator-flink git commit: [streaming] StreamInvokable rework for simpler logic and easier use

Posted by mb...@apache.org.
[streaming] StreamInvokable rework for simpler logic and easier use


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c5e9a512
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c5e9a512
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c5e9a512

Branch: refs/heads/master
Commit: c5e9a512242e050b71635cacaaca7890fadc6b67
Parents: 88e64fc
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Dec 17 23:34:26 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Thu Dec 18 20:07:27 2014 +0100

----------------------------------------------------------------------
 docs/streaming_guide.md                         |  10 -
 .../flink/streaming/api/JobGraphBuilder.java    |   8 -
 .../flink/streaming/api/StreamConfig.java       |  13 +-
 .../streaming/api/datastream/DataStream.java    |   9 +-
 .../datastream/SingleOutputStreamOperator.java  |  18 --
 .../streaming/api/invokable/SinkInvokable.java  |  14 +-
 .../api/invokable/SourceInvokable.java          |  22 +-
 .../api/invokable/StreamInvokable.java          |  87 +++-----
 .../invokable/operator/CounterInvokable.java    |  23 +-
 .../api/invokable/operator/FilterInvokable.java |  24 +-
 .../invokable/operator/FlatMapInvokable.java    |  14 +-
 .../operator/GroupedReduceInvokable.java        |   4 +-
 .../operator/GroupedWindowInvokable.java        |  76 +++----
 .../api/invokable/operator/MapInvokable.java    |  14 +-
 .../invokable/operator/ProjectInvokable.java    |  11 +-
 .../operator/StreamReduceInvokable.java         |  14 +-
 .../api/invokable/operator/WindowInvokable.java |  29 +--
 .../operator/co/CoBatchReduceInvokable.java     |   9 +-
 .../api/invokable/operator/co/CoInvokable.java  |  45 +---
 .../operator/co/CoWindowInvokable.java          |   5 -
 .../api/streamvertex/CoStreamVertex.java        |  37 +++-
 .../api/streamvertex/StreamTaskContext.java     |  40 ++++
 .../api/streamvertex/StreamVertex.java          |  48 +++-
 .../streaming/api/AggregationFunctionTest.java  |  36 +--
 .../invokable/operator/CoBatchReduceTest.java   |   6 +-
 .../api/invokable/operator/CoFlatMapTest.java   |   4 +-
 .../operator/CoGroupedBatchReduceTest.java      |   6 +-
 .../invokable/operator/CoGroupedReduceTest.java |   6 +-
 .../operator/CoGroupedWindowReduceTest.java     |   6 +-
 .../api/invokable/operator/CoMapTest.java       |   4 +-
 .../invokable/operator/CoStreamReduceTest.java  |   4 +-
 .../invokable/operator/CoWindowReduceTest.java  |   6 +-
 .../api/invokable/operator/CoWindowTest.java    |   6 +-
 .../operator/CounterInvokableTest.java          |   4 +-
 .../api/invokable/operator/FilterTest.java      |   4 +-
 .../api/invokable/operator/FlatMapTest.java     |   4 +-
 .../operator/GroupedReduceInvokableTest.java    |   4 +-
 .../operator/GroupedWindowInvokableTest.java    |  16 +-
 .../api/invokable/operator/MapTest.java         |   4 +-
 .../api/invokable/operator/ProjectTest.java     |   4 +-
 .../invokable/operator/StreamReduceTest.java    |   4 +-
 .../invokable/operator/WindowInvokableTest.java |  10 +-
 .../flink/streaming/util/MockCoContext.java     | 217 +++++++++++++++++++
 .../flink/streaming/util/MockCoInvokable.java   | 169 ---------------
 .../flink/streaming/util/MockContext.java       | 148 +++++++++++++
 .../flink/streaming/util/MockInvokable.java     | 105 ---------
 46 files changed, 664 insertions(+), 687 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 6e7f932..c51afbc 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -609,16 +609,6 @@ env.genereateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
 To maximise the throughput the user can call `.setBufferTimeout(-1)` which will remove the timeout and buffers will only be flushed when they are full.
 To minimise latency, set the timeout to a value close to 0 (fro example 5 or 10 ms). Theoretically a buffer timeout of 0 will cause all outputs to be flushed when produced, but this setting should be avoided because it can cause severe performance degradation.
 
-### Mutability
-
-This is currently a beta feature and it is only supported for a subset of the available operators.
-
-Most operators allow setting mutability for reading input data. If the operator is set mutable then the variable used to store input data for operators will be reused in a mutable fashion to avoid excessive object creation. By default, all operators are set to immutable.
-Usage:
-
-~~~java
-operator.setMutability(isMutable)
-~~~
 
 [Back to top](#top)
     

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index d63042a..d66e388 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -64,7 +64,6 @@ public class JobGraphBuilder {
 	private Map<String, List<Integer>> outEdgeType;
 	private Map<String, List<List<String>>> outEdgeNames;
 	private Map<String, List<Boolean>> outEdgeSelectAll;
-	private Map<String, Boolean> mutability;
 	private Map<String, List<String>> inEdgeList;
 	private Map<String, List<StreamPartitioner<?>>> connectionTypes;
 	private Map<String, String> operatorNames;
@@ -97,7 +96,6 @@ public class JobGraphBuilder {
 		outEdgeType = new HashMap<String, List<Integer>>();
 		outEdgeNames = new HashMap<String, List<List<String>>>();
 		outEdgeSelectAll = new HashMap<String, List<Boolean>>();
-		mutability = new HashMap<String, Boolean>();
 		inEdgeList = new HashMap<String, List<String>>();
 		connectionTypes = new HashMap<String, List<StreamPartitioner<?>>>();
 		operatorNames = new HashMap<String, String>();
@@ -302,7 +300,6 @@ public class JobGraphBuilder {
 
 		vertexClasses.put(vertexName, vertexClass);
 		setParallelism(vertexName, parallelism);
-		mutability.put(vertexName, false);
 		invokableObjects.put(vertexName, invokableObject);
 		operatorNames.put(vertexName, operatorName);
 		serializedFunctions.put(vertexName, serializedFunction);
@@ -355,7 +352,6 @@ public class JobGraphBuilder {
 
 		StreamConfig config = new StreamConfig(vertex.getConfiguration());
 
-		config.setMutability(mutability.get(vertexName));
 		config.setBufferTimeout(bufferTimeout.get(vertexName));
 
 		config.setTypeSerializerIn1(typeSerializersIn1.get(vertexName));
@@ -447,10 +443,6 @@ public class JobGraphBuilder {
 		inputFormatList.put(vertexName, inputFormat);
 	}
 
-	public void setMutability(String vertexName, boolean isMutable) {
-		mutability.put(vertexName, isMutable);
-	}
-
 	public void setBufferTimeout(String vertexName, long bufferTimeout) {
 		this.bufferTimeout.put(vertexName, bufferTimeout);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 1d863a7..9800b63 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -57,14 +57,11 @@ public class StreamConfig {
 	private static final String TYPE_SERIALIZER_IN_2 = "typeSerializer_in_2";
 	private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out_1";
 	private static final String TYPE_SERIALIZER_OUT_2 = "typeSerializer_out_2";
-	private static final String MUTABILITY = "isMutable";
 	private static final String ITERATON_WAIT = "iterationWait";
 
 	// DEFAULT VALUES
 
-	private static final boolean DEFAULT_IS_MUTABLE = false;
-
-	private static final long DEFAULT_TIMEOUT = 0;
+	private static final long DEFAULT_TIMEOUT = 100;
 
 	// CONFIG METHODS
 
@@ -138,14 +135,6 @@ public class StreamConfig {
 		config.setBytes(key, SerializationUtils.serialize(typeWrapper));
 	}
 
-	public void setMutability(boolean isMutable) {
-		config.setBoolean(MUTABILITY, isMutable);
-	}
-
-	public boolean getMutability() {
-		return config.getBoolean(MUTABILITY, DEFAULT_IS_MUTABLE);
-	}
-
 	public void setBufferTimeout(long timeout) {
 		config.setLong(BUFFER_TIMEOUT, timeout);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 474d57b..6e8da0a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -366,7 +366,7 @@ public class DataStream<OUT> {
 	 * the data stream that will be fed back and used as the input for the
 	 * iteration head. A common usage pattern for streaming iterations is to use
 	 * output splitting to send a part of the closing data stream to the head.
-	 * Refer to {@link SingleOutputStreamOperator#split(OutputSelector)} for
+	 * Refer to {@link SingleOutputStreamOperator#split(outputSelector)} for
 	 * more information.
 	 * <p>
 	 * The iteration edge will be partitioned the same way as the first input of
@@ -940,7 +940,6 @@ public class DataStream<OUT> {
 			WriteFormatAsText<OUT> format, long millis, OUT endTuple) {
 		DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
 				path, format, millis, endTuple), inputStream.typeInfo);
-		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
 	}
 
@@ -968,7 +967,6 @@ public class DataStream<OUT> {
 		DataStreamSink<OUT> returnStream = addSink(inputStream,
 				new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple),
 				inputStream.typeInfo);
-		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
 	}
 
@@ -1063,9 +1061,6 @@ public class DataStream<OUT> {
 	 * @return The closed DataStream
 	 */
 	public DataStreamSink<OUT> writeAsCsv(String path, int batchSize, OUT endTuple) {
-		if (this instanceof SingleOutputStreamOperator) {
-			((SingleOutputStreamOperator<?, ?>) this).setMutability(false);
-		}
 		return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), batchSize, endTuple);
 	}
 
@@ -1091,7 +1086,6 @@ public class DataStream<OUT> {
 			WriteFormatAsCsv<OUT> format, long millis, OUT endTuple) {
 		DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
 				path, format, millis, endTuple), inputStream.typeInfo);
-		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
 	}
 
@@ -1119,7 +1113,6 @@ public class DataStream<OUT> {
 		DataStreamSink<OUT> returnStream = addSink(inputStream,
 				new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple),
 				inputStream.typeInfo);
-		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 3e1c940..016322b 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -71,24 +71,6 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 		return this;
 	}
 
-	/**
-	 * This is a beta feature, use with care </br><br/>
-	 * Sets the mutability of the operator. If the operator is set to mutable,
-	 * the tuples received in the user defined functions, will be reused after
-	 * the function call. Setting an operator to mutable reduces garbage
-	 * collection overhead and thus increases scalability. Please note that if a
-	 * {@link DataStream#batchReduce} or {@link DataStream#windowReduce} is used
-	 * as mutable, the user can only iterate through the iterator once in every
-	 * invoke.
-	 * 
-	 * @param isMutable
-	 *            The mutability of the operator.
-	 * @return The operator with mutability set.
-	 */
-	public SingleOutputStreamOperator<OUT, O> setMutability(boolean isMutable) {
-		jobGraphBuilder.setMutability(id, isMutable);
-		return this;
-	}
 
 	/**
 	 * Sets the maximum time frequency (ms) for the flushing of the output

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index ec33224..74591a8 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -30,23 +30,15 @@ public class SinkInvokable<IN> extends StreamInvokable<IN, IN> {
 	}
 
 	@Override
-	protected void immutableInvoke() throws Exception {
-		while ((reuse = recordIterator.next(reuse)) != null) {
-			callUserFunctionAndLogException();
-			resetReuse();
-		}
-	}
-
-	@Override
-	protected void mutableInvoke() throws Exception {
-		while ((reuse = recordIterator.next(reuse)) != null) {
+	public void invoke() throws Exception {
+		while (readNext() != null) {
 			callUserFunctionAndLogException();
 		}
 	}
 
 	@Override
 	protected void callUserFunction() throws Exception {
-		sinkFunction.invoke((IN) reuse.getObject());		
+		sinkFunction.invoke((IN) nextRecord.getObject());
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
index 0cfe028..f1cf2c5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
@@ -21,38 +21,24 @@ import java.io.Serializable;
 
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 
-public class SourceInvokable<OUT> extends StreamInvokable<OUT,OUT> implements Serializable {
+public class SourceInvokable<OUT> extends StreamInvokable<OUT, OUT> implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 
 	private SourceFunction<OUT> sourceFunction;
 
-
 	public SourceInvokable(SourceFunction<OUT> sourceFunction) {
 		super(sourceFunction);
 		this.sourceFunction = sourceFunction;
 	}
 
 	@Override
-	public void invoke() throws Exception {
-		sourceFunction.invoke(collector);
-	}
-
-	@Override
-	protected void immutableInvoke() throws Exception {		
-	}
-
-	@Override
-	protected void mutableInvoke() throws Exception {		
+	public void invoke() {
+		callUserFunctionAndLogException();
 	}
 
 	@Override
 	protected void callUserFunction() throws Exception {
+		sourceFunction.invoke(collector);
 	}
-	
-	@Override
-	public SourceFunction<OUT> getSourceFunction(){
-		return sourceFunction;
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index e587b93..d19d7ad 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -17,16 +17,16 @@
 
 package org.apache.flink.streaming.api.invokable;
 
+import java.io.IOException;
 import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.apache.flink.util.StringUtils;
@@ -45,9 +45,11 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
 	private static final long serialVersionUID = 1L;
 	private static final Logger LOG = LoggerFactory.getLogger(StreamInvokable.class);
 
+	protected StreamTaskContext<OUT> taskContext;
+
 	protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
 	protected StreamRecordSerializer<IN> inSerializer;
-	protected StreamRecord<IN> reuse;
+	protected StreamRecord<IN> nextRecord;
 	protected boolean isMutable;
 
 	protected Collector<OUT> collector;
@@ -61,48 +63,43 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
 	/**
 	 * Initializes the {@link StreamInvokable} for input and output handling
 	 * 
-	 * @param collector
-	 *            Collector object for collecting the outputs for the operator
-	 * @param recordIterator
-	 *            Iterator for reading in the input records
-	 * @param serializer
-	 *            Serializer used to deserialize inputs
-	 * @param isMutable
-	 *            Mutability setting for the operator
+	 * @param taskContext
+	 *            StreamTaskContext representing the vertex
 	 */
-	public void initialize(Collector<OUT> collector,
-			MutableObjectIterator<StreamRecord<IN>> recordIterator,
-			StreamRecordSerializer<IN> serializer, boolean isMutable) {
-		this.collector = collector;
-		this.recordIterator = recordIterator;
-		this.inSerializer = serializer;
+	public void setup(StreamTaskContext<OUT> taskContext) {
+		this.collector = taskContext.getOutputCollector();
+		this.recordIterator = taskContext.getInput(0);
+		this.inSerializer = taskContext.getInputSerializer(0);
 		if (this.inSerializer != null) {
-			this.reuse = serializer.createInstance();
+			this.nextRecord = inSerializer.createInstance();
 		}
-		this.isMutable = isMutable;
-	}
-
-	/**
-	 * Re-initializes the object in which the next input record will be read in
-	 */
-	protected void resetReuse() {
-		this.reuse = inSerializer.createInstance();
+		this.taskContext = taskContext;
 	}
 
 	/**
-	 * Method that will be called if the mutability setting is set to immutable
+	 * Method that will be called when the operator starts, should encode the
+	 * processing logic
 	 */
-	protected abstract void immutableInvoke() throws Exception;
+	public abstract void invoke() throws Exception;
 
-	/**
-	 * Method that will be called if the mutability setting is set to mutable
+	/*
+	 * Reads the next record from the reader iterator and stores it in the
+	 * nextRecord variable
 	 */
-	protected abstract void mutableInvoke() throws Exception;
+	protected StreamRecord<IN> readNext() {
+		this.nextRecord = inSerializer.createInstance();
+		try {
+			return nextRecord = recordIterator.next(nextRecord);
+		} catch (IOException e) {
+			throw new RuntimeException("Could not read next record.");
+		}
+	}
 
 	/**
 	 * The call of the user implemented function should be implemented here
 	 */
-	protected abstract void callUserFunction() throws Exception;
+	protected void callUserFunction() throws Exception {
+	}
 
 	/**
 	 * Method for logging exceptions thrown during the user function call
@@ -119,20 +116,6 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
 	}
 
 	/**
-	 * Method that will be called when the stream starts. The user should encode
-	 * the processing functionality in {@link #mutableInvoke()} and
-	 * {@link #immutableInvoke()}
-	 * 
-	 */
-	public void invoke() throws Exception {
-		if (this.isMutable) {
-			mutableInvoke();
-		} else {
-			immutableInvoke();
-		}
-	}
-
-	/**
 	 * Open method to be used if the user defined function extends the
 	 * RichFunction class
 	 * 
@@ -141,9 +124,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
 	 */
 	public void open(Configuration parameters) throws Exception {
 		isRunning = true;
-		if (userFunction instanceof RichFunction) {
-			((RichFunction) userFunction).open(parameters);
-		}
+		FunctionUtils.openFunction(userFunction, parameters);
 	}
 
 	/**
@@ -154,16 +135,10 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
 	public void close() throws Exception {
 		isRunning = false;
 		collector.close();
-		if (userFunction instanceof RichFunction) {
-			((RichFunction) userFunction).close();
-		}
+		FunctionUtils.closeFunction(userFunction);
 	}
 
 	public void setRuntimeContext(RuntimeContext t) {
 		FunctionUtils.setFunctionRuntimeContext(userFunction, t);
 	}
-
-	public SourceFunction<OUT> getSourceFunction() {
-		return null;
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
index 7924595..0267253 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
@@ -21,30 +21,17 @@ import org.apache.flink.streaming.api.invokable.StreamInvokable;
 
 public class CounterInvokable<IN> extends StreamInvokable<IN, Long> {
 	private static final long serialVersionUID = 1L;
-	
+
 	Long count = 0L;
-	
+
 	public CounterInvokable() {
 		super(null);
 	}
-	
-	@Override
-	protected void immutableInvoke() throws Exception {
-		while ((reuse = recordIterator.next(reuse)) != null) {
-			callUserFunctionAndLogException();
-			resetReuse();
-		}
-	}
 
 	@Override
-	protected void mutableInvoke() throws Exception {
-		while ((reuse = recordIterator.next(reuse)) != null) {
-			callUserFunctionAndLogException();
+	public void invoke() throws Exception {
+		while (readNext() != null) {
+			collector.collect(++count);
 		}
 	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-		collector.collect(++count);
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index a54b6ad..796196d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -25,37 +25,25 @@ public class FilterInvokable<IN> extends StreamInvokable<IN, IN> {
 	private static final long serialVersionUID = 1L;
 
 	FilterFunction<IN> filterFunction;
+	private boolean collect;
 
 	public FilterInvokable(FilterFunction<IN> filterFunction) {
 		super(filterFunction);
 		this.filterFunction = filterFunction;
 	}
 
-	private boolean canCollect;
-	
 	@Override
-	protected void immutableInvoke() throws Exception {
-		while ((reuse = recordIterator.next(reuse)) != null) {
+	public void invoke() throws Exception {
+		while (readNext() != null) {
 			callUserFunctionAndLogException();
-			if (canCollect) {
-				collector.collect(reuse.getObject());
-			}
-			resetReuse();
-		}
-	}
-
-	@Override
-	protected void mutableInvoke() throws Exception {
-		while ((reuse = recordIterator.next(reuse)) != null) {
-			callUserFunctionAndLogException();
-			if (canCollect) {
-				collector.collect(reuse.getObject());
+			if (collect) {
+				collector.collect(nextRecord.getObject());
 			}
 		}
 	}
 
 	@Override
 	protected void callUserFunction() throws Exception {
-		canCollect = filterFunction.filter(reuse.getObject());
+		collect = filterFunction.filter(nextRecord.getObject());
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index 3452a82..8ff78eb 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -31,23 +31,15 @@ public class FlatMapInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 	}
 
 	@Override
-	protected void immutableInvoke() throws Exception {
-		while ((reuse = recordIterator.next(reuse)) != null) {
-			callUserFunctionAndLogException();
-			resetReuse();
-		}
-	}
-
-	@Override
-	protected void mutableInvoke() throws Exception {
-		while ((reuse = recordIterator.next(reuse)) != null) {
+	public void invoke() throws Exception {
+		while (readNext() != null) {
 			callUserFunctionAndLogException();
 		}
 	}
 
 	@Override
 	protected void callUserFunction() throws Exception {
-		flatMapper.flatMap(reuse.getObject(), collector);
+		flatMapper.flatMap(nextRecord.getObject(), collector);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
index d64fb6f..fdcf520 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
@@ -38,9 +38,9 @@ public class GroupedReduceInvokable<IN> extends StreamReduceInvokable<IN> {
 
 	@Override
 	protected void reduce() throws Exception {
-		Object key = reuse.getKey(keySelector);
+		Object key = nextRecord.getKey(keySelector);
 		currentValue = values.get(key);
-		nextValue = reuse.getObject();
+		nextValue = nextRecord.getObject();
 		if (currentValue != null) {
 			callUserFunctionAndLogException();
 			values.put(key, reduced);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
index ae16be0..33348e4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
@@ -37,8 +37,6 @@ import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This invokable allows windowing based on {@link TriggerPolicy} and
@@ -80,8 +78,6 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 	 */
 	private static final long serialVersionUID = -3469545957144404137L;
 
-	private static final Logger LOG = LoggerFactory.getLogger(GroupedWindowInvokable.class);
-
 	private KeySelector<IN, ?> keySelector;
 	private Configuration parameters;
 	private LinkedList<ActiveTriggerPolicy<IN>> activeCentralTriggerPolicies;
@@ -226,23 +222,23 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 	}
 
 	@Override
-	protected void immutableInvoke() throws Exception {
+	public void invoke() throws Exception {
 		// Prevent empty data streams
-		if ((reuse = recordIterator.next(reuse)) == null) {
+		if (readNext() == null) {
 			throw new RuntimeException("DataStream must not be empty");
 		}
 
 		// Continuously run
-		while (reuse != null) {
-			WindowInvokable<IN, OUT> groupInvokable = windowingGroups.get(keySelector.getKey(reuse
-					.getObject()));
+		while (nextRecord != null) {
+			WindowInvokable<IN, OUT> groupInvokable = windowingGroups.get(keySelector
+					.getKey(nextRecord.getObject()));
 			if (groupInvokable == null) {
-				groupInvokable = makeNewGroup(reuse);
+				groupInvokable = makeNewGroup(nextRecord);
 			}
 
 			// Run the precalls for central active triggers
 			for (ActiveTriggerPolicy<IN> trigger : activeCentralTriggerPolicies) {
-				Object[] result = trigger.preNotifyTrigger(reuse.getObject());
+				Object[] result = trigger.preNotifyTrigger(nextRecord.getObject());
 				for (Object in : result) {
 
 					// If central eviction is used, handle it here
@@ -260,7 +256,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 
 			// Process non-active central triggers
 			for (TriggerPolicy<IN> triggerPolicy : centralTriggerPolicies) {
-				if (triggerPolicy.notifyTrigger(reuse.getObject())) {
+				if (triggerPolicy.notifyTrigger(nextRecord.getObject())) {
 					currentTriggerPolicies.add(triggerPolicy);
 				}
 			}
@@ -268,12 +264,12 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 			if (currentTriggerPolicies.isEmpty()) {
 
 				// only add the element to its group
-				groupInvokable.processRealElement(reuse.getObject());
+				groupInvokable.processRealElement(nextRecord.getObject());
 				checkForEmptyGroupBuffer(groupInvokable);
 
 				// If central eviction is used, handle it here
 				if (!centralEvictionPolicies.isEmpty()) {
-					evictElements(centralEviction(reuse.getObject(), false));
+					evictElements(centralEviction(nextRecord.getObject(), false));
 					deleteOrderForCentralEviction.add(groupInvokable);
 				}
 
@@ -283,20 +279,21 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 				for (WindowInvokable<IN, OUT> group : windowingGroups.values()) {
 					if (group == groupInvokable) {
 						// process real with initialized policies
-						group.processRealElement(reuse.getObject(), currentTriggerPolicies);
+						group.processRealElement(nextRecord.getObject(), currentTriggerPolicies);
 					} else {
 						// process like a fake but also initialized with
 						// policies
-						group.externalTriggerFakeElement(reuse.getObject(), currentTriggerPolicies);
+						group.externalTriggerFakeElement(nextRecord.getObject(),
+								currentTriggerPolicies);
 					}
-					
-					//remove group in case it has an empty buffer
-					//checkForEmptyGroupBuffer(group);
+
+					// remove group in case it has an empty buffer
+					// checkForEmptyGroupBuffer(group);
 				}
 
 				// If central eviction is used, handle it here
 				if (!centralEvictionPolicies.isEmpty()) {
-					evictElements(centralEviction(reuse.getObject(), true));
+					evictElements(centralEviction(nextRecord.getObject(), true));
 					deleteOrderForCentralEviction.add(groupInvokable);
 				}
 			}
@@ -304,9 +301,8 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 			// clear current trigger list
 			currentTriggerPolicies.clear();
 
-			// Recreate the reuse-StremRecord object and load next StreamRecord
-			resetReuse();
-			reuse = recordIterator.next(reuse);
+			// read next record
+			readNext();
 		}
 
 		// Stop all remaining threads from policies
@@ -358,7 +354,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 					clonedDistributedEvictionPolicies);
 		}
 
-		groupInvokable.initialize(collector, recordIterator, inSerializer, isMutable);
+		groupInvokable.setup(taskContext);
 		groupInvokable.open(this.parameters);
 		windowingGroups.put(keySelector.getKey(element.getObject()), groupInvokable);
 
@@ -366,21 +362,6 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 	}
 
 	@Override
-	protected void mutableInvoke() throws Exception {
-		if (LOG.isInfoEnabled()) {
-			LOG.info("There is currently no mutable implementation of this operator. Immutable version is used.");
-		}
-		immutableInvoke();
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-		// This method gets never called directly. The user function calls are
-		// all delegated to the invokable instanced which handle/represent the
-		// groups.
-	}
-
-	@Override
 	public void open(Configuration parameters) throws Exception {
 		super.open(parameters);
 		this.parameters = parameters;
@@ -456,12 +437,13 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 	 *            buffer.
 	 */
 	private void evictElements(int numToEvict) {
-		HashSet<WindowInvokable<IN, OUT>> usedGroups=new HashSet<WindowInvokable<IN,OUT>>();
+		HashSet<WindowInvokable<IN, OUT>> usedGroups = new HashSet<WindowInvokable<IN, OUT>>();
 		for (; numToEvict > 0; numToEvict--) {
-			WindowInvokable<IN, OUT> currentGroup=deleteOrderForCentralEviction.getFirst();
-			//Do the eviction
+			WindowInvokable<IN, OUT> currentGroup = deleteOrderForCentralEviction.getFirst();
+			// Do the eviction
 			currentGroup.evictFirst();
-			//Remember groups which possibly have an empty buffer after the eviction
+			// Remember groups which possibly have an empty buffer after the
+			// eviction
 			usedGroups.add(currentGroup);
 			try {
 				deleteOrderForCentralEviction.removeFirst();
@@ -471,13 +453,13 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 			}
 
 		}
-		
-		//Remove groups with empty buffer
-		for (WindowInvokable<IN, OUT> group:usedGroups){
+
+		// Remove groups with empty buffer
+		for (WindowInvokable<IN, OUT> group : usedGroups) {
 			checkForEmptyGroupBuffer(group);
 		}
 	}
-	
+
 	/**
 	 * Checks if the element buffer of a given windowing group is empty. If so,
 	 * the group will be deleted.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index 4feb4f3..6be96ec 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -31,22 +31,14 @@ public class MapInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 	}
 
 	@Override
-	protected void immutableInvoke() throws Exception {
-		while ((reuse = recordIterator.next(reuse)) != null) {
-			callUserFunctionAndLogException();
-			resetReuse();
-		}
-	}
-
-	@Override
-	protected void mutableInvoke() throws Exception {
-		while ((reuse = recordIterator.next(reuse)) != null) {
+	public void invoke() throws Exception {
+		while (readNext() != null) {
 			callUserFunctionAndLogException();
 		}
 	}
 
 	@Override
 	protected void callUserFunction() throws Exception {
-		collector.collect(mapper.map(reuse.getObject()));
+		collector.collect(mapper.map(nextRecord.getObject()));
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
index 4666a85..c9d9e5a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
@@ -39,13 +39,8 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN,
 	}
 
 	@Override
-	protected void immutableInvoke() throws Exception {
-		mutableInvoke();
-	}
-
-	@Override
-	protected void mutableInvoke() throws Exception {
-		while ((reuse = recordIterator.next(reuse)) != null) {
+	public void invoke() throws Exception {
+		while (readNext() != null) {
 			callUserFunctionAndLogException();
 		}
 	}
@@ -53,7 +48,7 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN,
 	@Override
 	protected void callUserFunction() throws Exception {
 		for (int i = 0; i < this.numFields; i++) {
-			outTuple.setField(reuse.getField(fields[i]), i);
+			outTuple.setField(nextRecord.getField(fields[i]), i);
 		}
 		collector.collect(outTuple);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index d327c76..4bb78b8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -34,22 +34,14 @@ public class StreamReduceInvokable<IN> extends StreamInvokable<IN, IN> {
 	}
 
 	@Override
-	protected void immutableInvoke() throws Exception {
-		while ((reuse = recordIterator.next(reuse)) != null) {
-			reduce();
-			resetReuse();
-		}
-	}
-
-	@Override
-	protected void mutableInvoke() throws Exception {
-		while ((reuse = recordIterator.next(reuse)) != null) {
+	public void invoke() throws Exception {
+		while (readNext() != null) {
 			reduce();
 		}
 	}
 
 	protected void reduce() throws Exception {
-		nextValue = reuse.getObject();
+		nextValue = nextRecord.getObject();
 		callUserFunctionAndLogException();
 
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
index 0e740bb..ea891c9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
@@ -29,8 +29,6 @@ import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
 import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 
@@ -39,8 +37,6 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
 	 */
 	private static final long serialVersionUID = -8038984294071650730L;
 
-	private static final Logger LOG = LoggerFactory.getLogger(WindowInvokable.class);
-
 	private LinkedList<TriggerPolicy<IN>> triggerPolicies;
 	private LinkedList<EvictionPolicy<IN>> evictionPolicies;
 	private LinkedList<ActiveTriggerPolicy<IN>> activeTriggerPolicies;
@@ -120,20 +116,19 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
 	}
 
 	@Override
-	protected void immutableInvoke() throws Exception {
+	public void invoke() throws Exception {
 
 		// Prevent empty data streams
-		if ((reuse = recordIterator.next(reuse)) == null) {
+		if (readNext() == null) {
 			throw new RuntimeException("DataStream must not be empty");
 		}
 
 		// Continuously run
-		while (reuse != null) {
-			processRealElement(reuse.getObject());
+		while (nextRecord != null) {
+			processRealElement(nextRecord.getObject());
 
-			// Recreate the reuse-StremRecord object and load next StreamRecord
-			resetReuse();
-			reuse = recordIterator.next(reuse);
+			// Load next StreamRecord
+			readNext();
 		}
 
 		// Stop all remaining threads from policies
@@ -146,14 +141,6 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
 
 	}
 
-	@Override
-	protected void mutableInvoke() throws Exception {
-		if (LOG.isInfoEnabled()) {
-			LOG.info("There is currently no mutable implementation of this operator. Immutable version is used.");
-		}
-		immutableInvoke();
-	}
-
 	/**
 	 * This method gets called in case of an grouped windowing in case central
 	 * trigger occurred and the arriving element causing the trigger is not part
@@ -363,10 +350,10 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
 	 * 
 	 * @return true in case the buffer is empty otherwise false.
 	 */
-	protected boolean isBufferEmpty(){
+	protected boolean isBufferEmpty() {
 		return buffer.isEmpty();
 	}
-	
+
 	/**
 	 * This method does the final reduce at the end of the stream and emits the
 	 * result.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
index edf5a8f..4ed49fd 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
@@ -63,7 +63,7 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2,
 	}
 
 	@Override
-	public void immutableInvoke() throws Exception {
+	public void invoke() throws Exception {
 		while (true) {
 			int next = recordIterator.next(reuse1, reuse2);
 			if (next == 0) {
@@ -100,13 +100,6 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2,
 		return batch2;
 	}
 
-	@Override
-	// TODO: implement mutableInvoke for reduce
-	protected void mutableInvoke() throws Exception {
-		System.out.println("Immutable setting is used");
-		immutableInvoke();
-	}
-
 	protected void reduce1(StreamBatch<IN1> batch) {
 		this.currentBatch1 = batch;
 		callUserFunctionAndLogException1();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 25ed62c..604873e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
 import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.util.Collector;
 import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,19 +45,18 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU
 	protected TypeSerializer<IN1> serializer1;
 	protected TypeSerializer<IN2> serializer2;
 
-	public void initialize(Collector<OUT> collector,
-			CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator,
-			StreamRecordSerializer<IN1> serializer1, StreamRecordSerializer<IN2> serializer2,
-			boolean isMutable) {
-		this.collector = collector;
+	@Override
+	public void setup(StreamTaskContext<OUT> taskContext) {
+		this.collector = taskContext.getOutputCollector();
+
+		this.recordIterator = taskContext.getCoReader();
+
+		this.srSerializer1 = taskContext.getInputSerializer(0);
+		this.srSerializer2 = taskContext.getInputSerializer(1);
 
-		this.recordIterator = recordIterator;
-		this.reuse1 = serializer1.createInstance();
-		this.reuse2 = serializer2.createInstance();
+		this.reuse1 = srSerializer1.createInstance();
+		this.reuse2 = srSerializer2.createInstance();
 
-		this.srSerializer1 = serializer1;
-		this.srSerializer2 = serializer2;
-		this.isMutable = isMutable;
 		this.serializer1 = srSerializer1.getObjectSerializer();
 		this.serializer2 = srSerializer2.getObjectSerializer();
 	}
@@ -76,7 +75,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU
 	}
 
 	@Override
-	protected void immutableInvoke() throws Exception {
+	public void invoke() throws Exception {
 		while (true) {
 			int next = recordIterator.next(reuse1, reuse2);
 			if (next == 0) {
@@ -93,22 +92,6 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU
 		}
 	}
 
-	@Override
-	protected void mutableInvoke() throws Exception {
-		while (true) {
-			int next = recordIterator.next(reuse1, reuse2);
-			if (next == 0) {
-				break;
-			} else if (next == 1) {
-				initialize1();
-				handleStream1();
-			} else {
-				initialize2();
-				handleStream2();
-			}
-		}
-	}
-
 	protected abstract void handleStream1() throws Exception;
 
 	protected abstract void handleStream2() throws Exception;
@@ -147,8 +130,4 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU
 		}
 	}
 
-	@Override
-	protected void callUserFunction() throws Exception {
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
index be3f578..7df5668 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
@@ -59,11 +59,6 @@ public class CoWindowInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT>
 	}
 
 	@Override
-	protected void mutableInvoke() throws Exception {
-		throw new RuntimeException("Reducing mutable sliding batch is not supported.");
-	}
-
-	@Override
 	protected void handleStream1() throws Exception {
 		window.addToBuffer1(reuse1.getObject());
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
index 0058c66..9321bc7 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
@@ -30,8 +30,6 @@ import org.apache.flink.util.MutableObjectIterator;
 
 public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
 
-	private OutputHandler<OUT> outputHandler;
-
 	protected StreamRecordSerializer<IN1> inputDeserializer1 = null;
 	protected StreamRecordSerializer<IN2> inputDeserializer2 = null;
 
@@ -68,8 +66,7 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
 	@Override
 	protected void setInvokable() {
 		userInvokable = configuration.getUserInvokable(userClassLoader);
-		userInvokable.initialize(outputHandler.getCollector(), coIter, inputDeserializer1,
-				inputDeserializer2, isMutable);
+		userInvokable.setup(this);
 	}
 
 	protected void setConfigInputs() throws StreamVertexException {
@@ -105,4 +102,36 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
 		outputHandler.invokeUserFunction("CO-TASK", userInvokable);
 	}
 
+	@SuppressWarnings("unchecked")
+	@Override
+	public <X> MutableObjectIterator<X> getInput(int index) {
+		switch (index) {
+		case 0:
+			return (MutableObjectIterator<X>) inputIter1;
+		case 1:
+			return (MutableObjectIterator<X>) inputIter2;
+		default:
+			throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
+		switch (index) {
+		case 0:
+			return (StreamRecordSerializer<X>) inputDeserializer1;
+		case 1:
+			return (StreamRecordSerializer<X>) inputDeserializer2;
+		default:
+			throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public <X, Y> CoReaderIterator<X, Y> getCoReader() {
+		return (CoReaderIterator<X, Y>) coIter;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
new file mode 100644
index 0000000..7fbab3b
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import org.apache.flink.streaming.api.StreamConfig;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.io.CoReaderIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+public interface StreamTaskContext<OUT> {
+
+	StreamConfig getConfig();
+
+	ClassLoader getUserCodeClassLoader();
+
+	<X> MutableObjectIterator<X> getInput(int index);
+
+	<X> StreamRecordSerializer<X> getInputSerializer(int index);
+
+	Collector<OUT> getOutputCollector();
+
+	<X, Y> CoReaderIterator<X, Y> getCoReader();
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 13e6c9f..7504efd 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -23,9 +23,13 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.streaming.api.StreamConfig;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.io.CoReaderIterator;
 import org.apache.flink.streaming.state.OperatorState;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
 
-public class StreamVertex<IN, OUT> extends AbstractInvokable {
+public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT> {
 
 	private static int numTasks;
 
@@ -33,12 +37,11 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
 	protected int instanceID;
 	protected String name;
 	private static int numVertices = 0;
-	protected boolean isMutable;
 	protected Object function;
 	protected String functionName;
 
 	private InputHandler<IN> inputHandler;
-	private OutputHandler<OUT> outputHandler;
+	protected OutputHandler<OUT> outputHandler;
 	private StreamInvokable<IN, OUT> userInvokable;
 
 	private StreamingRuntimeContext context;
@@ -68,7 +71,6 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
 		this.userClassLoader = getUserCodeClassLoader();
 		this.configuration = new StreamConfig(getTaskConfiguration());
 		this.name = configuration.getVertexName();
-		this.isMutable = configuration.getMutability();
 		this.functionName = configuration.getFunctionName();
 		this.function = configuration.getFunction(userClassLoader);
 		this.states = configuration.getOperatorStates(userClassLoader);
@@ -89,8 +91,7 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
 
 	protected void setInvokable() {
 		userInvokable = configuration.getUserInvokable(userClassLoader);
-		userInvokable.initialize(outputHandler.getCollector(), inputHandler.getInputIter(),
-				inputHandler.getInputSerializer(), isMutable);
+		userInvokable.setup(this);
 	}
 
 	public String getName() {
@@ -111,4 +112,39 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
 	public void invoke() throws Exception {
 		outputHandler.invokeUserFunction("TASK", userInvokable);
 	}
+
+	@Override
+	public StreamConfig getConfig() {
+		return configuration;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public <X> MutableObjectIterator<X> getInput(int index) {
+		if (index == 0) {
+			return (MutableObjectIterator<X>) inputHandler.getInputIter();
+		} else {
+			throw new IllegalArgumentException("There is only 1 input");
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
+		if (index == 0) {
+			return (StreamRecordSerializer<X>) inputHandler.getInputSerializer();
+		} else {
+			throw new IllegalArgumentException("There is only 1 input");
+		}
+	}
+
+	@Override
+	public Collector<OUT> getOutputCollector() {
+		return outputHandler.getCollector();
+	}
+
+	@Override
+	public <X, Y> CoReaderIterator<X, Y> getCoReader() {
+		throw new IllegalArgumentException("CoReader not available");
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index 0fbf72a..9376166 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
 import org.apache.flink.streaming.util.keys.TupleKeySelector;
 import org.junit.Test;
 
@@ -102,24 +102,24 @@ public class AggregationFunctionTest {
 				.getAggregator(1, type1, AggregationType.MAX);
 		ReduceFunction<Integer> maxFunction0 = ComparableAggregator.getAggregator(0,
 				type2, AggregationType.MAX);
-		List<Tuple2<Integer, Integer>> sumList = MockInvokable.createAndExecute(
+		List<Tuple2<Integer, Integer>> sumList = MockContext.createAndExecute(
 				new StreamReduceInvokable<Tuple2<Integer, Integer>>(sumFunction), getInputList());
 
-		List<Tuple2<Integer, Integer>> minList = MockInvokable.createAndExecute(
+		List<Tuple2<Integer, Integer>> minList = MockContext.createAndExecute(
 				new StreamReduceInvokable<Tuple2<Integer, Integer>>(minFunction), getInputList());
 
-		List<Tuple2<Integer, Integer>> maxList = MockInvokable.createAndExecute(
+		List<Tuple2<Integer, Integer>> maxList = MockContext.createAndExecute(
 				new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxFunction), getInputList());
 
-		List<Tuple2<Integer, Integer>> groupedSumList = MockInvokable.createAndExecute(
+		List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecute(
 				new GroupedReduceInvokable<Tuple2<Integer, Integer>>(sumFunction,
 						new TupleKeySelector<Tuple2<Integer, Integer>>(0)), getInputList());
 
-		List<Tuple2<Integer, Integer>> groupedMinList = MockInvokable.createAndExecute(
+		List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecute(
 				new GroupedReduceInvokable<Tuple2<Integer, Integer>>(minFunction,
 						new TupleKeySelector<Tuple2<Integer, Integer>>(0)), getInputList());
 
-		List<Tuple2<Integer, Integer>> groupedMaxList = MockInvokable.createAndExecute(
+		List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecute(
 				new GroupedReduceInvokable<Tuple2<Integer, Integer>>(maxFunction,
 						new TupleKeySelector<Tuple2<Integer, Integer>>(0)), getInputList());
 
@@ -129,11 +129,11 @@ public class AggregationFunctionTest {
 		assertEquals(expectedGroupSumList, groupedSumList);
 		assertEquals(expectedGroupMinList, groupedMinList);
 		assertEquals(expectedGroupMaxList, groupedMaxList);
-		assertEquals(expectedSumList0, MockInvokable.createAndExecute(
+		assertEquals(expectedSumList0, MockContext.createAndExecute(
 				new StreamReduceInvokable<Integer>(sumFunction0), simpleInput));
-		assertEquals(expectedMinList0, MockInvokable.createAndExecute(
+		assertEquals(expectedMinList0, MockContext.createAndExecute(
 				new StreamReduceInvokable<Integer>(minFunction0), simpleInput));
-		assertEquals(expectedMaxList0, MockInvokable.createAndExecute(
+		assertEquals(expectedMaxList0, MockContext.createAndExecute(
 				new StreamReduceInvokable<Integer>(maxFunction0), simpleInput));
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
@@ -210,16 +210,16 @@ public class AggregationFunctionTest {
 		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
 		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
 
-		assertEquals(maxByFirstExpected, MockInvokable.createAndExecute(
+		assertEquals(maxByFirstExpected, MockContext.createAndExecute(
 				new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionFirst),
 				getInputList()));
-		assertEquals(maxByLastExpected, MockInvokable.createAndExecute(
+		assertEquals(maxByLastExpected, MockContext.createAndExecute(
 				new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionLast),
 				getInputList()));
-		assertEquals(minByLastExpected, MockInvokable.createAndExecute(
+		assertEquals(minByLastExpected, MockContext.createAndExecute(
 				new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionLast),
 				getInputList()));
-		assertEquals(minByFirstExpected, MockInvokable.createAndExecute(
+		assertEquals(minByFirstExpected, MockContext.createAndExecute(
 				new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionFirst),
 				getInputList()));
 
@@ -284,16 +284,16 @@ public class AggregationFunctionTest {
 		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
 		minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
 
-		assertEquals(maxByFirstExpected, MockInvokable.createAndExecute(
+		assertEquals(maxByFirstExpected, MockContext.createAndExecute(
 				new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionFirst),
 				getInputList()));
-		assertEquals(maxByLastExpected, MockInvokable.createAndExecute(
+		assertEquals(maxByLastExpected, MockContext.createAndExecute(
 				new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionLast),
 				getInputList()));
-		assertEquals(minByLastExpected, MockInvokable.createAndExecute(
+		assertEquals(minByLastExpected, MockContext.createAndExecute(
 				new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionLast),
 				getInputList()));
-		assertEquals(minByFirstExpected, MockInvokable.createAndExecute(
+		assertEquals(minByFirstExpected, MockContext.createAndExecute(
 				new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionFirst),
 				getInputList()));
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
index 44b0513..1db286c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
@@ -25,7 +25,7 @@ import java.util.List;
 
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoBatchReduceInvokable;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
 import org.junit.Test;
 
 public class CoBatchReduceTest {
@@ -84,7 +84,7 @@ public class CoBatchReduceTest {
 		expected.add("def");
 		expected.add("ghi");
 
-		List<String> result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2);
+		List<String> result = MockCoContext.createAndExecute(invokable, inputs, inputs2);
 
 		Collections.sort(result);
 		Collections.sort(expected);
@@ -125,7 +125,7 @@ public class CoBatchReduceTest {
 		expected.add("efg");
 		expected.add("ghi");
 
-		List<String> result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2);
+		List<String> result = MockCoContext.createAndExecute(invokable, inputs, inputs2);
 
 		Collections.sort(result);
 		Collections.sort(expected);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
index 5009496..a91bd0c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
@@ -59,7 +59,7 @@ public class CoFlatMapTest implements Serializable {
 
 		List<String> expectedList = Arrays.asList("a", "b", "c", "1", "d", "e", "f", "2", "g", "h",
 				"e", "3", "4", "5");
-		List<String> actualList = MockCoInvokable.createAndExecute(invokable,
+		List<String> actualList = MockCoContext.createAndExecute(invokable,
 				Arrays.asList("abc", "def", "ghe"), Arrays.asList(1, 2, 3, 4, 5));
 
 		assertEquals(expectedList, actualList);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
index ce01a7d..bc19a89 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
@@ -26,7 +26,7 @@ import java.util.List;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedBatchReduceInvokable;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
 import org.apache.flink.streaming.util.keys.TupleKeySelector;
 import org.junit.Test;
 
@@ -99,7 +99,7 @@ public class CoGroupedBatchReduceTest {
 				new MyCoReduceFunction(), 4L, 3L, 4L, 3L, new TupleKeySelector(0),
 				new TupleKeySelector(0));
 
-		List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
+		List<String> result = MockCoContext.createAndExecute(invokable, inputs1, inputs2);
 
 		Collections.sort(result);
 		Collections.sort(expected);
@@ -146,7 +146,7 @@ public class CoGroupedBatchReduceTest {
 				new MyCoReduceFunction(), 4L, 3L, 2L, 2L, new TupleKeySelector(0),
 				new TupleKeySelector(0));
 
-		List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
+		List<String> result = MockCoContext.createAndExecute(invokable, inputs1, inputs2);
 
 		Collections.sort(result);
 		Collections.sort(expected);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
index 4570e23..15d42a4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
 import org.apache.flink.streaming.util.keys.TupleKeySelector;
 import org.junit.Test;
 
@@ -77,7 +77,7 @@ public class CoGroupedReduceTest {
 		List<String> expected = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5",
 				"7");
 
-		List<String> actualList = MockCoInvokable.createAndExecute(invokable,
+		List<String> actualList = MockCoContext.createAndExecute(invokable,
 				Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5));
 
 		assertEquals(expected, actualList);
@@ -87,7 +87,7 @@ public class CoGroupedReduceTest {
 
 		expected = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5", "7");
 
-		actualList = MockCoInvokable.createAndExecute(invokable,
+		actualList = MockCoContext.createAndExecute(invokable,
 				Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5));
 
 		assertEquals(expected, actualList);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
index f36a7b5..b5a7e8d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
 import org.apache.flink.streaming.util.keys.TupleKeySelector;
 import org.junit.Test;
 
@@ -129,7 +129,7 @@ public class CoGroupedWindowReduceTest {
 				new TupleKeySelector( 0), new MyTimeStamp<Tuple2<String, Integer>>(
 						timestamps1), new MyTimeStamp<Tuple2<String, String>>(timestamps2));
 
-		List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
+		List<String> result = MockCoContext.createAndExecute(invokable, inputs1, inputs2);
 
 		Collections.sort(result);
 		Collections.sort(expected);
@@ -182,7 +182,7 @@ public class CoGroupedWindowReduceTest {
 				new TupleKeySelector( 0), new MyTimeStamp<Tuple2<String, Integer>>(
 						timestamps1), new MyTimeStamp<Tuple2<String, String>>(timestamps2));
 
-		List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
+		List<String> result = MockCoContext.createAndExecute(invokable, inputs1, inputs2);
 
 		Collections.sort(result);
 		Collections.sort(expected);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
index 9c62aec..93d1741 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
@@ -25,7 +25,7 @@ import java.util.List;
 
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
 import org.junit.Test;
 
 public class CoMapTest implements Serializable {
@@ -50,7 +50,7 @@ public class CoMapTest implements Serializable {
 		CoMapInvokable<Double, Integer, String> invokable = new CoMapInvokable<Double, Integer, String>(new MyCoMap());
 
 		List<String> expectedList = Arrays.asList("1.1", "1", "1.2", "2", "1.3", "3", "1.4", "1.5");
-		List<String> actualList = MockCoInvokable.createAndExecute(invokable, Arrays.asList(1.1, 1.2, 1.3, 1.4, 1.5), Arrays.asList(1, 2, 3));
+		List<String> actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(1.1, 1.2, 1.3, 1.4, 1.5), Arrays.asList(1, 2, 3));
 		
 		assertEquals(expectedList, actualList);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java
index 996320a..3343ba0 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java
@@ -24,7 +24,7 @@ import java.util.List;
 
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
 import org.junit.Test;
 
 public class CoStreamReduceTest {
@@ -62,7 +62,7 @@ public class CoStreamReduceTest {
 				new MyCoReduceFunction());
 
 		List<Integer> expected1 = Arrays.asList(1, 9, 2, 99, 6, 998, 24);
-		List<Integer> result = MockCoInvokable.createAndExecute(coReduce,
+		List<Integer> result = MockCoContext.createAndExecute(coReduce,
 				Arrays.asList(1, 2, 3, 4), Arrays.asList("9", "9", "8"));
 
 		assertEquals(expected1, result);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
index 4604b27..90ad483 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
@@ -28,7 +28,7 @@ import java.util.List;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowReduceInvokable;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
 import org.junit.Test;
 
 public class CoWindowReduceTest {
@@ -114,7 +114,7 @@ public class CoWindowReduceTest {
 		expected.add("abcde");
 		expected.add("fghi");
 
-		List<String> result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2);
+		List<String> result = MockCoContext.createAndExecute(invokable, inputs, inputs2);
 
 		Collections.sort(result);
 		Collections.sort(expected);
@@ -160,7 +160,7 @@ public class CoWindowReduceTest {
 		expected.add("fgh");
 		expected.add("hi");
 
-		List<String> result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2);
+		List<String> result = MockCoContext.createAndExecute(invokable, inputs, inputs2);
 
 		Collections.sort(result);
 		Collections.sort(expected);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
index ebdd963..c6d446a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.function.co.CoWindowFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
@@ -148,7 +148,7 @@ public class CoWindowTest {
 		expected1.add(0);
 		expected1.add(1);
 
-		List<Integer> actual1 = MockCoInvokable.createAndExecute(invokable1, input11, input12);
+		List<Integer> actual1 = MockCoContext.createAndExecute(invokable1, input11, input12);
 		assertEquals(expected1, actual1);
 
 		CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> invokable2 = new CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer>(
@@ -183,7 +183,7 @@ public class CoWindowTest {
 		expected2.add(8);
 		expected2.add(7);
 
-		List<Integer> actual2 = MockCoInvokable.createAndExecute(invokable2, input21, input22);
+		List<Integer> actual2 = MockCoContext.createAndExecute(invokable2, input21, input22);
 		assertEquals(expected2, actual2);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
index 70b9d7e..969a06b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
@@ -22,7 +22,7 @@ import static org.junit.Assert.assertEquals;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
 import org.junit.Test;
 
 public class CounterInvokableTest {
@@ -32,7 +32,7 @@ public class CounterInvokableTest {
 		CounterInvokable<String> invokable = new CounterInvokable<String>();
 
 		List<Long> expected = Arrays.asList(1L, 2L, 3L);
-		List<Long> actual = MockInvokable.createAndExecute(invokable, Arrays.asList("one", "two", "three"));
+		List<Long> actual = MockContext.createAndExecute(invokable, Arrays.asList("one", "two", "three"));
 		
 		assertEquals(expected, actual);
 	}


[2/5] incubator-flink git commit: [dist] Updated the assembly of the examples subdirectory

Posted by mb...@apache.org.
[dist] Updated the assembly of the examples subdirectory

Excluded the scala example jars
Excluded the example source code subdirectories

This closes #274


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ff05df90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ff05df90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ff05df90

Branch: refs/heads/master
Commit: ff05df90c2d97acb4a98da5cac9e07a0f37825d9
Parents: c5e9a51
Author: mbalassi <mb...@apache.org>
Authored: Thu Dec 18 00:28:07 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Thu Dec 18 20:07:27 2014 +0100

----------------------------------------------------------------------
 flink-dist/src/main/assemblies/bin.xml  | 30 ----------------------------
 flink-dist/src/main/assemblies/yarn.xml | 14 -------------
 2 files changed, 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ff05df90/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index 9ad8e47..31f80a6 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -132,36 +132,6 @@ under the License.
 			</excludes>
 		</fileSet>
 
-		<fileSet>
-			<!-- copy jar files of scala examples -->
-			<directory>../flink-examples/flink-scala-examples/target</directory>
-			<outputDirectory>examples</outputDirectory>
-			<fileMode>0644</fileMode>
-			<includes>
-				<include>*.jar</include>
-			</includes>
-			<excludes>
-				<exclude>flink-scala-examples-${project.version}.jar</exclude>
-				<exclude>original-flink-scala-examples-${project.version}.jar</exclude>
-				<exclude>flink-scala-examples-${project.version}-sources.jar</exclude>
-				<exclude>flink-scala-examples-${project.version}-javadoc.jar</exclude>
-			</excludes>
-		</fileSet>
-
-		<fileSet>
-			<!-- copy java examples src -->
-			<directory>../flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java</directory>
-			<outputDirectory>examples/java-src/</outputDirectory>
-			<fileMode>0644</fileMode>
-		</fileSet>
-
-		<fileSet>
-			<!-- copy scala examples src -->
-			<directory>../flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala</directory>
-			<outputDirectory>examples/scala-src/</outputDirectory>
-			<fileMode>0644</fileMode>
-		</fileSet>
-
 	</fileSets>
 
 </assembly>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ff05df90/flink-dist/src/main/assemblies/yarn.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/yarn.xml b/flink-dist/src/main/assemblies/yarn.xml
index 7d49b01..1580175 100644
--- a/flink-dist/src/main/assemblies/yarn.xml
+++ b/flink-dist/src/main/assemblies/yarn.xml
@@ -92,20 +92,6 @@ under the License.
 			</excludes>
 		</fileSet>
 
-		<fileSet>
-			<!-- copy jar files of scala examples -->
-			<directory>../flink-examples/flink-scala-examples/target</directory>
-			<outputDirectory>examples</outputDirectory>
-			<fileMode>0644</fileMode>
-			<includes>
-				<include>*.jar</include>
-			</includes>
-			<excludes>
-				<exclude>flink-scala-examples-${project.version}.jar</exclude>
-				<exclude>original-flink-scala-examples-${project.version}.jar</exclude>
-				<exclude>flink-scala-examples-${project.version}-sources.jar</exclude>
-			</excludes>
-		</fileSet>
 	</fileSets>
 
 	<files>


[4/5] incubator-flink git commit: [streaming] StreamInvokable rework for simpler logic and easier use

Posted by mb...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
index e06e0ef..403dd17 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
@@ -24,7 +24,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
 import org.junit.Test;
 
 public class FilterTest implements Serializable {
@@ -44,7 +44,7 @@ public class FilterTest implements Serializable {
 		FilterInvokable<Integer> invokable = new FilterInvokable<Integer>(new MyFilter());
 
 		List<Integer> expected = Arrays.asList(2, 4, 6);
-		List<Integer> actual = MockInvokable.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7));
+		List<Integer> actual = MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7));
 		
 		assertEquals(expected, actual);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
index a89de50..7424e21 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
@@ -23,7 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
@@ -47,7 +47,7 @@ public class FlatMapTest {
 		FlatMapInvokable<Integer, Integer> invokable = new FlatMapInvokable<Integer, Integer>(new MyFlatMap());
 		
 		List<Integer> expected = Arrays.asList(2, 4, 4, 16, 6, 36, 8, 64);
-		List<Integer> actual = MockInvokable.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
+		List<Integer> actual = MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
 		
 		assertEquals(expected, actual);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
index 0b68207..ceaccf3 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
@@ -23,7 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
 import org.apache.flink.streaming.util.keys.ObjectKeySelector;
 import org.junit.Test;
 
@@ -46,7 +46,7 @@ public class GroupedReduceInvokableTest {
 				new MyReducer(), new ObjectKeySelector<Integer>());
 
 		List<Integer> expected = Arrays.asList(1, 2, 2, 4, 3);
-		List<Integer> actual = MockInvokable.createAndExecute(invokable1,
+		List<Integer> actual = MockContext.createAndExecute(invokable1,
 				Arrays.asList(1, 1, 2, 2, 3));
 
 		assertEquals(expected, actual);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
index 5ac5529..c3a48d5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
@@ -39,7 +39,7 @@ import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
 import org.apache.flink.streaming.util.keys.TupleKeySelector;
 import org.junit.Test;
 
@@ -206,7 +206,7 @@ public class GroupedWindowInvokableTest {
 		GroupedWindowInvokable<Integer, Integer> invokable = new GroupedWindowInvokable<Integer, Integer>(
 				reduceFunction, keySelector, triggers, evictions, centralTriggers, null);
 
-		List<Integer> result = MockInvokable.createAndExecute(invokable, inputs);
+		List<Integer> result = MockContext.createAndExecute(invokable, inputs);
 
 		List<Integer> actual = new LinkedList<Integer>();
 		for (Integer current : result) {
@@ -225,7 +225,7 @@ public class GroupedWindowInvokableTest {
 		invokable = new GroupedWindowInvokable<Integer, Integer>(
 				reduceFunction, keySelector, triggers, null, centralTriggers,centralEvictions);
 		
-		result = MockInvokable.createAndExecute(invokable, inputs);
+		result = MockContext.createAndExecute(invokable, inputs);
 		actual = new LinkedList<Integer>();
 		for (Integer current : result) {
 			actual.add(current);
@@ -282,7 +282,7 @@ public class GroupedWindowInvokableTest {
 				}, new TupleKeySelector<Tuple2<Integer, String>>(1), triggers, evictions,
 				centralTriggers, null);
 
-		List<Tuple2<Integer, String>> result = MockInvokable.createAndExecute(invokable2, inputs2);
+		List<Tuple2<Integer, String>> result = MockContext.createAndExecute(invokable2, inputs2);
 
 		List<Tuple2<Integer, String>> actual2 = new LinkedList<Tuple2<Integer, String>>();
 		for (Tuple2<Integer, String> current : result) {
@@ -391,7 +391,7 @@ public class GroupedWindowInvokableTest {
 				distributedTriggers, evictions, triggers, null);
 
 		ArrayList<Tuple2<Integer, String>> result = new ArrayList<Tuple2<Integer, String>>();
-		for (Tuple2<Integer, String> t : MockInvokable.createAndExecute(invokable, inputs)) {
+		for (Tuple2<Integer, String> t : MockContext.createAndExecute(invokable, inputs)) {
 			result.add(t);
 		}
 
@@ -411,7 +411,7 @@ public class GroupedWindowInvokableTest {
 				distributedTriggers, evictions, triggers, centralEvictions);
 
 		result = new ArrayList<Tuple2<Integer, String>>();
-		for (Tuple2<Integer, String> t : MockInvokable.createAndExecute(invokable, inputs)) {
+		for (Tuple2<Integer, String> t : MockContext.createAndExecute(invokable, inputs)) {
 			result.add(t);
 		}
 
@@ -480,7 +480,7 @@ public class GroupedWindowInvokableTest {
 				}, distributedTriggers, evictions, triggers, null);
 
 		ArrayList<Integer> result = new ArrayList<Integer>();
-		for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
+		for (Integer t : MockContext.createAndExecute(invokable, inputs)) {
 			result.add(t);
 		}
 
@@ -556,7 +556,7 @@ public class GroupedWindowInvokableTest {
 				}, distributedTriggers, evictions, triggers, null);
 
 		ArrayList<Integer> result = new ArrayList<Integer>();
-		for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
+		for (Integer t : MockContext.createAndExecute(invokable, inputs)) {
 			result.add(t);
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
index 7124ff8..5390ec9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
@@ -23,7 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
 import org.junit.Test;
 
 public class MapTest {
@@ -42,7 +42,7 @@ public class MapTest {
 		MapInvokable<Integer, String> invokable = new MapInvokable<Integer, String>(new Map());
 		
 		List<String> expectedList = Arrays.asList("+2", "+3", "+4");
-		List<String> actualList = MockInvokable.createAndExecute(invokable, Arrays.asList(1, 2, 3));
+		List<String> actualList = MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3));
 		
 		assertEquals(expectedList, actualList);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
index 288d4ee..11c44cd 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.datastream.StreamProjection;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
 import org.junit.Test;
 
 public class ProjectTest implements Serializable {
@@ -62,6 +62,6 @@ public class ProjectTest implements Serializable {
 		expected.add(new Tuple3<Integer, Integer, String>(2, 2, "c"));
 		expected.add(new Tuple3<Integer, Integer, String>(7, 7, "a"));
 
-		assertEquals(expected, MockInvokable.createAndExecute(invokable, input));
+		assertEquals(expected, MockContext.createAndExecute(invokable, input));
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
index 68b8f8e..ae866e6 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
@@ -23,7 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
 import org.junit.Test;
 
 public class StreamReduceTest {
@@ -45,7 +45,7 @@ public class StreamReduceTest {
 				new MyReducer());
 
 		List<Integer> expected = Arrays.asList(1,2,4,7,10);
-		List<Integer> actual = MockInvokable.createAndExecute(invokable1,
+		List<Integer> actual = MockContext.createAndExecute(invokable1,
 				Arrays.asList(1, 1, 2, 3, 3));
 
 		assertEquals(expected, actual);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
index 612da84..421a999 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
 import org.junit.Test;
 
 public class WindowInvokableTest {
@@ -98,7 +98,7 @@ public class WindowInvokableTest {
 				myReduceFunction, triggers, evictions);
 
 		ArrayList<Integer> result = new ArrayList<Integer>();
-		for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
+		for (Integer t : MockContext.createAndExecute(invokable, inputs)) {
 			result.add(t);
 		}
 
@@ -148,7 +148,7 @@ public class WindowInvokableTest {
 		expected.add(24);
 		expected.add(19);
 		List<Integer> result = new ArrayList<Integer>();
-		for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
+		for (Integer t : MockContext.createAndExecute(invokable, inputs)) {
 			result.add(t);
 		}
 		assertEquals(expected, result);
@@ -199,7 +199,7 @@ public class WindowInvokableTest {
 		expected2.add(-4);
 
 		result = new ArrayList<Integer>();
-		for (Integer t : MockInvokable.createAndExecute(invokable2, inputs2)) {
+		for (Integer t : MockContext.createAndExecute(invokable2, inputs2)) {
 			result.add(t);
 		}
 
@@ -253,7 +253,7 @@ public class WindowInvokableTest {
 				myReduceFunction, triggers, evictions);
 
 		ArrayList<Integer> result = new ArrayList<Integer>();
-		for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
+		for (Integer t : MockContext.createAndExecute(invokable, inputs)) {
 			result.add(t);
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
new file mode 100644
index 0000000..ea94f98
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.StreamConfig;
+import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
+import org.apache.flink.streaming.io.CoReaderIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+public class MockCoContext<IN1, IN2, OUT> implements StreamTaskContext<OUT> {
+	// private Collection<IN1> input1;
+	// private Collection<IN2> input2;
+	private Iterator<IN1> inputIterator1;
+	private Iterator<IN2> inputIterator2;
+	private List<OUT> outputs;
+
+	private Collector<OUT> collector;
+	private StreamRecordSerializer<IN1> inDeserializer1;
+	private CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> mockIterator;
+	private StreamRecordSerializer<IN2> inDeserializer2;
+
+	public MockCoContext(Collection<IN1> input1, Collection<IN2> input2) {
+
+		if (input1.isEmpty() || input2.isEmpty()) {
+			throw new RuntimeException("Inputs must not be empty");
+		}
+
+		this.inputIterator1 = input1.iterator();
+		this.inputIterator2 = input2.iterator();
+
+		TypeInformation<IN1> inTypeInfo1 = TypeExtractor.getForObject(input1.iterator().next());
+		inDeserializer1 = new StreamRecordSerializer<IN1>(inTypeInfo1);
+		TypeInformation<IN2> inTypeInfo2 = TypeExtractor.getForObject(input2.iterator().next());
+		inDeserializer2 = new StreamRecordSerializer<IN2>(inTypeInfo2);
+
+		mockIterator = new MockCoReaderIterator(inDeserializer1, inDeserializer2);
+
+		outputs = new ArrayList<OUT>();
+		collector = new MockCollector<OUT>(outputs);
+	}
+
+	private int currentInput = 1;
+	private StreamRecord<IN1> reuse1;
+	private StreamRecord<IN2> reuse2;
+
+	private class MockCoReaderIterator extends
+			CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> {
+
+		public MockCoReaderIterator(TypeSerializer<StreamRecord<IN1>> serializer1,
+				TypeSerializer<StreamRecord<IN2>> serializer2) {
+			super(null, serializer1, serializer2);
+			reuse1 = inDeserializer1.createInstance();
+			reuse2 = inDeserializer2.createInstance();
+		}
+
+		@Override
+		public int next(StreamRecord<IN1> target1, StreamRecord<IN2> target2) throws IOException {
+			this.delegate1.setInstance(target1);
+			this.delegate2.setInstance(target2);
+
+			int inputNumber = nextRecord();
+			target1.setObject(reuse1.getObject());
+			target2.setObject(reuse2.getObject());
+
+			return inputNumber;
+		}
+	}
+
+	private Integer nextRecord() {
+		if (inputIterator1.hasNext() && inputIterator2.hasNext()) {
+			switch (currentInput) {
+			case 1:
+				return next1();
+			case 2:
+				return next2();
+			default:
+				return 0;
+			}
+		}
+
+		if (inputIterator1.hasNext()) {
+			return next1();
+		}
+
+		if (inputIterator2.hasNext()) {
+			return next2();
+		}
+
+		return 0;
+	}
+
+	private int next1() {
+		reuse1 = inDeserializer1.createInstance();
+		reuse1.setObject(inputIterator1.next());
+		currentInput = 2;
+		return 1;
+	}
+
+	private int next2() {
+		reuse2 = inDeserializer2.createInstance();
+		reuse2.setObject(inputIterator2.next());
+		currentInput = 1;
+		return 2;
+	}
+
+	public List<OUT> getOutputs() {
+		return outputs;
+	}
+
+	public Collector<OUT> getCollector() {
+		return collector;
+	}
+
+	public StreamRecordSerializer<IN1> getInDeserializer1() {
+		return inDeserializer1;
+	}
+
+	public StreamRecordSerializer<IN2> getInDeserializer2() {
+		return inDeserializer2;
+	}
+
+	public CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> getIterator() {
+		return mockIterator;
+	}
+
+	public static <IN1, IN2, OUT> List<OUT> createAndExecute(CoInvokable<IN1, IN2, OUT> invokable,
+			List<IN1> input1, List<IN2> input2) {
+		MockCoContext<IN1, IN2, OUT> mockContext = new MockCoContext<IN1, IN2, OUT>(input1, input2);
+		invokable.setup(mockContext);
+
+		try {
+			invokable.open(null);
+			invokable.invoke();
+			invokable.close();
+		} catch (Exception e) {
+			throw new RuntimeException("Cannot invoke invokable.", e);
+		}
+
+		return mockContext.getOutputs();
+	}
+
+	@Override
+	public StreamConfig getConfig() {
+		return null;
+	}
+
+	@Override
+	public ClassLoader getUserCodeClassLoader() {
+		return null;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public <X> MutableObjectIterator<X> getInput(int index) {
+		switch (index) {
+		case 0:
+			return (MutableObjectIterator<X>) inputIterator1;
+		case 1:
+			return (MutableObjectIterator<X>) inputIterator2;
+		default:
+			throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
+		switch (index) {
+		case 0:
+			return (StreamRecordSerializer<X>) inDeserializer1;
+		case 1:
+			return (StreamRecordSerializer<X>) inDeserializer2;
+		default:
+			throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public <X, Y> CoReaderIterator<X, Y> getCoReader() {
+		return (CoReaderIterator<X, Y>) mockIterator;
+	}
+
+	@Override
+	public Collector<OUT> getOutputCollector() {
+		return collector;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java
deleted file mode 100644
index 39d3ab4..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.util.Collector;
-
-public class MockCoInvokable<IN1, IN2, OUT> {
-	// private Collection<IN1> input1;
-	// private Collection<IN2> input2;
-	private Iterator<IN1> inputIterator1;
-	private Iterator<IN2> inputIterator2;
-	private List<OUT> outputs;
-
-	private Collector<OUT> collector;
-	private StreamRecordSerializer<IN1> inDeserializer1;
-	private CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> mockIterator;
-	private StreamRecordSerializer<IN2> inDeserializer2;
-
-	public MockCoInvokable(Collection<IN1> input1, Collection<IN2> input2) {
-
-		if (input1.isEmpty() || input2.isEmpty()) {
-			throw new RuntimeException("Inputs must not be empty");
-		}
-
-		this.inputIterator1 = input1.iterator();
-		this.inputIterator2 = input2.iterator();
-
-		TypeInformation<IN1> inTypeInfo1 = TypeExtractor.getForObject(input1.iterator().next());
-		inDeserializer1 = new StreamRecordSerializer<IN1>(inTypeInfo1);
-		TypeInformation<IN2> inTypeInfo2 = TypeExtractor.getForObject(input2.iterator().next());
-		inDeserializer2 = new StreamRecordSerializer<IN2>(inTypeInfo2);
-
-		mockIterator = new MockCoReaderIterator(inDeserializer1, inDeserializer2);
-
-		outputs = new ArrayList<OUT>();
-		collector = new MockCollector<OUT>(outputs);
-	}
-
-	private int currentInput = 1;
-	private StreamRecord<IN1> reuse1;
-	private StreamRecord<IN2> reuse2;
-	
-	private class MockCoReaderIterator extends
-			CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> {
-
-		public MockCoReaderIterator(
-				TypeSerializer<StreamRecord<IN1>> serializer1,
-				TypeSerializer<StreamRecord<IN2>> serializer2) {
-			super(null, serializer1, serializer2);
-			reuse1 = inDeserializer1.createInstance();
-			reuse2 = inDeserializer2.createInstance();
-		}
-
-		@Override
-		public int next(StreamRecord<IN1> target1, StreamRecord<IN2> target2) throws IOException {
-			this.delegate1.setInstance(target1);
-			this.delegate2.setInstance(target2);
-			
-			int inputNumber = nextRecord();
-			target1.setObject(reuse1.getObject());
-			target2.setObject(reuse2.getObject());
-			
-			return inputNumber;
-		}
-	}
-
-	private Integer nextRecord() {
-		if (inputIterator1.hasNext() && inputIterator2.hasNext()) {
-			switch (currentInput) {
-			case 1:
-				return next1();
-			case 2:
-				return next2();
-			default:
-				return 0;
-			}
-		}
-
-		if (inputIterator1.hasNext()) {
-			return next1();
-		}
-
-		if (inputIterator2.hasNext()) {
-			return next2();
-		}
-
-		return 0;
-	}
-
-	private int next1() {
-		reuse1 = inDeserializer1.createInstance();
-		reuse1.setObject(inputIterator1.next());
-		currentInput = 2;
-		return 1;
-	}
-
-	private int next2() {
-		reuse2 = inDeserializer2.createInstance();
-		reuse2.setObject(inputIterator2.next());
-		currentInput = 1;
-		return 2;
-	}
-
-	public List<OUT> getOutputs() {
-		return outputs;
-	}
-
-	public Collector<OUT> getCollector() {
-		return collector;
-	}
-
-	public StreamRecordSerializer<IN1> getInDeserializer1() {
-		return inDeserializer1;
-	}
-
-	public StreamRecordSerializer<IN2> getInDeserializer2() {
-		return inDeserializer2;
-	}
-
-	public CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> getIterator() {
-		return mockIterator;
-	}
-
-	public static <IN1, IN2, OUT> List<OUT> createAndExecute(CoInvokable<IN1, IN2, OUT> invokable,
-			List<IN1> input1, List<IN2> input2) {
-		MockCoInvokable<IN1, IN2, OUT> mock = new MockCoInvokable<IN1, IN2, OUT>(input1, input2);
-		invokable.initialize(mock.getCollector(), mock.getIterator(), mock.getInDeserializer1(),
-				mock.getInDeserializer2(), false);
-
-		try {
-			invokable.open(null);
-			invokable.invoke();
-			invokable.close();
-		} catch (Exception e) {
-			throw new RuntimeException("Cannot invoke invokable.", e);
-		}
-
-		return mock.getOutputs();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
new file mode 100644
index 0000000..87bedb2
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.StreamConfig;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
+import org.apache.flink.streaming.io.CoReaderIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+public class MockContext<IN, OUT> implements StreamTaskContext<OUT> {
+	private Collection<IN> inputs;
+	private List<OUT> outputs;
+
+	private Collector<OUT> collector;
+	private StreamRecordSerializer<IN> inDeserializer;
+	private MutableObjectIterator<StreamRecord<IN>> iterator;
+
+	public MockContext(Collection<IN> inputs) {
+		this.inputs = inputs;
+		if (inputs.isEmpty()) {
+			throw new RuntimeException("Inputs must not be empty");
+		}
+
+		TypeInformation<IN> inTypeInfo = TypeExtractor.getForObject(inputs.iterator().next());
+		inDeserializer = new StreamRecordSerializer<IN>(inTypeInfo);
+
+		iterator = new MockInputIterator();
+		outputs = new ArrayList<OUT>();
+		collector = new MockCollector<OUT>(outputs);
+	}
+
+	private class MockInputIterator implements MutableObjectIterator<StreamRecord<IN>> {
+		Iterator<IN> listIterator;
+
+		public MockInputIterator() {
+			listIterator = inputs.iterator();
+		}
+
+		@Override
+		public StreamRecord<IN> next(StreamRecord<IN> reuse) throws IOException {
+			if (listIterator.hasNext()) {
+				reuse.setObject(listIterator.next());
+			} else {
+				reuse = null;
+			}
+			return reuse;
+		}
+	}
+
+	public List<OUT> getOutputs() {
+		return outputs;
+	}
+
+	public Collector<OUT> getCollector() {
+		return collector;
+	}
+
+	public StreamRecordSerializer<IN> getInDeserializer() {
+		return inDeserializer;
+	}
+
+	public MutableObjectIterator<StreamRecord<IN>> getIterator() {
+		return iterator;
+	}
+
+	public static <IN, OUT> List<OUT> createAndExecute(StreamInvokable<IN, OUT> invokable,
+			List<IN> inputs) {
+		MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs);
+		invokable.setup(mockContext);
+		try {
+			invokable.open(null);
+			invokable.invoke();
+			invokable.close();
+		} catch (Exception e) {
+			throw new RuntimeException("Cannot invoke invokable.", e);
+		}
+
+		return mockContext.getOutputs();
+	}
+
+	@Override
+	public StreamConfig getConfig() {
+		return null;
+	}
+
+	@Override
+	public ClassLoader getUserCodeClassLoader() {
+		return null;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public <X> MutableObjectIterator<X> getInput(int index) {
+		if (index == 0) {
+			return (MutableObjectIterator<X>) iterator;
+		} else {
+			throw new IllegalArgumentException("There is only 1 input");
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
+		if (index == 0) {
+			return (StreamRecordSerializer<X>) inDeserializer;
+		} else {
+			throw new IllegalArgumentException("There is only 1 input");
+		}
+	}
+
+	@Override
+	public Collector<OUT> getOutputCollector() {
+		return collector;
+	}
+
+	@Override
+	public <X, Y> CoReaderIterator<X, Y> getCoReader() {
+		throw new IllegalArgumentException("CoReader not available");
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
deleted file mode 100644
index c06f53a..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-public class MockInvokable<IN, OUT> {
-	private Collection<IN> inputs;
-	private List<OUT> outputs;
-
-	private Collector<OUT> collector;
-	private StreamRecordSerializer<IN> inDeserializer;
-	private MutableObjectIterator<StreamRecord<IN>> iterator;
-
-	public MockInvokable(Collection<IN> inputs) {
-		this.inputs = inputs;
-		if (inputs.isEmpty()) {
-			throw new RuntimeException("Inputs must not be empty");
-		}
-
-		TypeInformation<IN> inTypeInfo = TypeExtractor.getForObject(inputs.iterator().next());
-		inDeserializer = new StreamRecordSerializer<IN>(inTypeInfo);
-		
-		iterator = new MockInputIterator();
-		outputs = new ArrayList<OUT>();
-		collector = new MockCollector<OUT>(outputs);
-	}
-
-
-	private class MockInputIterator implements MutableObjectIterator<StreamRecord<IN>> {
-		Iterator<IN> listIterator;
-		
-		public MockInputIterator() {
-			listIterator = inputs.iterator();
-		}
-
-		@Override
-		public StreamRecord<IN> next(StreamRecord<IN> reuse) throws IOException {
-			if (listIterator.hasNext()) {
-				reuse.setObject(listIterator.next());
-			} else {
-				reuse = null;
-			}
-			return reuse;
-		}
-	}
-
-	public List<OUT> getOutputs() {
-		return outputs;
-	}
-
-	public Collector<OUT> getCollector() {
-		return collector;
-	}
-
-	public StreamRecordSerializer<IN> getInDeserializer() {
-		return inDeserializer;
-	}
-
-	public MutableObjectIterator<StreamRecord<IN>> getIterator() {
-		return iterator;
-	}
-
-	public static <IN, OUT> List<OUT> createAndExecute(StreamInvokable<IN, OUT> invokable, List<IN> inputs) {
-		MockInvokable<IN, OUT> mock = new MockInvokable<IN, OUT>(inputs);
-		invokable.initialize(mock.getCollector(), mock.getIterator(), mock.getInDeserializer(), false);
-		try {
-			invokable.open(null);
-			invokable.invoke();
-			invokable.close();
-		} catch (Exception e) {
-			throw new RuntimeException("Cannot invoke invokable.", e);
-		}
-		
-		return mock.getOutputs();
-	}
-	
-}