You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/24 21:51:40 UTC

[07/12] git commit: [streaming] Streaming jobgraph and vertex refactor to match recent runtime changes

[streaming] Streaming jobgraph and vertex refactor to match recent runtime changes


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/73371101
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/73371101
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/73371101

Branch: refs/heads/master
Commit: 73371101e9edd16f2823ce8af0b27283e3ed3264
Parents: 7cc2400
Author: Gyula Fora <gy...@apache.org>
Authored: Sun Sep 21 22:58:18 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Wed Sep 24 19:54:39 2014 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    | 519 ++++++++-----------
 .../flink/streaming/api/StreamConfig.java       |  25 +-
 .../api/datastream/BatchedDataStream.java       |   4 +-
 .../streaming/api/datastream/DataStream.java    |  30 +-
 .../api/datastream/IterativeDataStream.java     |   2 +-
 .../environment/StreamExecutionEnvironment.java |  16 +-
 .../streaming/api/invokable/SinkInvokable.java  |   2 +-
 .../api/invokable/SourceInvokable.java          |  14 +-
 .../api/invokable/StreamInvokable.java          | 100 +++-
 .../api/invokable/StreamOperatorInvokable.java  | 119 -----
 .../operator/BatchGroupReduceInvokable.java     |   4 +-
 .../operator/BatchReduceInvokable.java          |   4 +-
 .../invokable/operator/CounterInvokable.java    |   4 +-
 .../api/invokable/operator/FilterInvokable.java |   4 +-
 .../invokable/operator/FlatMapInvokable.java    |   4 +-
 .../api/invokable/operator/MapInvokable.java    |   4 +-
 .../operator/StreamReduceInvokable.java         |   4 +-
 .../api/invokable/operator/co/CoInvokable.java  |  17 +-
 .../AbstractStreamComponent.java                |  71 ---
 .../api/streamcomponent/CoStreamTask.java       | 114 ----
 .../api/streamcomponent/InputHandler.java       | 104 ----
 .../api/streamcomponent/OutputHandler.java      | 179 -------
 .../StreamComponentException.java               |  68 ---
 .../streamcomponent/StreamIterationSink.java    | 104 ----
 .../streamcomponent/StreamIterationSource.java  | 100 ----
 .../api/streamcomponent/StreamSink.java         |  61 ---
 .../api/streamcomponent/StreamSource.java       |  52 --
 .../api/streamcomponent/StreamTask.java         |  55 --
 .../api/streamvertex/CoStreamVertex.java        | 113 ++++
 .../api/streamvertex/InputHandler.java          | 109 ++++
 .../api/streamvertex/OutputHandler.java         | 181 +++++++
 .../api/streamvertex/StreamIterationHead.java   | 100 ++++
 .../api/streamvertex/StreamIterationTail.java   | 104 ++++
 .../api/streamvertex/StreamVertex.java          |  96 ++++
 .../api/streamvertex/StreamVertexException.java |  68 +++
 .../api/collector/StreamCollectorTest.java      |   6 +-
 .../api/streamcomponent/MockRecordWriter.java   |  45 --
 .../streamcomponent/StreamComponentTest.java    | 197 -------
 .../api/streamvertex/MockRecordWriter.java      |  45 ++
 .../api/streamvertex/StreamVertexTest.java      | 184 +++++++
 .../flink/streaming/util/MockInvokable.java     |   4 +-
 .../streaming/util/MockRecordWriterFactory.java |   2 +-
 .../examples/window/join/WindowJoinLocal.java   |   4 +-
 43 files changed, 1399 insertions(+), 1643 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index d8d4f2d..a04dbaa 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -19,32 +19,29 @@ package org.apache.flink.streaming.api;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.invokable.SinkInvokable;
-import org.apache.flink.streaming.api.invokable.SourceInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import org.apache.flink.streaming.api.streamcomponent.CoStreamTask;
-import org.apache.flink.streaming.api.streamcomponent.StreamIterationSink;
-import org.apache.flink.streaming.api.streamcomponent.StreamIterationSource;
-import org.apache.flink.streaming.api.streamcomponent.StreamSink;
-import org.apache.flink.streaming.api.streamcomponent.StreamSource;
-import org.apache.flink.streaming.api.streamcomponent.StreamTask;
+import org.apache.flink.streaming.api.streamvertex.CoStreamVertex;
+import org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
+import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
+import org.apache.flink.streaming.api.streamvertex.StreamVertex;
 import org.apache.flink.streaming.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Object for building Apache Flink stream processing job graphs
@@ -55,8 +52,8 @@ public class JobGraphBuilder {
 	private final JobGraph jobGraph;
 
 	// Graph attributes
-	private Map<String, AbstractJobVertex> components;
-	private Map<String, Integer> componentParallelism;
+	private Map<String, AbstractJobVertex> streamVertices;
+	private Map<String, Integer> vertexParallelism;
 	private Map<String, Long> bufferTimeout;
 	private Map<String, List<String>> outEdgeList;
 	private Map<String, List<Integer>> outEdgeType;
@@ -66,17 +63,17 @@ public class JobGraphBuilder {
 	private Map<String, List<String>> inEdgeList;
 	private Map<String, List<StreamPartitioner<?>>> connectionTypes;
 	private Map<String, String> operatorNames;
-	private Map<String, StreamInvokable<?>> invokableObjects;
+	private Map<String, StreamInvokable<?, ?>> invokableObjects;
 	private Map<String, TypeSerializerWrapper<?>> typeWrapperIn1;
 	private Map<String, TypeSerializerWrapper<?>> typeWrapperIn2;
 	private Map<String, TypeSerializerWrapper<?>> typeWrapperOut1;
 	private Map<String, TypeSerializerWrapper<?>> typeWrapperOut2;
 	private Map<String, byte[]> serializedFunctions;
 	private Map<String, byte[]> outputSelectors;
-	private Map<String, Class<? extends AbstractInvokable>> componentClasses;
+	private Map<String, Class<? extends AbstractInvokable>> vertexClasses;
 	private Map<String, String> iterationIds;
-	private Map<String, String> iterationIDtoSourceName;
-	private Map<String, String> iterationIDtoSinkName;
+	private Map<String, String> iterationIDtoHeadName;
+	private Map<String, String> iterationIDtoTailName;
 	private Map<String, Integer> iterationTailCount;
 	private Map<String, Long> iterationWaitTime;
 
@@ -86,7 +83,6 @@ public class JobGraphBuilder {
 	/**
 	 * Creates an new {@link JobGraph} with the given name. A JobGraph is a DAG
 	 * and consists of sources, tasks (intermediate vertices) and sinks. A
-	 * JobGraph must contain at least a source and a sink.
 	 * 
 	 * @param jobGraphName
 	 *            Name of the JobGraph
@@ -95,8 +91,8 @@ public class JobGraphBuilder {
 
 		jobGraph = new JobGraph(jobGraphName);
 
-		components = new HashMap<String, AbstractJobVertex>();
-		componentParallelism = new HashMap<String, Integer>();
+		streamVertices = new HashMap<String, AbstractJobVertex>();
+		vertexParallelism = new HashMap<String, Integer>();
 		bufferTimeout = new HashMap<String, Long>();
 		outEdgeList = new HashMap<String, List<String>>();
 		outEdgeType = new HashMap<String, List<Integer>>();
@@ -106,17 +102,17 @@ public class JobGraphBuilder {
 		inEdgeList = new HashMap<String, List<String>>();
 		connectionTypes = new HashMap<String, List<StreamPartitioner<?>>>();
 		operatorNames = new HashMap<String, String>();
-		invokableObjects = new HashMap<String, StreamInvokable<?>>();
+		invokableObjects = new HashMap<String, StreamInvokable<?, ?>>();
 		typeWrapperIn1 = new HashMap<String, TypeSerializerWrapper<?>>();
 		typeWrapperIn2 = new HashMap<String, TypeSerializerWrapper<?>>();
 		typeWrapperOut1 = new HashMap<String, TypeSerializerWrapper<?>>();
 		typeWrapperOut2 = new HashMap<String, TypeSerializerWrapper<?>>();
 		serializedFunctions = new HashMap<String, byte[]>();
 		outputSelectors = new HashMap<String, byte[]>();
-		componentClasses = new HashMap<String, Class<? extends AbstractInvokable>>();
+		vertexClasses = new HashMap<String, Class<? extends AbstractInvokable>>();
 		iterationIds = new HashMap<String, String>();
-		iterationIDtoSourceName = new HashMap<String, String>();
-		iterationIDtoSinkName = new HashMap<String, String>();
+		iterationIDtoHeadName = new HashMap<String, String>();
+		iterationIDtoTailName = new HashMap<String, String>();
 		iterationTailCount = new HashMap<String, Integer>();
 		iterationWaitTime = new HashMap<String, Long>();
 
@@ -142,12 +138,16 @@ public class JobGraphBuilder {
 	}
 
 	/**
-	 * Adds source to the JobGraph with the given parameters
+	 * Adds a vertex to the streaming JobGraph with the given parameters
 	 * 
-	 * @param componentName
-	 *            Name of the component
-	 * @param InvokableObject
+	 * @param vertexName
+	 *            Name of the vertex
+	 * @param invokableObject
 	 *            User defined operator
+	 * @param inTypeWrapper
+	 *            Input type wrapper for serialization
+	 * @param outTypeWrapper
+	 *            Output type wrapper for serialization
 	 * @param operatorName
 	 *            Operator type
 	 * @param serializedFunction
@@ -155,25 +155,27 @@ public class JobGraphBuilder {
 	 * @param parallelism
 	 *            Number of parallel instances created
 	 */
-	public void addSource(String componentName, SourceInvokable<?> InvokableObject,
+	public <IN, OUT> void addStreamVertex(String vertexName,
+			StreamInvokable<IN, OUT> invokableObject, TypeSerializerWrapper<?> inTypeWrapper,
 			TypeSerializerWrapper<?> outTypeWrapper, String operatorName,
 			byte[] serializedFunction, int parallelism) {
 
-		addComponent(componentName, StreamSource.class, InvokableObject, operatorName,
+		addVertex(vertexName, StreamVertex.class, invokableObject, operatorName,
 				serializedFunction, parallelism);
-		addTypeWrappers(componentName, null, null, outTypeWrapper, null);
+
+		addTypeWrappers(vertexName, inTypeWrapper, null, outTypeWrapper, null);
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("SOURCE: {}", componentName);
+			LOG.debug("Vertex: {}", vertexName);
 		}
 	}
 
 	/**
-	 * Adds a source to the iteration head to the {@link JobGraph}. The iterated
-	 * tuples will be fed from this component back to the graph.
+	 * Adds a vertex for the iteration head to the {@link JobGraph}. The
+	 * iterated values will be fed from this vertex back to the graph.
 	 * 
-	 * @param componentName
-	 *            Name of the component
+	 * @param vertexName
+	 *            Name of the vertex
 	 * @param iterationHead
 	 *            Id of the iteration head
 	 * @param iterationID
@@ -183,143 +185,82 @@ public class JobGraphBuilder {
 	 * @param waitTime
 	 *            Max wait time for next record
 	 */
-	public void addIterationSource(String componentName, String iterationHead, String iterationID,
+	public void addIterationHead(String vertexName, String iterationHead, String iterationID,
 			int parallelism, long waitTime) {
 
-		addComponent(componentName, StreamIterationSource.class, null, null, null, parallelism);
+		addVertex(vertexName, StreamIterationHead.class, null, null, null, parallelism);
 
-		iterationIds.put(componentName, iterationID);
-		iterationIDtoSourceName.put(iterationID, componentName);
+		iterationIds.put(vertexName, iterationID);
+		iterationIDtoHeadName.put(iterationID, vertexName);
 
-		setBytesFrom(iterationHead, componentName);
+		setBytesFrom(iterationHead, vertexName);
 
-		setEdge(componentName, iterationHead,
-				connectionTypes.get(inEdgeList.get(iterationHead).get(0)).get(0), 0,
-				new ArrayList<String>(), false);
+		setEdge(vertexName, iterationHead, connectionTypes
+				.get(inEdgeList.get(iterationHead).get(0)).get(0), 0, new ArrayList<String>(),
+				false);
 
-		iterationWaitTime.put(iterationIDtoSourceName.get(iterationID), waitTime);
+		iterationWaitTime.put(iterationIDtoHeadName.get(iterationID), waitTime);
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("ITERATION SOURCE: {}", componentName);
+			LOG.debug("ITERATION SOURCE: {}", vertexName);
 		}
 	}
 
 	/**
-	 * Adds a task to the JobGraph with the given parameters
+	 * Adds a vertex for the iteration tail to the {@link JobGraph}. The values
+	 * intended to be iterated will be sent to this sink from the iteration
+	 * head.
 	 * 
-	 * @param componentName
-	 *            Name of the component
-	 * @param taskInvokableObject
-	 *            User defined operator
-	 * @param inTypeWrapper
-	 *            Input type wrapper for serialization
-	 * @param outTypeWrapper
-	 *            Output type wrapper for serialization
-	 * @param operatorName
-	 *            Operator type
-	 * @param serializedFunction
-	 *            Serialized udf
+	 * @param vertexName
+	 *            Name of the vertex
+	 * @param iterationTail
+	 *            Id of the iteration tail
+	 * @param iterationID
+	 *            ID of iteration for mulitple iterations
 	 * @param parallelism
 	 *            Number of parallel instances created
+	 * @param waitTime
+	 *            Max waiting time for next record
 	 */
-	public <IN, OUT> void addTask(String componentName,
-			StreamOperatorInvokable<IN, OUT> taskInvokableObject,
-			TypeSerializerWrapper<?> inTypeWrapper, TypeSerializerWrapper<?> outTypeWrapper,
-			String operatorName, byte[] serializedFunction, int parallelism) {
+	public void addIterationTail(String vertexName, String iterationTail, String iterationID,
+			int parallelism, long waitTime) {
 
-		addComponent(componentName, StreamTask.class, taskInvokableObject, operatorName,
-				serializedFunction, parallelism);
+		addVertex(vertexName, StreamIterationTail.class, null, null, null, parallelism);
+
+		iterationIds.put(vertexName, iterationID);
+		iterationIDtoTailName.put(iterationID, vertexName);
 
-		addTypeWrappers(componentName, inTypeWrapper, null, outTypeWrapper, null);
+		setBytesFrom(iterationTail, vertexName);
+		iterationWaitTime.put(iterationIDtoTailName.get(iterationID), waitTime);
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("TASK: {}", componentName);
+			LOG.debug("ITERATION SINK: {}", vertexName);
 		}
+
 	}
 
-	public <IN1, IN2, OUT> void addCoTask(String componentName,
+	public <IN1, IN2, OUT> void addCoTask(String vertexName,
 			CoInvokable<IN1, IN2, OUT> taskInvokableObject,
 			TypeSerializerWrapper<?> in1TypeWrapper, TypeSerializerWrapper<?> in2TypeWrapper,
 			TypeSerializerWrapper<?> outTypeWrapper, String operatorName,
 			byte[] serializedFunction, int parallelism) {
 
-		addComponent(componentName, CoStreamTask.class, taskInvokableObject, operatorName,
-				serializedFunction, parallelism);
-
-		addTypeWrappers(componentName, in1TypeWrapper, in2TypeWrapper, outTypeWrapper, null);
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("CO-TASK: {}", componentName);
-		}
-	}
-
-	/**
-	 * Adds sink to the JobGraph with the given parameters
-	 * 
-	 * @param componentName
-	 *            Name of the component
-	 * @param InvokableObject
-	 *            User defined operator
-	 * @param operatorName
-	 *            Operator type
-	 * @param serializedFunction
-	 *            Serialized udf
-	 * @param parallelism
-	 *            Number of parallel instances created
-	 */
-	public void addSink(String componentName, SinkInvokable<?> InvokableObject,
-			TypeSerializerWrapper<?> inTypeWrapper, String operatorName, byte[] serializedFunction,
-			int parallelism) {
-
-		addComponent(componentName, StreamSink.class, InvokableObject, operatorName,
+		addVertex(vertexName, CoStreamVertex.class, taskInvokableObject, operatorName,
 				serializedFunction, parallelism);
-		addTypeWrappers(componentName, inTypeWrapper, null, null, null);
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("SINK: {}", componentName);
-		}
-
-	}
-
-	/**
-	 * Adds a sink to an iteration tail to the {@link JobGraph}. The tuples
-	 * intended to be iterated will be sent to this sink from the iteration
-	 * head.
-	 * 
-	 * @param componentName
-	 *            Name of the component
-	 * @param iterationTail
-	 *            Id of the iteration tail
-	 * @param iterationID
-	 *            ID of iteration for mulitple iterations
-	 * @param parallelism
-	 *            Number of parallel instances created
-	 * @param waitTime
-	 *            Max waiting time for next record
-	 */
-	public void addIterationSink(String componentName, String iterationTail, String iterationID,
-			int parallelism, long waitTime) {
-
-		addComponent(componentName, StreamIterationSink.class, null, null, null, parallelism);
-
-		iterationIds.put(componentName, iterationID);
-		iterationIDtoSinkName.put(iterationID, componentName);
 
-		setBytesFrom(iterationTail, componentName);
-		iterationWaitTime.put(iterationIDtoSinkName.get(iterationID), waitTime);
+		addTypeWrappers(vertexName, in1TypeWrapper, in2TypeWrapper, outTypeWrapper, null);
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("ITERATION SINK: {}", componentName);
+			LOG.debug("CO-TASK: {}", vertexName);
 		}
-
 	}
 
 	/**
-	 * Sets component parameters in the JobGraph
+	 * Sets vertex parameters in the JobGraph
 	 * 
-	 * @param componentName
-	 *            Name of the component
-	 * @param componentClass
+	 * @param vertexName
+	 *            Name of the vertex
+	 * @param vertexClass
 	 *            The class of the vertex
 	 * @param invokableObject
 	 *            The user defined invokable object
@@ -330,114 +271,153 @@ public class JobGraphBuilder {
 	 * @param parallelism
 	 *            Number of parallel instances created
 	 */
-	private void addComponent(String componentName,
-			Class<? extends AbstractInvokable> componentClass, StreamInvokable<?> invokableObject,
-			String operatorName, byte[] serializedFunction, int parallelism) {
-
-		componentClasses.put(componentName, componentClass);
-		setParallelism(componentName, parallelism);
-		mutability.put(componentName, false);
-		invokableObjects.put(componentName, invokableObject);
-		operatorNames.put(componentName, operatorName);
-		serializedFunctions.put(componentName, serializedFunction);
-		outEdgeList.put(componentName, new ArrayList<String>());
-		outEdgeType.put(componentName, new ArrayList<Integer>());
-		outEdgeNames.put(componentName, new ArrayList<List<String>>());
-		outEdgeSelectAll.put(componentName, new ArrayList<Boolean>());
-		inEdgeList.put(componentName, new ArrayList<String>());
-		connectionTypes.put(componentName, new ArrayList<StreamPartitioner<?>>());
-		iterationTailCount.put(componentName, 0);
+	private void addVertex(String vertexName, Class<? extends AbstractInvokable> vertexClass,
+			StreamInvokable<?, ?> invokableObject, String operatorName, byte[] serializedFunction,
+			int parallelism) {
+
+		vertexClasses.put(vertexName, vertexClass);
+		setParallelism(vertexName, parallelism);
+		mutability.put(vertexName, false);
+		invokableObjects.put(vertexName, invokableObject);
+		operatorNames.put(vertexName, operatorName);
+		serializedFunctions.put(vertexName, serializedFunction);
+		outEdgeList.put(vertexName, new ArrayList<String>());
+		outEdgeType.put(vertexName, new ArrayList<Integer>());
+		outEdgeNames.put(vertexName, new ArrayList<List<String>>());
+		outEdgeSelectAll.put(vertexName, new ArrayList<Boolean>());
+		inEdgeList.put(vertexName, new ArrayList<String>());
+		connectionTypes.put(vertexName, new ArrayList<StreamPartitioner<?>>());
+		iterationTailCount.put(vertexName, 0);
 	}
 
-	private void addTypeWrappers(String componentName, TypeSerializerWrapper<?> in1,
+	private void addTypeWrappers(String vertexName, TypeSerializerWrapper<?> in1,
 			TypeSerializerWrapper<?> in2, TypeSerializerWrapper<?> out1,
 			TypeSerializerWrapper<?> out2) {
-		typeWrapperIn1.put(componentName, in1);
-		typeWrapperIn2.put(componentName, in2);
-		typeWrapperOut1.put(componentName, out1);
-		typeWrapperOut2.put(componentName, out2);
+		typeWrapperIn1.put(vertexName, in1);
+		typeWrapperIn2.put(vertexName, in2);
+		typeWrapperOut1.put(vertexName, out1);
+		typeWrapperOut2.put(vertexName, out2);
 	}
 
 	/**
 	 * Creates an {@link AbstractJobVertex} in the {@link JobGraph} and sets its
 	 * config parameters using the ones set previously.
 	 * 
-	 * @param componentName
-	 *            Name of the component for which the vertex will be created.
+	 * @param vertexName
+	 *            Name for which the vertex will be created.
 	 */
-	private void createVertex(String componentName) {
+	private void createVertex(String vertexName) {
 
 		// Get vertex attributes
-		Class<? extends AbstractInvokable> componentClass = componentClasses.get(componentName);
-		StreamInvokable<?> invokableObject = invokableObjects.get(componentName);
-		String operatorName = operatorNames.get(componentName);
-		byte[] serializedFunction = serializedFunctions.get(componentName);
-		int parallelism = componentParallelism.get(componentName);
-		byte[] outputSelector = outputSelectors.get(componentName);
+		Class<? extends AbstractInvokable> vertexClass = vertexClasses.get(vertexName);
+		StreamInvokable<?, ?> invokableObject = invokableObjects.get(vertexName);
+		String operatorName = operatorNames.get(vertexName);
+		byte[] serializedFunction = serializedFunctions.get(vertexName);
+		int parallelism = vertexParallelism.get(vertexName);
+		byte[] outputSelector = outputSelectors.get(vertexName);
 
 		// Create vertex object
-		AbstractJobVertex component = new AbstractJobVertex(componentName);
+		AbstractJobVertex vertex = new AbstractJobVertex(vertexName);
 
-		this.jobGraph.addVertex(component);
+		this.jobGraph.addVertex(vertex);
 
-		component.setInvokableClass(componentClass);
-		component.setParallelism(parallelism);
+		vertex.setInvokableClass(vertexClass);
+		vertex.setParallelism(parallelism);
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Parallelism set: {} for {}", parallelism, componentName);
+			LOG.debug("Parallelism set: {} for {}", parallelism, vertexName);
 		}
 
-		StreamConfig config = new StreamConfig(component.getConfiguration());
+		StreamConfig config = new StreamConfig(vertex.getConfiguration());
 
-		config.setMutability(mutability.get(componentName));
-		config.setBufferTimeout(bufferTimeout.get(componentName));
+		config.setMutability(mutability.get(vertexName));
+		config.setBufferTimeout(bufferTimeout.get(vertexName));
 
-		config.setTypeWrapperIn1(typeWrapperIn1.get(componentName));
-		config.setTypeWrapperIn2(typeWrapperIn2.get(componentName));
-		config.setTypeWrapperOut1(typeWrapperOut1.get(componentName));
-		config.setTypeWrapperOut2(typeWrapperOut2.get(componentName));
+		config.setTypeWrapperIn1(typeWrapperIn1.get(vertexName));
+		config.setTypeWrapperIn2(typeWrapperIn2.get(vertexName));
+		config.setTypeWrapperOut1(typeWrapperOut1.get(vertexName));
+		config.setTypeWrapperOut2(typeWrapperOut2.get(vertexName));
 
 		// Set vertex config
 		config.setUserInvokable(invokableObject);
-		config.setComponentName(componentName);
+		config.setVertexName(vertexName);
 		config.setFunction(serializedFunction, operatorName);
 		config.setOutputSelector(outputSelector);
 
-		if (componentClass.equals(StreamIterationSource.class)
-				|| componentClass.equals(StreamIterationSink.class)) {
-			config.setIterationId(iterationIds.get(componentName));
-			config.setIterationWaitTime(iterationWaitTime.get(componentName));
+		if (vertexClass.equals(StreamIterationHead.class)
+				|| vertexClass.equals(StreamIterationTail.class)) {
+			config.setIterationId(iterationIds.get(vertexName));
+			config.setIterationWaitTime(iterationWaitTime.get(vertexName));
+		}
+
+		streamVertices.put(vertexName, vertex);
+	}
+
+	/**
+	 * Connects two vertices with the given names, partitioning and channel type
+	 * 
+	 * @param upStreamVertexName
+	 *            Name of the upstream vertex, that will emit the values
+	 * @param downStreamVertexName
+	 *            Name of the downstream vertex, that will receive the values
+	 * @param partitionerObject
+	 *            The partitioner
+	 */
+	private <T> void connect(String upStreamVertexName, String downStreamVertexName,
+			StreamPartitioner<T> partitionerObject) {
+
+		AbstractJobVertex upStreamVertex = streamVertices.get(upStreamVertexName);
+		AbstractJobVertex downStreamVertex = streamVertices.get(downStreamVertexName);
+
+		StreamConfig config = new StreamConfig(upStreamVertex.getConfiguration());
+
+		if (partitionerObject.getClass().equals(ForwardPartitioner.class)) {
+			downStreamVertex
+					.connectNewDataSetAsInput(upStreamVertex, DistributionPattern.POINTWISE);
+		} else {
+			downStreamVertex
+					.connectNewDataSetAsInput(upStreamVertex, DistributionPattern.BIPARTITE);
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("CONNECTED: {} - {} -> {}", partitionerObject.getClass().getSimpleName(),
+					upStreamVertexName, downStreamVertexName);
 		}
 
-		components.put(componentName, component);
+		int outputIndex = upStreamVertex.getNumberOfProducedIntermediateDataSets() - 1;
+
+		config.setOutputName(outputIndex, outEdgeNames.get(upStreamVertexName).get(outputIndex));
+		config.setSelectAll(outputIndex, outEdgeSelectAll.get(upStreamVertexName).get(outputIndex));
+		config.setPartitioner(outputIndex, partitionerObject);
+		config.setNumberOfOutputChannels(outputIndex, vertexParallelism.get(downStreamVertexName));
 	}
 
 	/**
-	 * Sets the number of parallel instances created for the given component.
+	 * Sets the number of parallel instances created for the given vertex.
 	 * 
-	 * @param componentName
-	 *            Name of the component
+	 * @param vertexName
+	 *            Name of the vertex
 	 * @param parallelism
 	 *            Number of parallel instances created
 	 */
-	public void setParallelism(String componentName, int parallelism) {
-		componentParallelism.put(componentName, parallelism);
+	public void setParallelism(String vertexName, int parallelism) {
+		vertexParallelism.put(vertexName, parallelism);
 	}
 
-	public void setMutability(String componentName, boolean isMutable) {
-		mutability.put(componentName, isMutable);
+	public void setMutability(String vertexName, boolean isMutable) {
+		mutability.put(vertexName, isMutable);
 	}
 
-	public void setBufferTimeout(String componentName, long bufferTimeout) {
-		this.bufferTimeout.put(componentName, bufferTimeout);
+	public void setBufferTimeout(String vertexName, long bufferTimeout) {
+		this.bufferTimeout.put(vertexName, bufferTimeout);
 	}
 
 	/**
 	 * Connects two vertices in the JobGraph using the selected partitioner
 	 * settings
 	 * 
-	 * @param upStreamComponentName
+	 * @param upStreamVertexName
 	 *            Name of the upstream(output) vertex
-	 * @param downStreamComponentName
+	 * @param downStreamVertexName
 	 *            Name of the downstream(input) vertex
 	 * @param partitionerObject
 	 *            Partitioner object
@@ -446,55 +426,15 @@ public class JobGraphBuilder {
 	 * @param outputNames
 	 *            User defined names of the out edge
 	 */
-	public void setEdge(String upStreamComponentName, String downStreamComponentName,
+	public void setEdge(String upStreamVertexName, String downStreamVertexName,
 			StreamPartitioner<?> partitionerObject, int typeNumber, List<String> outputNames,
 			boolean selectAll) {
-		outEdgeList.get(upStreamComponentName).add(downStreamComponentName);
-		outEdgeType.get(upStreamComponentName).add(typeNumber);
-		inEdgeList.get(downStreamComponentName).add(upStreamComponentName);
-		connectionTypes.get(upStreamComponentName).add(partitionerObject);
-		outEdgeNames.get(upStreamComponentName).add(outputNames);
-		outEdgeSelectAll.get(upStreamComponentName).add(selectAll);
-	}
-
-	/**
-	 * Connects to JobGraph components with the given names, partitioning and
-	 * channel type
-	 * 
-	 * @param upStreamComponentName
-	 *            Name of the upstream component, that will emit the tuples
-	 * @param downStreamComponentName
-	 *            Name of the downstream component, that will receive the tuples
-	 * @param partitionerObject
-	 *            The partitioner
-	 */
-	private <T> void connect(String upStreamComponentName, String downStreamComponentName,
-			StreamPartitioner<T> partitionerObject) {
-
-		AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
-		AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
-
-		StreamConfig config = new StreamConfig(upStreamComponent.getConfiguration());
-
-		if (partitionerObject.getClass().equals(ForwardPartitioner.class)) {
-			downStreamComponent.connectNewDataSetAsInput(upStreamComponent, DistributionPattern.POINTWISE);
-		} else {
-			downStreamComponent.connectNewDataSetAsInput(upStreamComponent, DistributionPattern.BIPARTITE);
-		}
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("CONNECTED: {} - {} -> {}", partitionerObject.getClass().getSimpleName(),
-					upStreamComponentName, downStreamComponentName);
-		}
-		
-		int outputIndex = upStreamComponent.getNumberOfProducedIntermediateDataSets() - 1;
-
-		config.setOutputName(outputIndex, outEdgeNames.get(upStreamComponentName).get(outputIndex));
-		config.setSelectAll(outputIndex,
-				outEdgeSelectAll.get(upStreamComponentName).get(outputIndex));
-		config.setPartitioner(outputIndex, partitionerObject);
-		config.setNumberOfOutputChannels(outputIndex,
-				componentParallelism.get(downStreamComponentName));
+		outEdgeList.get(upStreamVertexName).add(downStreamVertexName);
+		outEdgeType.get(upStreamVertexName).add(typeNumber);
+		inEdgeList.get(downStreamVertexName).add(upStreamVertexName);
+		connectionTypes.get(upStreamVertexName).add(partitionerObject);
+		outEdgeNames.get(upStreamVertexName).add(outputNames);
+		outEdgeSelectAll.get(upStreamVertexName).add(selectAll);
 	}
 
 	/**
@@ -507,33 +447,31 @@ public class JobGraphBuilder {
 	 *            ID of the iteration tail
 	 */
 	public void setIterationSourceSettings(String iterationID, String iterationTail) {
-		setParallelism(iterationIDtoSourceName.get(iterationID),
-				componentParallelism.get(iterationTail));
-		setBufferTimeout(iterationIDtoSourceName.get(iterationID), bufferTimeout.get(iterationTail));
+		setParallelism(iterationIDtoHeadName.get(iterationID), vertexParallelism.get(iterationTail));
+		setBufferTimeout(iterationIDtoHeadName.get(iterationID), bufferTimeout.get(iterationTail));
 	}
 
 	/**
-	 * Sets a user defined {@link OutputSelector} for the given component. Used
-	 * for directed emits.
+	 * Sets a user defined {@link OutputSelector} for the given vertex. Used for
+	 * directed emits.
 	 * 
-	 * @param componentName
-	 *            Name of the component for which the output selector will be
-	 *            set
+	 * @param vertexName
+	 *            Name of the vertex for which the output selector will be set
 	 * @param serializedOutputSelector
 	 *            Byte array representing the serialized output selector.
 	 */
-	public <T> void setOutputSelector(String componentName, byte[] serializedOutputSelector) {
-		outputSelectors.put(componentName, serializedOutputSelector);
+	public <T> void setOutputSelector(String vertexName, byte[] serializedOutputSelector) {
+		outputSelectors.put(vertexName, serializedOutputSelector);
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Outputselector set for {}", componentName);
+			LOG.debug("Outputselector set for {}", vertexName);
 		}
 
 	}
 
 	/**
-	 * Sets udf operator and TypeSerializerWrapper from one component to
-	 * another, used with some sinks.
+	 * Sets udf operator and TypeSerializerWrapper from one vertex to another,
+	 * used with some sinks.
 	 * 
 	 * @param from
 	 *            from
@@ -559,39 +497,32 @@ public class JobGraphBuilder {
 		return typeWrapperOut1.get(id).getTypeInfo();
 	}
 
-//  TODO: This should be adjusted to the sharing groups
-//	/**
-//	 * Sets instance sharing between the given components
-//	 * 
-//	 * @param component1
-//	 *            Share will be called on this component
-//	 * @param component2
-//	 *            Share will be called to this component
-//	 */
-//	public void setInstanceSharing(String component1, String component2) {
-//		AbstractJobVertex c1 = components.get(component1);
-//		AbstractJobVertex c2 = components.get(component2);
-//
-//		c1.setVertexToShareInstancesWith(c2);
-//	}
-
 	/**
-	 * Sets all components to share with the one with highest parallelism
+	 * Sets slot sharing for the vertices.
 	 */
-	private void setAutomaticInstanceSharing() {
+	private void setSlotSharing() {
 		SlotSharingGroup shareGroup = new SlotSharingGroup();
 
-		for (AbstractJobVertex vertex : components.values()) {
+		for (AbstractJobVertex vertex : streamVertices.values()) {
 			vertex.setSlotSharingGroup(shareGroup);
 		}
+
+		for (String iterID : new HashSet<String>(iterationIds.values())) {
+			CoLocationGroup ccg = new CoLocationGroup();
+			AbstractJobVertex tail = streamVertices.get(iterationIDtoTailName.get(iterID));
+			AbstractJobVertex head = streamVertices.get(iterationIDtoHeadName.get(iterID));
+
+			ccg.addVertex(head);
+			ccg.addVertex(tail);
+		}
 	}
 
 	/**
 	 * Writes number of inputs into each JobVertex's config
 	 */
 	private void setNumberOfJobInputs() {
-		for (AbstractJobVertex component : components.values()) {
-			(new StreamConfig(component.getConfiguration())).setNumberOfInputs(component
+		for (AbstractJobVertex vertex : streamVertices.values()) {
+			(new StreamConfig(vertex.getConfiguration())).setNumberOfInputs(vertex
 					.getNumberOfInputs());
 		}
 	}
@@ -601,43 +532,43 @@ public class JobGraphBuilder {
 	 * config
 	 */
 	private void setNumberOfJobOutputs() {
-		for (AbstractJobVertex component : components.values()) {
-			(new StreamConfig(component.getConfiguration())).setNumberOfOutputs(component
+		for (AbstractJobVertex vertex : streamVertices.values()) {
+			(new StreamConfig(vertex.getConfiguration())).setNumberOfOutputs(vertex
 					.getNumberOfProducedIntermediateDataSets());
 		}
 	}
 
 	/**
-	 * Builds the {@link JobGraph} from the components with the edges and
-	 * settings provided.
+	 * Builds the {@link JobGraph} from the vertices with the edges and settings
+	 * provided.
 	 */
 	private void buildGraph() {
 
-		for (String componentName : outEdgeList.keySet()) {
-			createVertex(componentName);
+		for (String vertexName : outEdgeList.keySet()) {
+			createVertex(vertexName);
 		}
 
-		for (String upStreamComponentName : outEdgeList.keySet()) {
+		for (String upStreamVertexName : outEdgeList.keySet()) {
 			int i = 0;
 
-			List<Integer> outEdgeTypeList = outEdgeType.get(upStreamComponentName);
+			List<Integer> outEdgeTypeList = outEdgeType.get(upStreamVertexName);
 
-			for (String downStreamComponentName : outEdgeList.get(upStreamComponentName)) {
-				StreamConfig downStreamComponentConfig = new StreamConfig(components.get(
-						downStreamComponentName).getConfiguration());
+			for (String downStreamVertexName : outEdgeList.get(upStreamVertexName)) {
+				StreamConfig downStreamVertexConfig = new StreamConfig(streamVertices.get(
+						downStreamVertexName).getConfiguration());
 
-				int inputNumber = downStreamComponentConfig.getNumberOfInputs();
+				int inputNumber = downStreamVertexConfig.getNumberOfInputs();
 
-				downStreamComponentConfig.setInputType(inputNumber++, outEdgeTypeList.get(i));
-				downStreamComponentConfig.setNumberOfInputs(inputNumber);
+				downStreamVertexConfig.setInputType(inputNumber++, outEdgeTypeList.get(i));
+				downStreamVertexConfig.setNumberOfInputs(inputNumber);
 
-				connect(upStreamComponentName, downStreamComponentName,
-						connectionTypes.get(upStreamComponentName).get(i));
+				connect(upStreamVertexName, downStreamVertexName,
+						connectionTypes.get(upStreamVertexName).get(i));
 				i++;
 			}
 		}
 
-		setAutomaticInstanceSharing();
+		setSlotSharing();
 		setNumberOfJobInputs();
 		setNumberOfJobOutputs();
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 2c53fb3..6fac391 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamcomponent.StreamComponentException;
+import org.apache.flink.streaming.api.streamvertex.StreamVertexException;
 import org.apache.flink.streaming.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
@@ -46,7 +46,7 @@ public class StreamConfig {
 	private static final String DIRECTED_EMIT = "directedEmit";
 	private static final String FUNCTION_NAME = "operatorName";
 	private static final String FUNCTION = "operator";
-	private static final String COMPONENT_NAME = "componentName";
+	private static final String VERTEX_NAME = "vertexName";
 	private static final String SERIALIZEDUDF = "serializedudf";
 	private static final String USER_FUNCTION = "userfunction";
 	private static final String BUFFER_TIMEOUT = "bufferTimeout";
@@ -125,8 +125,13 @@ public class StreamConfig {
 
 		TypeSerializerWrapper<T> typeWrapper = (TypeSerializerWrapper<T>) SerializationUtils
 				.deserialize(serializedWrapper);
+		if (typeWrapper != null) {
+			return typeWrapper.getTypeInfo();
+
+		} else {
+			return null;
+		}
 
-		return typeWrapper.getTypeInfo();
 	}
 
 	public void setMutability(boolean isMutable) {
@@ -145,7 +150,7 @@ public class StreamConfig {
 		return config.getLong(BUFFER_TIMEOUT, DEFAULT_TIMEOUT);
 	}
 
-	public void setUserInvokable(StreamInvokable<?> invokableObject) {
+	public void setUserInvokable(StreamInvokable<?,?> invokableObject) {
 		if (invokableObject != null) {
 			config.setClass(USER_FUNCTION, invokableObject.getClass());
 
@@ -162,16 +167,16 @@ public class StreamConfig {
 		try {
 			return deserializeObject(config.getBytes(SERIALIZEDUDF, null));
 		} catch (Exception e) {
-			throw new StreamComponentException("Cannot instantiate user function", e);
+			throw new StreamVertexException("Cannot instantiate user function", e);
 		}
 	}
 
-	public void setComponentName(String componentName) {
-		config.setString(COMPONENT_NAME, componentName);
+	public void setVertexName(String vertexName) {
+		config.setString(VERTEX_NAME, vertexName);
 	}
 
-	public String getComponentName() {
-		return config.getString(COMPONENT_NAME, null);
+	public String getVertexName() {
+		return config.getString(VERTEX_NAME, null);
 	}
 
 	public void setFunction(byte[] serializedFunction, String functionName) {
@@ -212,7 +217,7 @@ public class StreamConfig {
 		try {
 			return deserializeObject(config.getBytes(OUTPUT_SELECTOR, null));
 		} catch (Exception e) {
-			throw new StreamComponentException("Cannot deserialize and instantiate OutputSelector",
+			throw new StreamVertexException("Cannot deserialize and instantiate OutputSelector",
 					e);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
index 2258d4a..51f1467 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.GroupedBatchGroupReduceInvokable;
@@ -200,7 +200,7 @@ public class BatchedDataStream<OUT> {
 	}
 
 	private SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
-		StreamOperatorInvokable<OUT, OUT> invokable = getReduceInvokable(aggregate);
+		StreamInvokable<OUT, OUT> invokable = getReduceInvokable(aggregate);
 
 		SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.addFunction("batchReduce",
 				aggregate, dataStream.outTypeWrapper, dataStream.outTypeWrapper, invokable);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index ace76aa..23bc80d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -47,7 +47,7 @@ import org.apache.flink.streaming.api.function.sink.WriteFormatAsText;
 import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByBatches;
 import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis;
 import org.apache.flink.streaming.api.invokable.SinkInvokable;
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.operator.CounterInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
@@ -558,7 +558,7 @@ public class DataStream<OUT> {
 	public SingleOutputStreamOperator<OUT, ?> max() {
 		return max(0);
 	}
-	
+
 	/**
 	 * Applies an aggregation that gives the count of the data point.
 	 * 
@@ -568,15 +568,16 @@ public class DataStream<OUT> {
 		TypeSerializerWrapper<OUT> inTypeWrapper = outTypeWrapper;
 		TypeSerializerWrapper<Long> outTypeWrapper = new ObjectTypeWrapper<Long>(new Long(0));
 
-		return addFunction("counter", null, inTypeWrapper, outTypeWrapper, new CounterInvokable<OUT>());
+		return addFunction("counter", null, inTypeWrapper, outTypeWrapper,
+				new CounterInvokable<OUT>());
 	}
 
 	protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
 
 		StreamReduceInvokable<OUT> invokable = new StreamReduceInvokable<OUT>(aggregate);
 
-		SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", aggregate, outTypeWrapper,
-				outTypeWrapper, invokable);
+		SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", aggregate,
+				outTypeWrapper, outTypeWrapper, invokable);
 
 		return returnStream;
 	}
@@ -759,7 +760,8 @@ public class DataStream<OUT> {
 	private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, String path,
 			WriteFormatAsText<OUT> format, int batchSize, OUT endTuple) {
 		DataStreamSink<OUT> returnStream = addSink(inputStream,
-				new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), inputStream.outTypeWrapper);
+				new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple),
+				inputStream.outTypeWrapper);
 		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
 	}
@@ -909,7 +911,8 @@ public class DataStream<OUT> {
 	private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
 			WriteFormatAsCsv<OUT> format, int batchSize, OUT endTuple) {
 		DataStreamSink<OUT> returnStream = addSink(inputStream,
-				new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), inputStream.outTypeWrapper);
+				new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple),
+				inputStream.outTypeWrapper);
 		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
 	}
@@ -944,7 +947,7 @@ public class DataStream<OUT> {
 
 		DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource", null);
 
-		jobGraphBuilder.addIterationSource(returnStream.getId(), this.getId(), iterationID,
+		jobGraphBuilder.addIterationHead(returnStream.getId(), this.getId(), iterationID,
 				degreeOfParallelism, waitTime);
 
 		return this.copy();
@@ -966,15 +969,14 @@ public class DataStream<OUT> {
 	 */
 	protected <R> SingleOutputStreamOperator<R, ?> addFunction(String functionName,
 			final Function function, TypeSerializerWrapper<OUT> inTypeWrapper,
-			TypeSerializerWrapper<R> outTypeWrapper,
-			StreamOperatorInvokable<OUT, R> functionInvokable) {
+			TypeSerializerWrapper<R> outTypeWrapper, StreamInvokable<OUT, R> functionInvokable) {
 		DataStream<OUT> inputStream = this.copy();
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment,
 				functionName, outTypeWrapper);
 
 		try {
-			jobGraphBuilder.addTask(returnStream.getId(), functionInvokable, inTypeWrapper,
+			jobGraphBuilder.addStreamVertex(returnStream.getId(), functionInvokable, inTypeWrapper,
 					outTypeWrapper, functionName,
 					SerializationUtils.serialize((Serializable) function), degreeOfParallelism);
 		} catch (SerializationException e) {
@@ -1049,13 +1051,13 @@ public class DataStream<OUT> {
 	}
 
 	private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream,
-			SinkFunction<OUT> sinkFunction, TypeSerializerWrapper<OUT> typeWrapper) {
+			SinkFunction<OUT> sinkFunction, TypeSerializerWrapper<OUT> inTypeWrapper) {
 		DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink",
 				outTypeWrapper);
 
 		try {
-			jobGraphBuilder.addSink(returnStream.getId(), new SinkInvokable<OUT>(sinkFunction),
-					typeWrapper, "sink", SerializationUtils.serialize(sinkFunction),
+			jobGraphBuilder.addStreamVertex(returnStream.getId(), new SinkInvokable<OUT>(sinkFunction),
+					inTypeWrapper, null, "sink", SerializationUtils.serialize(sinkFunction),
 					degreeOfParallelism);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize SinkFunction");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index e619f36..41616c9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -82,7 +82,7 @@ public class IterativeDataStream<IN> extends
 	public <R> DataStream<IN> closeWith(DataStream<IN> iterationTail, String iterationName) {
 		DataStream<R> returnStream = new DataStreamSink<R>(environment, "iterationSink", null);
 
-		jobGraphBuilder.addIterationSink(returnStream.getId(), iterationTail.getId(),
+		jobGraphBuilder.addIterationTail(returnStream.getId(), iterationTail.getId(),
 				iterationID.toString(), iterationTail.getParallelism(), waitTime);
 
 		jobGraphBuilder.setIterationSourceSettings(iterationID.toString(), iterationTail.getId());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 00f9082..c3231d7 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -230,8 +230,8 @@ public abstract class StreamExecutionEnvironment {
 
 		try {
 			SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
-			jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(function),
-					outTypeWrapper, "source", SerializationUtils.serialize(function), 1);
+			jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(function),
+					null, outTypeWrapper, "source", SerializationUtils.serialize(function), 1);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize elements");
 		}
@@ -267,8 +267,8 @@ public abstract class StreamExecutionEnvironment {
 		try {
 			SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
 
-			jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(
-					new FromElementsFunction<OUT>(data)), new ObjectTypeWrapper<OUT>(data
+			jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(
+					new FromElementsFunction<OUT>(data)), null, new ObjectTypeWrapper<OUT>(data
 					.iterator().next()), "source", SerializationUtils.serialize(function), 1);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize collection");
@@ -311,8 +311,9 @@ public abstract class StreamExecutionEnvironment {
 				outTypeWrapper);
 
 		try {
-			jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(function),
-					outTypeWrapper, "source", SerializationUtils.serialize(function), parallelism);
+			jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(function),
+					null, outTypeWrapper, "source", SerializationUtils.serialize(function),
+					parallelism);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize SourceFunction");
 		}
@@ -454,7 +455,8 @@ public abstract class StreamExecutionEnvironment {
 	 * <p>
 	 * The program execution will be logged and displayed with a generated
 	 * default name.
-	 * @throws Exception 
+	 * 
+	 * @throws Exception
 	 **/
 	public abstract void execute() throws Exception;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index 4c7138c..ec33224 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.invokable;
 
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 
-public class SinkInvokable<IN> extends StreamOperatorInvokable<IN, IN> {
+public class SinkInvokable<IN> extends StreamInvokable<IN, IN> {
 	private static final long serialVersionUID = 1L;
 
 	private SinkFunction<IN> sinkFunction;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
index 30c86f9..8c9df46 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
@@ -21,7 +21,7 @@ import java.io.Serializable;
 
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 
-public class SourceInvokable<OUT> extends StreamInvokable<OUT> implements Serializable {
+public class SourceInvokable<OUT> extends StreamInvokable<OUT,OUT> implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 
@@ -38,4 +38,16 @@ public class SourceInvokable<OUT> extends StreamInvokable<OUT> implements Serial
 		sourceFunction.invoke(collector);
 	}
 
+	@Override
+	protected void immutableInvoke() throws Exception {		
+	}
+
+	@Override
+	protected void mutableInvoke() throws Exception {		
+	}
+
+	@Override
+	protected void callUserFunction() throws Exception {
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index 4f0e138..b3cd57f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -22,18 +22,30 @@ import java.io.Serializable;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * The StreamInvokable represents the base class for all invokables in
- * the streaming topology.
+ * The StreamInvokable represents the base class for all invokables in the
+ * streaming topology.
  *
  * @param <OUT>
  *            The output type of the invokable
  */
-public abstract class StreamInvokable<OUT> implements Serializable {
+public abstract class StreamInvokable<IN, OUT> implements Serializable {
 
 	private static final long serialVersionUID = 1L;
+	private static final Logger LOG = LoggerFactory.getLogger(StreamInvokable.class);
+
+	protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
+	protected StreamRecordSerializer<IN> serializer;
+	protected StreamRecord<IN> reuse;
+	protected boolean isMutable;
 
 	protected Collector<OUT> collector;
 	protected Function userFunction;
@@ -43,8 +55,79 @@ public abstract class StreamInvokable<OUT> implements Serializable {
 		this.userFunction = userFunction;
 	}
 
-	public void setCollector(Collector<OUT> collector) {
+	/**
+	 * Initializes the {@link StreamOperatorInvokable} for input and output
+	 * handling
+	 * 
+	 * @param collector
+	 *            Collector object for collecting the outputs for the operator
+	 * @param recordIterator
+	 *            Iterator for reading in the input records
+	 * @param serializer
+	 *            Serializer used to deserialize inputs
+	 * @param isMutable
+	 *            Mutability setting for the operator
+	 */
+	public void initialize(Collector<OUT> collector,
+			MutableObjectIterator<StreamRecord<IN>> recordIterator,
+			StreamRecordSerializer<IN> serializer, boolean isMutable) {
 		this.collector = collector;
+		this.recordIterator = recordIterator;
+		this.serializer = serializer;
+		if(this.serializer != null){
+			this.reuse = serializer.createInstance();
+		}
+		this.isMutable = isMutable;
+	}
+
+	/**
+	 * Re-initializes the object in which the next input record will be read in
+	 */
+	protected void resetReuse() {
+		this.reuse = serializer.createInstance();
+	}
+
+	/**
+	 * Method that will be called if the mutability setting is set to immutable
+	 */
+	protected abstract void immutableInvoke() throws Exception;
+
+	/**
+	 * Method that will be called if the mutability setting is set to mutable
+	 */
+	protected abstract void mutableInvoke() throws Exception;
+
+	/**
+	 * The call of the user implemented function should be implemented here
+	 */
+	protected abstract void callUserFunction() throws Exception;
+
+	/**
+	 * Method for logging exceptions thrown during the user function call
+	 */
+	protected void callUserFunctionAndLogException() {
+		try {
+			callUserFunction();
+		} catch (Exception e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Calling user function failed due to: {}",
+						StringUtils.stringifyException(e));
+			}
+		}
+	}
+
+	/**
+	 * Method that will be called when the stream starts. The user should encode
+	 * the processing functionality in {@link #mutableInvoke()} and
+	 * {@link #immutableInvoke()}
+	 * 
+	 */
+	public void invoke() throws Exception {
+		if (this.isMutable) {
+			mutableInvoke();
+		} else {
+			immutableInvoke();
+		}
 	}
 
 	/**
@@ -55,7 +138,7 @@ public abstract class StreamInvokable<OUT> implements Serializable {
 	 *            The configuration parameters for the operator
 	 */
 	public void open(Configuration parameters) throws Exception {
-		isRunning=true;
+		isRunning = true;
 		if (userFunction instanceof RichFunction) {
 			((RichFunction) userFunction).open(parameters);
 		}
@@ -72,11 +155,4 @@ public abstract class StreamInvokable<OUT> implements Serializable {
 			((RichFunction) userFunction).close();
 		}
 	}
-
-	/**
-	 * The method that will be called once when the operator is created, the
-	 * working mechanics of the operator should be implemented here
-	 * 
-	 */
-	public abstract void invoke() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
deleted file mode 100644
index 239f9f4..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The StreamOperatorInvokable represents the base class for all operators in
- * the streaming topology.
- *
- * @param <IN>
- *            Input type of the operator
- * @param <OUT>
- *            Output type of the operator
- */
-public abstract class StreamOperatorInvokable<IN, OUT> extends StreamInvokable<OUT> {
-
-	public StreamOperatorInvokable(Function userFunction) {
-		super(userFunction);
-	}
-
-	private static final long serialVersionUID = 1L;
-	private static final Logger LOG = LoggerFactory.getLogger(StreamOperatorInvokable.class);
-
-	protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
-	protected StreamRecordSerializer<IN> serializer;
-	protected StreamRecord<IN> reuse;
-	protected boolean isMutable;
-
-	/**
-	 * Initializes the {@link StreamOperatorInvokable} for input and output
-	 * handling
-	 * 
-	 * @param collector
-	 *            Collector object for collecting the outputs for the operator
-	 * @param recordIterator
-	 *            Iterator for reading in the input records
-	 * @param serializer
-	 *            Serializer used to deserialize inputs
-	 * @param isMutable
-	 *            Mutability setting for the operator
-	 */
-	public void initialize(Collector<OUT> collector,
-			MutableObjectIterator<StreamRecord<IN>> recordIterator,
-			StreamRecordSerializer<IN> serializer, boolean isMutable) {
-		setCollector(collector);
-		this.recordIterator = recordIterator;
-		this.serializer = serializer;
-		this.reuse = serializer.createInstance();
-		this.isMutable = isMutable;
-	}
-
-	/**
-	 * Re-initializes the object in which the next input record will be read in
-	 */
-	protected void resetReuse() {
-		this.reuse = serializer.createInstance();
-	}
-
-	/**
-	 * Method that will be called if the mutability setting is set to immutable
-	 */
-	protected abstract void immutableInvoke() throws Exception;
-
-	/**
-	 * Method that will be called if the mutability setting is set to mutable
-	 */
-	protected abstract void mutableInvoke() throws Exception;
-
-	/**
-	 * The call of the user implemented function should be implemented here
-	 */
-	protected abstract void callUserFunction() throws Exception;
-
-	/**
-	 * Method for logging exceptions thrown during the user function call
-	 */
-	protected void callUserFunctionAndLogException() {
-		try {
-			callUserFunction();
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Calling user function failed due to: {}",
-						StringUtils.stringifyException(e));
-			}
-		}
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		if (this.isMutable) {
-			mutableInvoke();
-		} else {
-			immutableInvoke();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
index 2e64d7c..81498dc 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
@@ -24,11 +24,11 @@ import java.util.Iterator;
 import org.apache.commons.math.util.MathUtils;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.state.SlidingWindowState;
 
-public class BatchGroupReduceInvokable<IN, OUT> extends StreamOperatorInvokable<IN, OUT> {
+public class BatchGroupReduceInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 
 	private static final long serialVersionUID = 1L;
 	protected GroupReduceFunction<IN, OUT> reducer;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index 41ae3f3..f306dac 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -24,11 +24,11 @@ import org.apache.commons.math.util.MathUtils;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.state.NullableCircularBuffer;
 
-public class BatchReduceInvokable<OUT> extends StreamOperatorInvokable<OUT, OUT> {
+public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
 
 	private static final long serialVersionUID = 1L;
 	protected ReduceFunction<OUT> reducer;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
index 12f43e1..7924595 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
@@ -17,9 +17,9 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
 
-public class CounterInvokable<IN> extends StreamOperatorInvokable<IN, Long> {
+public class CounterInvokable<IN> extends StreamInvokable<IN, Long> {
 	private static final long serialVersionUID = 1L;
 	
 	Long count = 0L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index 9e5b18a..a54b6ad 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -18,9 +18,9 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
 
-public class FilterInvokable<IN> extends StreamOperatorInvokable<IN, IN> {
+public class FilterInvokable<IN> extends StreamInvokable<IN, IN> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index 1a0c93e..3452a82 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -18,9 +18,9 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
 
-public class FlatMapInvokable<IN, OUT> extends StreamOperatorInvokable<IN, OUT> {
+public class FlatMapInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 
 	private FlatMapFunction<IN, OUT> flatMapper;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index 40142ed..4feb4f3 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -18,9 +18,9 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
 
-public class MapInvokable<IN, OUT> extends StreamOperatorInvokable<IN, OUT> {
+public class MapInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 
 	private MapFunction<IN, OUT> mapper;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index 7080a7f..d327c76 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -18,9 +18,9 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
 
-public class StreamReduceInvokable<IN> extends StreamOperatorInvokable<IN, IN> {
+public class StreamReduceInvokable<IN> extends StreamInvokable<IN, IN> {
 	private static final long serialVersionUID = 1L;
 
 	protected ReduceFunction<IN> reducer;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 29e7650..098bbc6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -27,7 +27,7 @@ import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
+public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OUT> {
 
 	public CoInvokable(Function userFunction) {
 		super(userFunction);
@@ -41,7 +41,6 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
 	protected StreamRecord<IN2> reuse2;
 	protected StreamRecordSerializer<IN1> serializer1;
 	protected StreamRecordSerializer<IN2> serializer2;
-	protected boolean isMutable;
 
 	public void initialize(Collector<OUT> collector,
 			CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator,
@@ -71,14 +70,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
 		this.reuse2 = serializer2.createInstance();
 	}
 
-	public void invoke() throws Exception {
-		if (this.isMutable) {
-			mutableInvoke();
-		} else {
-			immutableInvoke();
-		}
-	}
-
+	@Override
 	protected void immutableInvoke() throws Exception {
 		while (true) {
 			int next = recordIterator.next(reuse1, reuse2);
@@ -96,6 +88,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
 		}
 	}
 
+	@Override
 	protected void mutableInvoke() throws Exception {
 		while (true) {
 			int next = recordIterator.next(reuse1, reuse2);
@@ -149,4 +142,8 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
 		}
 	}
 
+	@Override
+	protected void callUserFunction() throws Exception {
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
deleted file mode 100644
index 03c3988..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-
-public abstract class AbstractStreamComponent extends AbstractInvokable {
-
-	protected StreamConfig configuration;
-	protected int instanceID;
-	protected String name;
-	private static int numComponents = 0;
-	protected boolean isMutable;
-	protected Object function;
-	protected String functionName;
-
-	protected static int newComponent() {
-		numComponents++;
-		return numComponents;
-	}
-
-	@Override
-	public void registerInputOutput() {
-		initialize();
-		setInputsOutputs();
-		setInvokable();
-	}
-	
-	protected void initialize() {
-		this.configuration = new StreamConfig(getTaskConfiguration());
-		this.name = configuration.getComponentName();
-		this.isMutable = configuration.getMutability();
-		this.functionName = configuration.getFunctionName();
-		this.function = configuration.getFunction();
-	}
-
-	protected <T> void invokeUserFunction(StreamInvokable<T> userInvokable) throws Exception {
-		userInvokable.open(getTaskConfiguration());
-		userInvokable.invoke();
-		userInvokable.close();
-	}
-
-	protected abstract void setInputsOutputs();
-
-	protected abstract void setInvokable();
-
-	public String getName() {
-		return name;
-	}
-
-	public int getInstanceID() {
-		return instanceID;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
deleted file mode 100644
index 4483969..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import java.util.ArrayList;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.io.network.api.MutableRecordReader;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.streaming.io.CoRecordReader;
-import org.apache.flink.util.MutableObjectIterator;
-
-public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends
-		AbstractStreamComponent {
-
-	private OutputHandler<OUT> outputHandler;
-
-	protected StreamRecordSerializer<IN1> inputDeserializer1 = null;
-	protected StreamRecordSerializer<IN2> inputDeserializer2 = null;
-
-	MutableObjectIterator<StreamRecord<IN1>> inputIter1;
-	MutableObjectIterator<StreamRecord<IN2>> inputIter2;
-
-	CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>> coReader;
-	CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> coIter;
-
-	private CoInvokable<IN1, IN2, OUT> userInvokable;
-	private static int numTasks;
-
-	public CoStreamTask() {
-		userInvokable = null;
-		numTasks = newComponent();
-		instanceID = numTasks;
-	}
-
-	private void setDeserializers() {
-		TypeInformation<IN1> inputTypeInfo1 = configuration.getTypeInfoIn1();
-		inputDeserializer1 = new StreamRecordSerializer<IN1>(inputTypeInfo1);
-
-		TypeInformation<IN2> inputTypeInfo2 = configuration.getTypeInfoIn2();
-		inputDeserializer2 = new StreamRecordSerializer<IN2>(inputTypeInfo2);
-	}
-
-	@Override
-	public void setInputsOutputs() {
-		outputHandler = new OutputHandler<OUT>(this);
-
-		setConfigInputs();
-
-		coIter = new CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>>(coReader,
-				inputDeserializer1, inputDeserializer2);
-	}
-
-	@Override
-	protected void setInvokable() {
-		userInvokable = configuration.getUserInvokable();
-		userInvokable.initialize(outputHandler.getCollector(), coIter, inputDeserializer1,
-				inputDeserializer2, isMutable);
-	}
-
-	protected void setConfigInputs() throws StreamComponentException {
-		setDeserializers();
-
-		int numberOfInputs = configuration.getNumberOfInputs();
-
-		ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>> inputList1 = new ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>>();
-		ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>> inputList2 = new ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>>();
-
-		for (int i = 0; i < numberOfInputs; i++) {
-			int inputType = configuration.getInputType(i);
-			switch (inputType) {
-			case 1:
-				inputList1.add(new MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>(
-						this));
-				break;
-			case 2:
-				inputList2.add(new MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>(
-						this));
-				break;
-			default:
-				throw new RuntimeException("Invalid input type number: " + inputType);
-			}
-		}
-
-		coReader = new CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>>(
-				inputList1, inputList2);
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		outputHandler.invokeUserFunction("CO-TASK", userInvokable);
-	}
-
-}