You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/08/19 18:42:13 UTC

[4/6] flink git commit: [FLINK-2398][api-breaking] Introduce StreamGraphGenerator

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 65736f5..6474ae9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -23,14 +23,15 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -44,11 +45,11 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
@@ -84,11 +85,15 @@ public class StreamGraph extends StreamingPlan {
 
 	private Map<Integer, StreamNode> streamNodes;
 	private Set<Integer> sources;
+	private Set<Integer> sinks;
+	private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
+	private Map<Integer, Tuple2<Integer, StreamPartitioner<?>>> virtuaPartitionNodes;
 
-	private Map<Integer, StreamLoop> streamLoops;
-	protected Map<Integer, StreamLoop> vertexIDtoLoop;
 	protected Map<Integer, String> vertexIDtoBrokerID;
+	protected Map<Integer, Long> vertexIDtoLoopTimeout;
 	private StateHandleProvider<?> stateHandleProvider;
+	private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
+
 	private boolean forceCheckpoint = false;
 
 	public StreamGraph(StreamExecutionEnvironment environment) {
@@ -104,11 +109,14 @@ public class StreamGraph extends StreamingPlan {
 	 * Remove all registered nodes etc.
 	 */
 	public void clear() {
-		streamNodes = new HashMap<Integer, StreamNode>();
-		streamLoops = new HashMap<Integer, StreamLoop>();
-		vertexIDtoLoop = new HashMap<Integer, StreamLoop>();
-		vertexIDtoBrokerID = new HashMap<Integer, String>();
-		sources = new HashSet<Integer>();
+		streamNodes = Maps.newHashMap();
+		virtualSelectNodes = Maps.newHashMap();
+		virtuaPartitionNodes = Maps.newHashMap();
+		vertexIDtoBrokerID = Maps.newHashMap();
+		vertexIDtoLoopTimeout = Maps.newHashMap();
+		iterationSourceSinkPairs = Sets.newHashSet();
+		sources = Sets.newHashSet();
+		sinks = Sets.newHashSet();
 	}
 
 	protected ExecutionConfig getExecutionConfig() {
@@ -167,7 +175,7 @@ public class StreamGraph extends StreamingPlan {
 	
 
 	public boolean isIterative() {
-		return !streamLoops.isEmpty();
+		return!vertexIDtoLoopTimeout.isEmpty();
 	}
 
 	public <IN, OUT> void addSource(Integer vertexID, StreamOperator<OUT> operatorObject,
@@ -176,6 +184,12 @@ public class StreamGraph extends StreamingPlan {
 		sources.add(vertexID);
 	}
 
+	public <IN, OUT> void addSink(Integer vertexID, StreamOperator<OUT> operatorObject,
+			TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
+		addOperator(vertexID, operatorObject, inTypeInfo, outTypeInfo, operatorName);
+		sinks.add(vertexID);
+	}
+
 	public <IN, OUT> void addOperator(Integer vertexID, StreamOperator<OUT> operatorObject,
 			TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
 
@@ -212,224 +226,141 @@ public class StreamGraph extends StreamingPlan {
 		}
 	}
 
-	public void addIterationHead(Integer iterationHead, Integer iterationID, long timeOut,
-			TypeInformation<?> feedbackType) {
-		// If there is no loop object created for this iteration create one
-		StreamLoop loop = streamLoops.get(iterationID);
-		if (loop == null) {
-			loop = new StreamLoop(iterationID, timeOut, feedbackType);
-			streamLoops.put(iterationID, loop);
-		}
-
-		loop.addHeadOperator(getStreamNode(iterationHead));
-	}
-
-	public void addIterationTail(List<DataStream<?>> feedbackStreams, Integer iterationID,
-			boolean keepPartitioning) {
+	protected StreamNode addNode(Integer vertexID, Class<? extends AbstractInvokable> vertexClass,
+			StreamOperator<?> operatorObject, String operatorName) {
 
-		if (!streamLoops.containsKey(iterationID)) {
-			throw new RuntimeException("Cannot close iteration without head operator.");
+		if (streamNodes.containsKey(vertexID)) {
+			throw new RuntimeException("Duplicate vertexID " + vertexID);
 		}
 
-		StreamLoop loop = streamLoops.get(iterationID);
+		StreamNode vertex = new StreamNode(environemnt, vertexID, operatorObject, operatorName,
+				new ArrayList<OutputSelector<?>>(), vertexClass);
 
-		for (DataStream<?> stream : feedbackStreams) {
-			loop.addTailOperator(getStreamNode(stream.getId()), stream.getPartitioner(),
-					stream.getSelectedNames());
-		}
+		streamNodes.put(vertexID, vertex);
 
-		if (keepPartitioning) {
-			loop.applyTailPartitioning();
-		}
+		return vertex;
 	}
 
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public void finalizeLoops() {
-		
-		// We create each loop separately, the order does not matter as sinks
-		// and sources don't interact
-		for (StreamLoop loop : streamLoops.values()) {
-
-			// We make sure not to re-create the loops if the method is called
-			// multiple times
-			if (loop.getSourceSinkPairs().isEmpty()) {
-
-				List<StreamNode> headOps = loop.getHeads();
-				List<StreamNode> tailOps = loop.getTails();
+	/**
+	 * Adds a new virtual node that is used to connect a downstream vertex to only the outputs
+	 * with the selected names.
+	 *
+	 * When adding an edge from the virtual node to a downstream node the connection will be made
+	 * to the original node, only with the selected names given here.
+	 *
+	 * @param originalId ID of the node that should be connected to.
+	 * @param virtualId ID of the virtual node.
+	 * @param selectedNames The selected names.
+	 */
+	public void addVirtualSelectNode(Integer originalId, Integer virtualId, List<String> selectedNames) {
 
-				// This means that the iteration was not closed. It should not
-				// be
-				// allowed.
-				if (tailOps.isEmpty()) {
-					throw new RuntimeException("Cannot execute job with empty iterations.");
-				}
+		if (virtualSelectNodes.containsKey(virtualId)) {
+			throw new IllegalStateException("Already has virtual select node with id " + virtualId);
+		}
 
-				// Check whether we keep the feedback partitioning
-				if (loop.keepsPartitioning()) {
-					// This is the complicated case as we need to enforce
-					// partitioning on the tail -> sink side, which
-					// requires strict forward connections at source -> head
-
-					// We need one source/sink pair per different head
-					// parallelism
-					// as we depend on strict forwards connections
-					Map<Integer, List<StreamNode>> parallelismToHeads = new HashMap<Integer, List<StreamNode>>();
-
-					// Group head operators by parallelism
-					for (StreamNode head : headOps) {
-						int p = head.getParallelism();
-						if (!parallelismToHeads.containsKey(p)) {
-							parallelismToHeads.put(p, new ArrayList<StreamNode>());
-						}
-						parallelismToHeads.get(p).add(head);
-					}
-
-					// We create the sink/source pair for each parallelism
-					// group,
-					// tails will forward to all sinks but each head operator
-					// will
-					// only receive from one source (corresponding to its
-					// parallelism)
-					int c = 0;
-					for (Entry<Integer, List<StreamNode>> headGroup : parallelismToHeads.entrySet()) {
-						List<StreamNode> headOpsInGroup = headGroup.getValue();
-
-						Tuple2<StreamNode, StreamNode> sourceSinkPair = createItSourceAndSink(loop,
-								c);
-						StreamNode source = sourceSinkPair.f0;
-						StreamNode sink = sourceSinkPair.f1;
-
-						// We connect the source to the heads in this group
-						// (forward), setting
-						// type to 2 in case we have a coIteration (this sets
-						// the
-						// input as the second input of the co-operator)
-						for (StreamNode head : headOpsInGroup) {
-							int inputType = loop.isCoIteration() ? 2 : 0;
-							addEdge(source.getId(), head.getId(), new RebalancePartitioner(true),
-									inputType, new ArrayList<String>());
-						}
-
-						// We connect all the tails to the sink keeping the
-						// partitioner
-						for (int i = 0; i < tailOps.size(); i++) {
-							StreamNode tail = tailOps.get(i);
-							StreamPartitioner<?> partitioner = loop.getTailPartitioners().get(i);
-							addEdge(tail.getId(), sink.getId(), partitioner.copy(), 0, loop
-									.getTailSelectedNames().get(i));
-						}
-
-						// We set the sink/source parallelism to the group
-						// parallelism
-						source.setParallelism(headGroup.getKey());
-						sink.setParallelism(source.getParallelism());
-
-						// We set the proper serializers for the sink/source
-						setSerializersFrom(tailOps.get(0).getId(), sink.getId());
-						if (loop.isCoIteration()) {
-							source.setSerializerOut(loop.getFeedbackType().createSerializer(executionConfig));
-						} else {
-							setSerializersFrom(headOpsInGroup.get(0).getId(), source.getId());
-						}
-
-						c++;
-					}
-
-				} else {
-					// This is the most simple case, we add one iteration
-					// sink/source pair with the parallelism of the first tail
-					// operator. Tail operators will forward the records and
-					// partitioning will be enforced from source -> head
-
-					Tuple2<StreamNode, StreamNode> sourceSinkPair = createItSourceAndSink(loop, 0);
-					StreamNode source = sourceSinkPair.f0;
-					StreamNode sink = sourceSinkPair.f1;
-
-					// We get the feedback partitioner from the first input of
-					// the
-					// first head.
-					StreamPartitioner<?> partitioner = headOps.get(0).getInEdges().get(0)
-							.getPartitioner();
-
-					// Connect the sources to heads using this partitioner
-					for (StreamNode head : headOps) {
-						addEdge(source.getId(), head.getId(), partitioner.copy(), 0,
-								new ArrayList<String>());
-					}
-
-					// The tails are connected to the sink with forward
-					// partitioning
-					for (int i = 0; i < tailOps.size(); i++) {
-						StreamNode tail = tailOps.get(i);
-						addEdge(tail.getId(), sink.getId(), new RebalancePartitioner(true), 0, loop
-								.getTailSelectedNames().get(i));
-					}
-
-					// We set the parallelism to match the first tail op to make
-					// the
-					// forward more efficient
-					sink.setParallelism(tailOps.get(0).getParallelism());
-					source.setParallelism(sink.getParallelism());
-
-					// We set the proper serializers
-					setSerializersFrom(headOps.get(0).getId(), source.getId());
-					setSerializersFrom(tailOps.get(0).getId(), sink.getId());
-				}
+		virtualSelectNodes.put(virtualId,
+				new Tuple2<Integer, List<String>>(originalId, selectedNames));
+	}
 
-			}
+	/**
+	 * Adds a new virtual node that is used to connect a downstream vertex to an input with a certain
+	 * partitioning.
+	 *
+	 * When adding an edge from the virtual node to a downstream node the connection will be made
+	 * to the original node, but with the partitioning given here.
+	 *
+	 * @param originalId ID of the node that should be connected to.
+	 * @param virtualId ID of the virtual node.
+	 * @param partitioner The partitioner
+	 */
+	public void addVirtualPartitionNode(Integer originalId, Integer virtualId, StreamPartitioner<?> partitioner) {
 
+		if (virtuaPartitionNodes.containsKey(virtualId)) {
+			throw new IllegalStateException("Already has virtual partition node with id " + virtualId);
 		}
 
+		virtuaPartitionNodes.put(virtualId,
+				new Tuple2<Integer, StreamPartitioner<?>>(originalId, partitioner));
 	}
 
-	private Tuple2<StreamNode, StreamNode> createItSourceAndSink(StreamLoop loop, int c) {
-		StreamNode source = addNode(-1 * streamNodes.size(), StreamIterationHead.class, null, null);
-		sources.add(source.getId());
-
-		StreamNode sink = addNode(-1 * streamNodes.size(), StreamIterationTail.class, null, null);
-
-		source.setOperatorName("IterationSource-" + loop.getID() + "_" + c);
-		sink.setOperatorName("IterationSink-" + loop.getID() + "_" + c);
-		vertexIDtoBrokerID.put(source.getId(), loop.getID() + "_" + c);
-		vertexIDtoBrokerID.put(sink.getId(), loop.getID() + "_" + c);
-		vertexIDtoLoop.put(source.getId(), loop);
-		vertexIDtoLoop.put(sink.getId(), loop);
-		loop.addSourceSinkPair(source, sink);
+	public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
+		addEdgeInternal(upStreamVertexID,
+				downStreamVertexID,
+				typeNumber,
+				null,
+				Lists.<String>newArrayList());
 
-		return new Tuple2<StreamNode, StreamNode>(source, sink);
 	}
 
-	protected StreamNode addNode(Integer vertexID, Class<? extends AbstractInvokable> vertexClass,
-			StreamOperator<?> operatorObject, String operatorName) {
+	private void addEdgeInternal(Integer upStreamVertexID,
+			Integer downStreamVertexID,
+			int typeNumber,
+			StreamPartitioner<?> partitioner,
+			List<String> outputNames) {
 
-		StreamNode vertex = new StreamNode(environemnt, vertexID, operatorObject, operatorName,
-				new ArrayList<OutputSelector<?>>(), vertexClass);
 
-		streamNodes.put(vertexID, vertex);
+		if (virtualSelectNodes.containsKey(upStreamVertexID)) {
+			int virtualId = upStreamVertexID;
+			upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
+			if (outputNames.isEmpty()) {
+				// selections that happen downstream override earlier selections
+				outputNames = virtualSelectNodes.get(virtualId).f1;
+			}
+			addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
+		} else if (virtuaPartitionNodes.containsKey(upStreamVertexID)) {
+			int virtualId = upStreamVertexID;
+			upStreamVertexID = virtuaPartitionNodes.get(virtualId).f0;
+			if (partitioner == null) {
+				partitioner = virtuaPartitionNodes.get(virtualId).f1;
+			}
+			addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
+		} else {
+			StreamNode upstreamNode = getStreamNode(upStreamVertexID);
+			StreamNode downstreamNode = getStreamNode(downStreamVertexID);
+
+			// If no partitioner was specified and the parallelism of upstream and downstream
+			// operator matches, use forward partitioning; otherwise use rebalance.
+			if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
+				partitioner = new ForwardPartitioner<Object>();
+			} else if (partitioner == null) {
+				partitioner = new RebalancePartitioner<Object>();
+			}
 
-		return vertex;
-	}
+			if (partitioner instanceof ForwardPartitioner) {
+				if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
+					throw new UnsupportedOperationException("Forward partitioning does not allow " +
+							"change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
+							", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
+							" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
+				}
+			}
 
-	public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID,
-			StreamPartitioner<?> partitionerObject, int typeNumber, List<String> outputNames) {
+			StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner);
 
-		StreamEdge edge = new StreamEdge(getStreamNode(upStreamVertexID),
-				getStreamNode(downStreamVertexID), typeNumber, outputNames, partitionerObject);
-		getStreamNode(edge.getSourceId()).addOutEdge(edge);
-		getStreamNode(edge.getTargetId()).addInEdge(edge);
+			getStreamNode(edge.getSourceId()).addOutEdge(edge);
+			getStreamNode(edge.getTargetId()).addInEdge(edge);
+		}
 	}
 
 	public <T> void addOutputSelector(Integer vertexID, OutputSelector<T> outputSelector) {
-		getStreamNode(vertexID).addOutputSelector(outputSelector);
+		if (virtuaPartitionNodes.containsKey(vertexID)) {
+			addOutputSelector(virtuaPartitionNodes.get(vertexID).f0, outputSelector);
+		} else if (virtualSelectNodes.containsKey(vertexID)) {
+			addOutputSelector(virtualSelectNodes.get(vertexID).f0, outputSelector);
+		} else {
+			getStreamNode(vertexID).addOutputSelector(outputSelector);
 
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Outputselector set for {}", vertexID);
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Outputselector set for {}", vertexID);
+			}
 		}
 
 	}
 
 	public void setParallelism(Integer vertexID, int parallelism) {
-		getStreamNode(vertexID).setParallelism(parallelism);
+		if (getStreamNode(vertexID) != null) {
+			getStreamNode(vertexID).setParallelism(parallelism);
+		}
 	}
 
 	public void setKey(Integer vertexID, KeySelector<?, ?> key) {
@@ -437,17 +368,19 @@ public class StreamGraph extends StreamingPlan {
 	}
 
 	public void setBufferTimeout(Integer vertexID, long bufferTimeout) {
-		getStreamNode(vertexID).setBufferTimeout(bufferTimeout);
+		if (getStreamNode(vertexID) != null) {
+			getStreamNode(vertexID).setBufferTimeout(bufferTimeout);
+		}
 	}
 
-	private void setSerializers(Integer vertexID, TypeSerializer<?> in1, TypeSerializer<?> in2, TypeSerializer<?> out) {
+	public void setSerializers(Integer vertexID, TypeSerializer<?> in1, TypeSerializer<?> in2, TypeSerializer<?> out) {
 		StreamNode vertex = getStreamNode(vertexID);
 		vertex.setSerializerIn1(in1);
 		vertex.setSerializerIn2(in2);
 		vertex.setSerializerOut(out);
 	}
 
-	private void setSerializersFrom(Integer from, Integer to) {
+	public void setSerializersFrom(Integer from, Integer to) {
 		StreamNode fromVertex = getStreamNode(from);
 		StreamNode toVertex = getStreamNode(to);
 
@@ -469,6 +402,10 @@ public class StreamGraph extends StreamingPlan {
 
 	public void setResourceStrategy(Integer vertexID, ResourceStrategy strategy) {
 		StreamNode node = getStreamNode(vertexID);
+		if (node == null) {
+			return;
+		}
+
 		switch (strategy) {
 		case ISOLATE:
 			node.isolateSlot();
@@ -506,6 +443,11 @@ public class StreamGraph extends StreamingPlan {
 		return sources;
 	}
 
+
+	public Collection<Integer> getSinkIDs() {
+		return sinks;
+	}
+
 	public Collection<StreamNode> getStreamNodes() {
 		return streamNodes.values();
 	}
@@ -519,20 +461,44 @@ public class StreamGraph extends StreamingPlan {
 		return operatorSet;
 	}
 
-	public Collection<StreamLoop> getStreamLoops() {
-		return streamLoops.values();
+	public String getBrokerID(Integer vertexID) {
+		return vertexIDtoBrokerID.get(vertexID);
 	}
 
-	public Integer getLoopID(Integer vertexID) {
-		return vertexIDtoLoop.get(vertexID).getID();
+	public long getLoopTimeout(Integer vertexID) {
+		return vertexIDtoLoopTimeout.get(vertexID);
 	}
 
-	public String getBrokerID(Integer vertexID) {
-		return vertexIDtoBrokerID.get(vertexID);
+	public  Tuple2<StreamNode, StreamNode> createIterationSourceAndSink(int loopId, int sourceId, int sinkId, long timeout, int parallelism) {
+
+		StreamNode source = this.addNode(sourceId,
+				StreamIterationHead.class,
+				null,
+				null);
+		sources.add(source.getId());
+		setParallelism(source.getId(), parallelism);
+
+		StreamNode sink = this.addNode(sinkId,
+				StreamIterationTail.class,
+				null,
+				null);
+		sinks.add(sink.getId());
+		setParallelism(sink.getId(), parallelism);
+
+		iterationSourceSinkPairs.add(new Tuple2<StreamNode, StreamNode>(source, sink));
+
+		source.setOperatorName("IterationSource-" + loopId);
+		sink.setOperatorName("IterationSink-" + loopId);
+		this.vertexIDtoBrokerID.put(source.getId(), "broker-" + loopId);
+		this.vertexIDtoBrokerID.put(sink.getId(), "broker-" + loopId);
+		this.vertexIDtoLoopTimeout.put(source.getId(), timeout);
+		this.vertexIDtoLoopTimeout.put(sink.getId(), timeout);
+
+		return new Tuple2<StreamNode, StreamNode>(source, sink);
 	}
 
-	public long getLoopTimeout(Integer vertexID) {
-		return vertexIDtoLoop.get(vertexID).getTimeout();
+	public Set<Tuple2<StreamNode, StreamNode>> getIterationSourceSinkPairs() {
+		return iterationSourceSinkPairs;
 	}
 
 	protected void removeEdge(StreamEdge edge) {
@@ -570,7 +536,6 @@ public class StreamGraph extends StreamingPlan {
 	 *            name of the jobGraph
 	 */
 	public JobGraph getJobGraph(String jobGraphName) {
-		finalizeLoops();
 		// temporarily forbid checkpointing for iterative jobs
 		if (isIterative() && isCheckpointingEnabled() && !forceCheckpoint) {
 			throw new UnsupportedOperationException(

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
new file mode 100644
index 0000000..6df8cb5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -0,0 +1,530 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.graph;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
+import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
+import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.transformations.SelectTransformation;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.streaming.api.transformations.SplitTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.api.transformations.UnionTransformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A generator that generates a {@link StreamGraph} from a graph of
+ * {@link StreamTransformation StreamTransformations}.
+ *
+ * <p>
+ * This traverses the tree of {@code StreamTransformations} starting from the sinks. At each
+ * transformation we recursively transform the inputs, then create a node in the {@code StreamGraph}
+ * and add edges from the input Nodes to our newly created node. The transformation methods
+ * return the IDs of the nodes in the StreamGraph that represent the input transformation. Several
+ * IDs can be returned to be able to deal with feedback transformations and unions.
+ *
+ * <p>
+ * Partitioning, split/select and union don't create actual nodes in the {@code StreamGraph}. For
+ * these, we create a virtual node in the {@code StreamGraph} that holds the specific property, i.e.
+ * partitioning, selector and so on. When an edge is created from a virtual node to a downstream
+ * node the {@code StreamGraph} resolves the id of the original node and creates an edge
+ * in the graph with the desired property. For example, if you have this graph:
+ *
+ * <pre>
+ *     Map-1 -> HashPartition-2 -> Map-3
+ * </pre>
+ *
+ * where the numbers represent transformation IDs. We first recurse all the way down. {@code Map-1}
+ * is transformed, i.e. we create a {@code StreamNode} with ID 1. Then we transform the
+ * {@code HashPartition}, for this, we create a virtual node with ID 4 that holds the property
+ * {@code HashPartition}. This transformation returns the ID 4. Then we transform the {@code Map-3}.
+ * We add the edge {@code 4 -> 3}. The {@code StreamGraph} resolves the actual node with ID 1 and
+ * creates an edge {@code 1 -> 3} with the property HashPartition.
+ */
+public class StreamGraphGenerator {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StreamGraphGenerator.class);
+
+	// The StreamGraph that is being built, this is initialized at the beginning.
+	private StreamGraph streamGraph;
+
+	private final StreamExecutionEnvironment env;
+
+	// This is used to assign a unique ID to iteration source/sink
+	protected static Integer iterationIdCounter = 0;
+	public static int getNewIterationNodeId() {
+		iterationIdCounter--;
+		return iterationIdCounter;
+	}
+
+	// Keep track of which Transforms we have already transformed, this is necessary because
+	// we have loops, i.e. feedback edges.
+	private Map<StreamTransformation<?>, Collection<Integer>> alreadyTransformed;
+
+
+	/**
+	 * Private constructor. The generator should only be invoked using {@link #generate}.
+	 */
+	private StreamGraphGenerator(StreamExecutionEnvironment env) {
+		this.streamGraph = new StreamGraph(env);
+		this.streamGraph.setChaining(env.isChainingEnabled());
+		if (env.getCheckpointInterval() > 0) {
+			this.streamGraph.setCheckpointingEnabled(true);
+			this.streamGraph.setCheckpointingInterval(env.getCheckpointInterval());
+			this.streamGraph.setCheckpointingMode(env.getCheckpointingMode());
+		}
+		this.streamGraph.setStateHandleProvider(env.getStateHandleProvider());
+		if (env.isForceCheckpointing()) {
+			this.streamGraph.forceCheckpoint();
+		}
+		this.env = env;
+		this.alreadyTransformed = Maps.newHashMap();
+	}
+
+	/**
+	 * Generates a {@code StreamGraph} by traversing the graph of {@code StreamTransformations}
+	 * starting from the given transformations.
+	 *
+	 * @param env The {@code StreamExecutionEnvironment} that is used to set some parameters of the
+	 *            job
+	 * @param transformations The transformations starting from which to transform the graph
+	 *
+	 * @return The generated {@code StreamGraph}
+	 */
+	public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
+		return new StreamGraphGenerator(env).generateInternal(transformations);
+	}
+
+	/**
+	 * This starts the actual transformation, beginning from the sinks.
+	 */
+	private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
+		for (StreamTransformation<?> transformation: transformations) {
+			transform(transformation);
+		}
+		return streamGraph;
+	}
+
+	/**
+	 * Transforms one {@code StreamTransformation}.
+	 *
+	 * <p>
+	 * This checks whether we already transformed it and exits early in that case. If not it
+	 * delegates to one of the transformation specific methods.
+	 */
+	private Collection<Integer> transform(StreamTransformation<?> transform) {
+
+		if (alreadyTransformed.containsKey(transform)) {
+			return alreadyTransformed.get(transform);
+		}
+
+		LOG.debug("Transforming " + transform);
+
+		// call at least once to trigger exceptions about MissingTypeInfo
+		transform.getOutputType();
+
+		Collection<Integer> transformedIds;
+		if (transform instanceof OneInputTransformation<?, ?>) {
+			transformedIds = transformOnInputTransform((OneInputTransformation<?, ?>) transform);
+		} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
+			transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
+		} else if (transform instanceof SourceTransformation<?>) {
+			transformedIds = transformSource((SourceTransformation<?>) transform);
+		} else if (transform instanceof SinkTransformation<?>) {
+			transformedIds = transformSink((SinkTransformation<?>) transform);
+		} else if (transform instanceof UnionTransformation<?>) {
+			transformedIds = transformUnion((UnionTransformation<?>) transform);
+		} else if (transform instanceof SplitTransformation<?>) {
+			transformedIds = transformSplit((SplitTransformation<?>) transform);
+		} else if (transform instanceof SelectTransformation<?>) {
+			transformedIds = transformSelect((SelectTransformation<?>) transform);
+		} else if (transform instanceof FeedbackTransformation<?>) {
+			transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
+		} else if (transform instanceof CoFeedbackTransformation<?>) {
+			transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
+		} else if (transform instanceof PartitionTransformation<?>) {
+			transformedIds = transformPartition((PartitionTransformation<?>) transform);
+		} else {
+			throw new IllegalStateException("Unknown transformation: " + transform);
+		}
+
+		// need this check because the iterate transformation adds itself before
+		// transforming the feedback edges
+		if (!alreadyTransformed.containsKey(transform)) {
+			alreadyTransformed.put(transform, transformedIds);
+		}
+
+		if (transform.getBufferTimeout() > 0) {
+			streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
+		}
+		if (transform.getResourceStrategy() != StreamGraph.ResourceStrategy.DEFAULT) {
+			streamGraph.setResourceStrategy(transform.getId(), transform.getResourceStrategy());
+		}
+
+		return transformedIds;
+	}
+
+	/**
+	 * Transforms a {@code UnionTransformation}.
+	 *
+	 * <p>
+	 * This is easy, we only have to transform the inputs and return all the IDs in a list so
+	 * that downstream operations can connect to all upstream nodes.
+	 */
+	private <T> Collection<Integer> transformUnion(UnionTransformation<T> union) {
+		List<StreamTransformation<T>> inputs = union.getInputs();
+		List<Integer> resultIds = Lists.newArrayList();
+
+		for (StreamTransformation<T> input: inputs) {
+			resultIds.addAll(transform(input));
+		}
+
+		return resultIds;
+	}
+
+	/**
+	 * Transforms a {@code PartitionTransformation}.
+	 *
+	 * <p>
+	 * For this we create a virtual node in the {@code StreamGraph} that holds the partition
+	 * property. @see StreamGraphGenerator
+	 */
+	private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
+		StreamTransformation<T> input = partition.getInput();
+		List<Integer> resultIds = Lists.newArrayList();
+
+		Collection<Integer> transformedIds = transform(input);
+		for (Integer transformedId: transformedIds) {
+			int virtualId = StreamTransformation.getNewNodeId();
+			streamGraph.addVirtualPartitionNode(transformedId, virtualId, partition.getPartitioner());
+			resultIds.add(virtualId);
+		}
+
+		return resultIds;
+	}
+
+	/**
+	 * Transforms a {@code SplitTransformation}.
+	 *
+	 * <p>
+	 * We add the output selector to previously transformed nodes.
+	 */
+	private <T> Collection<Integer> transformSplit(SplitTransformation<T> split) {
+
+		StreamTransformation<T> input = split.getInput();
+		Collection<Integer> resultIds = transform(input);
+
+		// the recursive transform call might have transformed this already
+		if (alreadyTransformed.containsKey(split)) {
+			return alreadyTransformed.get(split);
+		}
+
+		for (int inputId : resultIds) {
+			streamGraph.addOutputSelector(inputId, split.getOutputSelector());
+		}
+
+
+		return resultIds;
+	}
+
+	/**
+	 * Transforms a {@code SelectTransformation}.
+	 *
+	 * <p>
+	 * For this we create a virtual node in the {@code StreamGraph} that holds the selected names.
+	 * @see org.apache.flink.streaming.api.graph.StreamGraphGenerator
+	 */
+	private <T> Collection<Integer> transformSelect(SelectTransformation<T> select) {
+		StreamTransformation<T> input = select.getInput();
+		Collection<Integer> resultIds = transform(input);
+
+
+		// the recursive transform might have already transformed this
+		if (alreadyTransformed.containsKey(select)) {
+			return alreadyTransformed.get(select);
+		}
+
+		List<Integer> virtualResultIds = Lists.newArrayList();
+
+		for (int inputId : resultIds) {
+			int virtualId = StreamTransformation.getNewNodeId();
+			streamGraph.addVirtualSelectNode(inputId, virtualId, select.getSelectedNames());
+			virtualResultIds.add(virtualId);
+		}
+		return virtualResultIds;
+	}
+
+	/**
+	 * Transforms a {@code FeedbackTransformation}.
+	 *
+	 * <p>
+	 * This will recursively transform the input and the feedback edges. We return the concatenation
+	 * of the input IDs and the feedback IDs so that downstream operations can be wired to both.
+	 *
+	 * <p>
+	 * This is responsible for creating the IterationSource and IterationSink which
+	 * are used to feed back the elements.
+	 */
+	private <T> Collection<Integer> transformFeedback(FeedbackTransformation<T> iterate) {
+
+		if (iterate.getFeedbackEdges().size() <= 0) {
+			throw new IllegalStateException("Iteration " + iterate + " does not have any feedback edges.");
+		}
+
+		StreamTransformation<T> input = iterate.getInput();
+		List<Integer> resultIds = Lists.newArrayList();
+
+		// first transform the input stream(s) and store the result IDs
+		resultIds.addAll(transform(input));
+
+		// the recursive transform might have already transformed this
+		if (alreadyTransformed.containsKey(iterate)) {
+			return alreadyTransformed.get(iterate);
+		}
+
+
+		// create the fake iteration source/sink pair
+		Tuple2<StreamNode, StreamNode> itSourceAndSink = streamGraph.createIterationSourceAndSink(
+				iterate.getId(),
+				getNewIterationNodeId(),
+				getNewIterationNodeId(),
+				iterate.getWaitTime(),
+				iterate.getParallelism());
+
+		StreamNode itSource = itSourceAndSink.f0;
+		StreamNode itSink = itSourceAndSink.f1;
+
+		// We set the proper serializers for the sink/source
+		streamGraph.setSerializers(itSource.getId(), null, null, iterate.getOutputType().createSerializer(env.getConfig()));
+		streamGraph.setSerializers(itSink.getId(), iterate.getOutputType().createSerializer(env.getConfig()), null, null);
+
+		// also add the feedback source ID to the result IDs, so that downstream operators will
+		// add both as input
+		resultIds.add(itSource.getId());
+
+		// add the iterate to the already-seen-set with the result IDs, so that we can transform
+		// the feedback edges and let them stop when encountering the iterate node
+		alreadyTransformed.put(iterate, resultIds);
+
+		for (StreamTransformation<T> feedbackEdge : iterate.getFeedbackEdges()) {
+			Collection<Integer> feedbackIds = transform(feedbackEdge);
+			for (Integer feedbackId: feedbackIds) {
+				streamGraph.addEdge(feedbackId,
+						itSink.getId(),
+						0
+				);
+			}
+		}
+
+		return resultIds;
+	}
+
+	/**
+	 * Transforms a {@code CoFeedbackTransformation}.
+	 *
+	 * <p>
+	 * This will only transform feedback edges, the result of this transform will be wired
+	 * to the second input of a Co-Transform. The original input is wired directly to the first
+	 * input of the downstream Co-Transform.
+	 *
+	 * <p>
+	 * This is responsible for creating the IterationSource and IterationSink which
+	 * are used to feed back the elements.
+	 */
+	private <F> Collection<Integer> transformCoFeedback(CoFeedbackTransformation<F> coIterate) {
+
+		// For Co-Iteration we don't need to transform the input and wire the input to the
+		// head operator by returning the input IDs, the input is directly wired to the left
+		// input of the co-operation. This transform only needs to return the ids of the feedback
+		// edges, since they need to be wired to the second input of the co-operation.
+
+		// create the fake iteration source/sink pair
+		Tuple2<StreamNode, StreamNode> itSourceAndSink = streamGraph.createIterationSourceAndSink(
+				coIterate.getId(),
+				getNewIterationNodeId(),
+				getNewIterationNodeId(),
+				coIterate.getWaitTime(),
+				coIterate.getParallelism());
+
+		StreamNode itSource = itSourceAndSink.f0;
+		StreamNode itSink = itSourceAndSink.f1;
+
+		// We set the proper serializers for the sink/source
+		streamGraph.setSerializers(itSource.getId(), null, null, coIterate.getOutputType().createSerializer(env.getConfig()));
+		streamGraph.setSerializers(itSink.getId(), coIterate.getOutputType().createSerializer(env.getConfig()), null, null);
+
+		Collection<Integer> resultIds = Collections.singleton(itSource.getId());
+
+		// add the iterate to the already-seen-set with the result IDs, so that we can transform
+		// the feedback edges and let them stop when encountering the iterate node
+		alreadyTransformed.put(coIterate, resultIds);
+
+		for (StreamTransformation<F> feedbackEdge : coIterate.getFeedbackEdges()) {
+			Collection<Integer> feedbackIds = transform(feedbackEdge);
+			for (Integer feedbackId: feedbackIds) {
+				streamGraph.addEdge(feedbackId,
+						itSink.getId(),
+						0
+				);
+			}
+		}
+
+		return Collections.singleton(itSource.getId());
+	}
+
+	/**
+	 * Transforms a {@code SourceTransformation}.
+	 */
+	private <T> Collection<Integer> transformSource(SourceTransformation<T> source) {
+		streamGraph.addSource(source.getId(),
+				source.getOperator(),
+				null,
+				source.getOutputType(),
+				"Source: " + source.getName());
+		if (source.getOperator().getUserFunction() instanceof FileSourceFunction) {
+			FileSourceFunction<T> fs = (FileSourceFunction<T>) source.getOperator().getUserFunction();
+			streamGraph.setInputFormat(source.getId(), fs.getFormat());
+		}
+		streamGraph.setParallelism(source.getId(), source.getParallelism());
+		return Collections.singleton(source.getId());
+	}
+
+	/**
+	 * Transforms a {@code SinkTransformation}.
+	 */
+	private <T> Collection<Integer> transformSink(SinkTransformation<T> sink) {
+
+		Collection<Integer> inputIds = transform(sink.getInput());
+
+		streamGraph.addSink(sink.getId(),
+				sink.getOperator(),
+				sink.getInput().getOutputType(),
+				null,
+				"Sink: " + sink.getName());
+
+		streamGraph.setParallelism(sink.getId(), sink.getParallelism());
+
+		for (Integer inputId: inputIds) {
+			streamGraph.addEdge(inputId,
+					sink.getId(),
+					0
+			);
+		}
+
+
+		if (sink.getStateKeySelector() != null) {
+			streamGraph.setKey(sink.getId(), sink.getStateKeySelector());
+		}
+
+		return Collections.emptyList();
+	}
+
+	/**
+	 * Transforms a {@code OneInputTransformation}.
+	 *
+	 * <p>
+	 * This recursively transforms the inputs, creates a new {@code StreamNode} in the graph and
+	 * wires the inputs to this new node.
+	 */
+	private <IN, OUT> Collection<Integer> transformOnInputTransform(OneInputTransformation<IN, OUT> transform) {
+
+		Collection<Integer> inputIds = transform(transform.getInput());
+
+		// the recursive call might have already transformed this
+		if (alreadyTransformed.containsKey(transform)) {
+			return alreadyTransformed.get(transform);
+		}
+
+		streamGraph.addOperator(transform.getId(),
+				transform.getOperator(),
+				transform.getInputType(),
+				transform.getOutputType(),
+				transform.getName());
+
+		if (transform.getStateKeySelector() != null) {
+			streamGraph.setKey(transform.getId(), transform.getStateKeySelector());
+		}
+
+		streamGraph.setParallelism(transform.getId(), transform.getParallelism());
+
+		for (Integer inputId: inputIds) {
+			streamGraph.addEdge(inputId, transform.getId(), 0);
+		}
+
+		return Collections.singleton(transform.getId());
+	}
+
+	/**
+	 * Transforms a {@code TwoInputTransformation}.
+	 *
+	 * <p>
+	 * This recursively transforms the inputs, creates a new {@code StreamNode} in the graph and
+	 * wires the inputs to this new node.
+	 */
+	private <IN1, IN2, OUT> Collection<Integer> transformTwoInputTransform(TwoInputTransformation<IN1, IN2, OUT> transform) {
+
+		Collection<Integer> inputIds1 = transform(transform.getInput1());
+		Collection<Integer> inputIds2 = transform(transform.getInput2());
+
+		// the recursive call might have already transformed this
+		if (alreadyTransformed.containsKey(transform)) {
+			return alreadyTransformed.get(transform);
+		}
+
+		streamGraph.addCoOperator(
+				transform.getId(),
+				transform.getOperator(),
+				transform.getInputType1(),
+				transform.getInputType2(),
+				transform.getOutputType(),
+				transform.getName());
+
+		streamGraph.setParallelism(transform.getId(), transform.getParallelism());
+
+		for (Integer inputId: inputIds1) {
+			streamGraph.addEdge(inputId,
+					transform.getId(),
+					1
+			);
+		}
+
+		for (Integer inputId: inputIds2) {
+			streamGraph.addEdge(inputId,
+					transform.getId(),
+					2
+			);
+		}
+
+		return Collections.singleton(transform.getId());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamLoop.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamLoop.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamLoop.java
deleted file mode 100644
index ba987ef..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamLoop.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.graph;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-
-/**
- * Object for representing loops in streaming programs.
- * 
- */
-public class StreamLoop {
-
-	private Integer loopID;
-
-	private List<StreamNode> headOperators = new ArrayList<StreamNode>();
-	private List<StreamNode> tailOperators = new ArrayList<StreamNode>();
-	private List<StreamPartitioner<?>> tailPartitioners = new ArrayList<StreamPartitioner<?>>();
-	private List<List<String>> tailSelectedNames = new ArrayList<List<String>>();
-
-	private boolean coIteration = false;
-	private TypeInformation<?> feedbackType = null;
-
-	private long timeout;
-	private boolean tailPartitioning = false;
-
-	private List<Tuple2<StreamNode, StreamNode>> sourcesAndSinks = new ArrayList<Tuple2<StreamNode, StreamNode>>();
-
-	public StreamLoop(Integer loopID, long timeout, TypeInformation<?> feedbackType) {
-		this.loopID = loopID;
-		this.timeout = timeout;
-		if (feedbackType != null) {
-			this.feedbackType = feedbackType;
-			coIteration = true;
-			tailPartitioning = true;
-		}
-	}
-
-	public Integer getID() {
-		return loopID;
-	}
-
-	public long getTimeout() {
-		return timeout;
-	}
-
-	public boolean isCoIteration() {
-		return coIteration;
-	}
-
-	public TypeInformation<?> getFeedbackType() {
-		return feedbackType;
-	}
-
-	public void addSourceSinkPair(StreamNode source, StreamNode sink) {
-		this.sourcesAndSinks.add(new Tuple2<StreamNode, StreamNode>(source, sink));
-	}
-
-	public List<Tuple2<StreamNode, StreamNode>> getSourceSinkPairs() {
-		return this.sourcesAndSinks;
-	}
-
-	public void addHeadOperator(StreamNode head) {
-		this.headOperators.add(head);
-	}
-
-	public void addTailOperator(StreamNode tail, StreamPartitioner<?> partitioner,
-			List<String> selectedNames) {
-		this.tailOperators.add(tail);
-		this.tailPartitioners.add(partitioner);
-		this.tailSelectedNames.add(selectedNames);
-	}
-
-	public void applyTailPartitioning() {
-		this.tailPartitioning = true;
-	}
-
-	public boolean keepsPartitioning() {
-		return tailPartitioning;
-	}
-
-	public List<StreamNode> getHeads() {
-		return headOperators;
-	}
-
-	public List<StreamNode> getTails() {
-		return tailOperators;
-	}
-
-	public List<StreamPartitioner<?>> getTailPartitioners() {
-		return tailPartitioners;
-	}
-
-	public List<List<String>> getTailSelectedNames() {
-		return tailSelectedNames;
-	}
-
-	@Override
-	public String toString() {
-		return "ID: " + loopID + "\n" + "Head: " + headOperators + "\n" + "Tail: " + tailOperators
-				+ "\n" + "TP: " + tailPartitioners + "\n" + "TSN: " + tailSelectedNames;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 62e2d83..9110cd3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -125,7 +125,11 @@ public class StreamNode implements Serializable {
 	}
 
 	public int getParallelism() {
-		return parallelism != null ? parallelism : env.getParallelism();
+		if (parallelism == -1) {
+			return env.getParallelism();
+		} else {
+			return parallelism;
+		}
 	}
 
 	public void setParallelism(Integer parallelism) {
@@ -218,7 +222,7 @@ public class StreamNode implements Serializable {
 	
 	@Override
 	public String toString() {
-		return operatorName + id;
+		return operatorName + "-" + id;
 	}
 
 	public KeySelector<?, ?> getStatePartitioner() {
@@ -228,4 +232,23 @@ public class StreamNode implements Serializable {
 	public void setStatePartitioner(KeySelector<?, ?> statePartitioner) {
 		this.statePartitioner = statePartitioner;
 	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		StreamNode that = (StreamNode) o;
+
+		return id.equals(that.id);
+	}
+
+	@Override
+	public int hashCode() {
+		return id.hashCode();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 5280fb2..314d1b3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -44,8 +44,8 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner.PartitioningStrategy;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
 import org.apache.flink.util.InstantiationUtil;
@@ -294,11 +294,6 @@ public class StreamingJobGraphGenerator {
 		List<StreamEdge> allOutputs = new ArrayList<StreamEdge>(chainableOutputs);
 		allOutputs.addAll(nonChainableOutputs);
 
-		for (StreamEdge output : allOutputs) {
-			config.setSelectedNames(output.getTargetId(),
-					streamGraph.getStreamEdge(vertexID, output.getTargetId()).getSelectedNames());
-		}
-
 		vertexConfigs.put(vertexID, config);
 	}
 
@@ -316,7 +311,7 @@ public class StreamingJobGraphGenerator {
 		downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1);
 
 		StreamPartitioner<?> partitioner = edge.getPartitioner();
-		if (partitioner.getStrategy() == PartitioningStrategy.FORWARD) {
+		if (partitioner instanceof ForwardPartitioner) {
 			downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE);
 		} else {
 			downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.ALL_TO_ALL);
@@ -345,7 +340,7 @@ public class StreamingJobGraphGenerator {
 				&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
 					headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS ||
 					headOperator.getChainingStrategy() == ChainingStrategy.FORCE_ALWAYS)
-				&& (edge.getPartitioner().getStrategy() == PartitioningStrategy.FORWARD || downStreamVertex
+				&& (edge.getPartitioner() instanceof ForwardPartitioner || downStreamVertex
 						.getParallelism() == 1)
 				&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
 				&& (streamGraph.isChainingEnabled() ||
@@ -370,21 +365,19 @@ public class StreamingJobGraphGenerator {
 			}
 		}
 
-		for (StreamLoop loop : streamGraph.getStreamLoops()) {
-			for (Tuple2<StreamNode, StreamNode> pair : loop.getSourceSinkPairs()) {
-				
-				CoLocationGroup ccg = new CoLocationGroup();
-				
-				JobVertex source = jobVertices.get(pair.f0.getId());
-				JobVertex sink = jobVertices.get(pair.f1.getId());
-				
-				ccg.addVertex(source);
-				ccg.addVertex(sink);
-				source.updateCoLocationGroup(ccg);
-				sink.updateCoLocationGroup(ccg);
-			}
+		for (Tuple2<StreamNode, StreamNode> pair : streamGraph.getIterationSourceSinkPairs()) {
 
+			CoLocationGroup ccg = new CoLocationGroup();
+
+			JobVertex source = jobVertices.get(pair.f0.getId());
+			JobVertex sink = jobVertices.get(pair.f1.getId());
+
+			ccg.addVertex(source);
+			ccg.addVertex(sink);
+			source.updateCoLocationGroup(ccg);
+			sink.updateCoLocationGroup(ccg);
 		}
+
 	}
 	
 	private void configureCheckpointing() {

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
index dce7684..cbd2a40 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
@@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer;
 import org.apache.flink.streaming.api.operators.windowing.WindowFlattener;
 import org.apache.flink.streaming.api.operators.windowing.WindowMerger;
-import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 
 public class WindowingOptimizer {
 
@@ -64,8 +63,7 @@ public class WindowingOptimizer {
 				StreamNode mergeInput = input.getInEdges().get(0).getSourceVertex();
 
 				// We connect the merge input to the flattener directly
-				streamGraph.addEdge(mergeInput.getId(), flattenerId,
-						new RebalancePartitioner(true), 0, new ArrayList<String>());
+				streamGraph.addEdge(mergeInput.getId(), flattenerId, 0);
 
 				// If the merger is only connected to the flattener we delete it
 				// completely, otherwise we only remove the edge
@@ -107,8 +105,7 @@ public class WindowingOptimizer {
 
 				for (StreamEdge edge1 : discretizer.getInEdges()) {
 					for (StreamEdge edge2 : candidate.f1.get(0).getInEdges()) {
-						if (edge1.getPartitioner().getStrategy() != edge2.getPartitioner()
-								.getStrategy()) {
+						if (edge1.getPartitioner().getClass() != edge2.getPartitioner().getClass()) {
 							partitionersMatch = false;
 						}
 					}
@@ -155,8 +152,7 @@ public class WindowingOptimizer {
 		for (int i = 0; i < numOutputs; i++) {
 			StreamEdge outEdge = outEdges.get(i);
 
-			streamGraph.addEdge(replaceWithId, outEdge.getTargetId(), outEdge.getPartitioner(), 0,
-					new ArrayList<String>());
+			streamGraph.addEdge(replaceWithId, outEdge.getTargetId(), 0);
 		}
 
 		// Remove the other discretizer

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/CoFeedbackTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/CoFeedbackTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/CoFeedbackTransformation.java
new file mode 100644
index 0000000..67ccbd6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/CoFeedbackTransformation.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This represents a feedback point in a topology. The type of the feedback elements need not match
+ * the type of the upstream {@code StreamTransformation} because the only allowed operations
+ * after a {@code CoFeedbackTransformation} are
+ * {@link org.apache.flink.streaming.api.transformations.TwoInputTransformation TwoInputTransformations}.
+ * The upstream {@code StreamTransformation} will be connected to the first input of the Co-Transform
+ * while the feedback edges will be connected to the second input.
+ *
+ * <p>
+ * Both the partitioning of the input and the feedback edges is preserved. They can also have
+ * differing partitioning strategies. This requires, however, that the parallelism of the feedback
+ * {@code StreamTransformations} must match the parallelism of the input
+ * {@code StreamTransformation}.
+ *
+ * <p>
+ * The upstream {@code StreamTransformation} is not wired to this {@code CoFeedbackTransformation}.
+ * It is instead directly wired to the {@code TwoInputTransformation} after this
+ * {@code CoFeedbackTransformation}.
+ *
+ * <p>
+ * This is different from Iterations in batch processing.
+ * @see org.apache.flink.streaming.api.transformations.FeedbackTransformation
+ *
+ * @param <F> The type of the feedback elements.
+ *
+ */
+public class CoFeedbackTransformation<F> extends StreamTransformation<F> {
+
+	private final List<StreamTransformation<F>> feedbackEdges;
+
+	private final Long waitTime;
+
+	/**
+	 * Creates a new {@code CoFeedbackTransformation} from the given input.
+	 *
+	 * @param parallelism The parallelism of the upstream {@code StreamTransformation} and the
+	 *                    feedback edges.
+	 * @param feedbackType The type of the feedback edges
+	 * @param waitTime The wait time of the feedback operator. After the time expires
+	 *                          the operation will close and not receive any more feedback elements.
+	 */
+	public CoFeedbackTransformation(int parallelism,
+			TypeInformation<F> feedbackType,
+			Long waitTime) {
+		super("CoFeedback", feedbackType, parallelism);
+		this.waitTime = waitTime;
+		this.feedbackEdges = Lists.newArrayList();
+	}
+
+	/**
+	 * Adds a feedback edge. The parallelism of the given {@code StreamTransformation} must match
+	 * the parallelism of this {@code CoFeedbackTransformation}, which is the parallelism of the
+	 * upstream {@code StreamTransformation}.
+	 *
+	 * @param transform The new feedback {@code StreamTransformation}.
+	 */
+	public void addFeedbackEdge(StreamTransformation<F> transform) {
+
+		if (transform.getParallelism() != this.getParallelism()) {
+			throw new UnsupportedOperationException(
+					"Parallelism of the feedback stream must match the parallelism of the original" +
+							" stream. Parallelism of original stream: " + this.getParallelism() +
+							"; parallelism of feedback stream: " + transform.getParallelism());
+		}
+
+		feedbackEdges.add(transform);
+	}
+
+	/**
+	 * Returns the list of feedback {@code StreamTransformations}.
+	 */
+	public List<StreamTransformation<F>> getFeedbackEdges() {
+		return feedbackEdges;
+	}
+
+	/**
+	 * Returns the wait time. This is the amount of time that the feedback operator keeps listening
+	 * for feedback elements. Once the time expires the operation will close and will not receive
+	 * further elements.
+	 */
+	public Long getWaitTime() {
+		return waitTime;
+	}
+
+	@Override
+	public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) { // always rejected
+		throw new UnsupportedOperationException("Cannot set chaining strategy on CoFeedback Transformation.");
+	}
+
+	@Override
+	public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+		return Collections.<StreamTransformation<?>>singleton(this);
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java
new file mode 100644
index 0000000..11a2f33
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This represents a feedback point in a topology.
+ *
+ * <p>
+ * This is different from how iterations work in batch processing. Once a feedback point is defined
+ * you can connect one or several {@code StreamTransformations} as feedback edges. Operations
+ * downstream from the feedback point will receive elements from the input of this feedback point
+ * and from the feedback edges.
+ *
+ * <p>
+ * The partitioning of both the input and the feedback edges is preserved. They can also have
+ * differing partitioning strategies. This requires, however, that the parallelism of the feedback
+ * {@code StreamTransformations} matches the parallelism of the input
+ * {@code StreamTransformation}.
+ *
+ * <p>
+ * The type of the input {@code StreamTransformation} and the feedback {@code StreamTransformation}
+ * must match.
+ *
+ * @param <T> The type of the input elements and the feedback elements.
+ */
+public class FeedbackTransformation<T> extends StreamTransformation<T> {
+
+	private final StreamTransformation<T> input;
+
+	private final List<StreamTransformation<T>> feedbackEdges;
+
+	private final Long waitTime;
+
+	/**
+	 * Creates a new {@code FeedbackTransformation} from the given input.
+	 *
+	 * @param input The input {@code StreamTransformation}
+	 * @param waitTime The wait time of the feedback operator. After the time expires
+	 *                          the operation will close and not receive any more feedback elements.
+	 */
+	public FeedbackTransformation(StreamTransformation<T> input, Long waitTime) {
+		super("Feedback", input.getOutputType(), input.getParallelism());
+		this.input = input;
+		this.waitTime = waitTime;
+		this.feedbackEdges = Lists.newArrayList();
+	}
+
+	/**
+	 * Returns the input {@code StreamTransformation} of this {@code FeedbackTransformation}.
+	 */
+	public StreamTransformation<T> getInput() {
+		return input;
+	}
+
+	/**
+	 * Adds a feedback edge. The parallelism of the {@code StreamTransformation} must match
+	 * the parallelism of this {@code FeedbackTransformation}.
+	 *
+	 * @param transform The new feedback {@code StreamTransformation}.
+	 * @throws UnsupportedOperationException if the parallelism of {@code transform} differs.
+	 */
+	public void addFeedbackEdge(StreamTransformation<T> transform) {
+
+		if (transform.getParallelism() != this.getParallelism()) {
+			throw new UnsupportedOperationException(
+					"Parallelism of the feedback stream must match the parallelism of the original" +
+							" stream. Parallelism of original stream: " + this.getParallelism() +
+							"; parallelism of feedback stream: " + transform.getParallelism());
+		}
+
+		feedbackEdges.add(transform);
+	}
+
+	/**
+	 * Returns the list of feedback {@code StreamTransformations} (the backing list, not a copy).
+	 */
+	public List<StreamTransformation<T>> getFeedbackEdges() {
+		return feedbackEdges;
+	}
+
+	/**
+	 * Returns the wait time. This is the amount of time that the feedback operator keeps listening
+	 * for feedback elements. Once the time expires the operation will close and will not receive
+	 * further elements.
+	 */
+	public Long getWaitTime() {
+		return waitTime;
+	}
+
+	@Override
+	public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+		throw new UnsupportedOperationException("Cannot set chaining strategy on Feedback Transformation.");
+	}
+
+	@Override
+	public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+		List<StreamTransformation<?>> result = Lists.newArrayList();
+		result.add(this);
+		result.addAll(input.getTransitivePredecessors());
+		return result;
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
new file mode 100644
index 0000000..945d8eb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A {@code StreamTransformation} that applies a
+ * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} to a single input
+ * {@link org.apache.flink.streaming.api.transformations.StreamTransformation}.
+ *
+ * @param <IN> The type of the elements in the input {@code StreamTransformation}
+ * @param <OUT> The type of the elements that result from this {@code OneInputTransformation}
+ */
+public class OneInputTransformation<IN, OUT> extends StreamTransformation<OUT> {
+
+	private final StreamTransformation<IN> input;
+
+	private final OneInputStreamOperator<IN, OUT> operator;
+
+	private KeySelector<IN, ?> stateKeySelector;
+
+	/**
+	 * Creates a new {@code OneInputTransformation} that applies the operator to the given input.
+	 *
+	 * @param input The input {@code StreamTransformation}
+	 * @param name The name of the {@code StreamTransformation}, this will be shown in Visualizations and the Log
+	 * @param operator The {@code OneInputStreamOperator}
+	 * @param outputType The type of the elements produced by this {@code OneInputTransformation}
+	 * @param parallelism The parallelism of this {@code OneInputTransformation}
+	 */
+	public OneInputTransformation(
+			StreamTransformation<IN> input,
+			String name,
+			OneInputStreamOperator<IN, OUT> operator,
+			TypeInformation<OUT> outputType,
+			int parallelism) {
+		super(name, outputType, parallelism);
+		this.operator = operator;
+		this.input = input;
+	}
+
+	/**
+	 * Gets the {@code StreamTransformation} that serves as input of this transformation.
+	 */
+	public StreamTransformation<IN> getInput() {
+		return this.input;
+	}
+
+	/**
+	 * Gets the {@code TypeInformation} of the input elements, as reported by the input.
+	 */
+	public TypeInformation<IN> getInputType() {
+		return this.input.getOutputType();
+	}
+
+	/**
+	 * Gets the {@code OneInputStreamOperator} of this Transformation.
+	 */
+	public OneInputStreamOperator<IN, OUT> getOperator() {
+		return this.operator;
+	}
+
+	/**
+	 * Stores the {@link KeySelector} that must be used for partitioning keyed state of this operation.
+	 *
+	 * @param stateKeySelector The {@code KeySelector} to set
+	 */
+	public void setStateKeySelector(KeySelector<IN, ?> stateKeySelector) {
+		this.stateKeySelector = stateKeySelector;
+	}
+
+	/**
+	 * Gets the {@code KeySelector} that must be used for partitioning keyed state in this
+	 * Operation.
+	 *
+	 * @see #setStateKeySelector
+	 */
+	public KeySelector<IN, ?> getStateKeySelector() {
+		return this.stateKeySelector;
+	}
+
+	@Override
+	public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+		List<StreamTransformation<?>> predecessors = Lists.newArrayList();
+		predecessors.add(this);
+		predecessors.addAll(this.input.getTransitivePredecessors());
+		return predecessors;
+	}
+
+	@Override
+	public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+		this.operator.setChainingStrategy(strategy);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
new file mode 100644
index 0000000..1165d5d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This transformation represents a change of partitioning of the input elements.
+ *
+ * <p>
+ * This does not create a physical operation, it only affects how upstream operations are
+ * connected to downstream operations.
+ *
+ * @param <T> The type of the elements that result from this {@code PartitionTransformation}
+ */
+public class PartitionTransformation<T> extends StreamTransformation<T> {
+	private final StreamTransformation<T> input;
+	private final StreamPartitioner<T> partitioner;
+
+	/**
+	 * Creates a new {@code PartitionTransformation} from the given input and
+	 * {@link StreamPartitioner}.
+	 *
+	 * @param input The input {@code StreamTransformation}
+	 * @param partitioner The {@code StreamPartitioner}
+	 */
+	public PartitionTransformation(StreamTransformation<T> input, StreamPartitioner<T> partitioner) {
+		super("Partition", input.getOutputType(), input.getParallelism());
+		this.input = input;
+		this.partitioner = partitioner;
+	}
+
+	/**
+	 * Returns the input {@code StreamTransformation} of this {@code PartitionTransformation}.
+	 */
+	public StreamTransformation<T> getInput() {
+		return input;
+	}
+
+	/**
+	 * Returns the {@code StreamPartitioner} that must be used for partitioning the elements
+	 * of the input {@code StreamTransformation}.
+	 */
+	public StreamPartitioner<T> getPartitioner() {
+		return partitioner;
+	}
+
+	@Override
+	public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+		List<StreamTransformation<?>> result = Lists.newArrayList();
+		result.add(this);
+		result.addAll(input.getTransitivePredecessors());
+		return result;
+	}
+
+	@Override
+	public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+		throw new UnsupportedOperationException("Cannot set chaining strategy on Partition Transformation.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java
new file mode 100644
index 0000000..92033bd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SelectTransformation.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This transformation represents a selection of only certain upstream elements. This must
+ * follow a {@link org.apache.flink.streaming.api.transformations.SplitTransformation} that
+ * splits elements into several logical streams with assigned names.
+ *
+ * <p>
+ * This does not create a physical operation, it only affects how upstream operations are
+ * connected to downstream operations.
+ *
+ * @param <T> The type of the elements that result from this {@code SelectTransformation}
+ */
+public class SelectTransformation<T> extends StreamTransformation<T> {
+	private final StreamTransformation<T> input;
+	private final List<String> selectedNames;
+
+	/**
+	 * Creates a new {@code SelectTransformation} from the given input that only selects
+	 * the streams with the selected names.
+	 *
+	 * @param input The input {@code StreamTransformation}
+	 * @param selectedNames The names from the upstream {@code SplitTransformation} that this
+	 *                      {@code SelectTransformation} selects.
+	 */
+	public SelectTransformation(StreamTransformation<T> input,
+			List<String> selectedNames) {
+		super("Select", input.getOutputType(), input.getParallelism());
+		this.input = input;
+		this.selectedNames = selectedNames;
+	}
+
+	/**
+	 * Returns the input {@code StreamTransformation}.
+	 */
+	public StreamTransformation<T> getInput() {
+		return input;
+	}
+
+	/**
+	 * Returns the names of the split streams that this {@code SelectTransformation} selects.
+	 */
+	public List<String> getSelectedNames() {
+		return selectedNames;
+	}
+
+	@Override
+	public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+		List<StreamTransformation<?>> result = Lists.newArrayList();
+		result.add(this);
+		result.addAll(input.getTransitivePredecessors());
+		return result;
+	}
+
+	@Override
+	public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+		throw new UnsupportedOperationException("Cannot set chaining strategy on Select Transformation.");
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
new file mode 100644
index 0000000..2a4e2d0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SinkTransformation.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamSink;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A {@code StreamTransformation} that represents a Sink.
+ *
+ * @param <T> The type of the elements in the input of this {@code SinkTransformation}
+ */
+public class SinkTransformation<T> extends StreamTransformation<Object> {
+
+	private final StreamSink<T> operator;
+
+	private final StreamTransformation<T> input;
+
+	// Sinks can hold state that is partitioned by key, hence the selector is kept here as well
+	private KeySelector<T, ?> stateKeySelector;
+
+	/**
+	 * Creates a new {@code SinkTransformation} that consumes the given input {@code StreamTransformation}.
+	 *
+	 * @param input The input {@code StreamTransformation}
+	 * @param name The name of the {@code StreamTransformation}, this will be shown in Visualizations and the Log
+	 * @param operator The sink operator
+	 * @param parallelism The parallelism of this {@code SinkTransformation}
+	 */
+	public SinkTransformation(
+			StreamTransformation<T> input,
+			String name,
+			StreamSink<T> operator,
+			int parallelism) {
+		super(name, TypeExtractor.getForClass(Object.class), parallelism);
+		this.operator = operator;
+		this.input = input;
+	}
+
+	/**
+	 * Gets the {@code StreamTransformation} that serves as input of this {@code SinkTransformation}.
+	 */
+	public StreamTransformation<T> getInput() {
+		return this.input;
+	}
+
+	/**
+	 * Gets the {@link StreamSink} that is the operator of this {@code SinkTransformation}.
+	 */
+	public StreamSink<T> getOperator() {
+		return this.operator;
+	}
+
+	/**
+	 * Stores the {@link KeySelector} that must be used for partitioning keyed state of this Sink.
+	 *
+	 * @param stateKeySelector The {@code KeySelector} to set
+	 */
+	public void setStateKeySelector(KeySelector<T, ?> stateKeySelector) {
+		this.stateKeySelector = stateKeySelector;
+	}
+
+	/**
+	 * Gets the {@code KeySelector} that must be used for partitioning keyed state in this
+	 * Sink.
+	 *
+	 * @see #setStateKeySelector
+	 */
+	public KeySelector<T, ?> getStateKeySelector() {
+		return this.stateKeySelector;
+	}
+
+	@Override
+	public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+		List<StreamTransformation<?>> predecessors = Lists.newArrayList();
+		predecessors.add(this);
+		predecessors.addAll(this.input.getTransitivePredecessors());
+		return predecessors;
+	}
+
+	@Override
+	public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+		this.operator.setChainingStrategy(strategy);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
new file mode 100644
index 0000000..c14c58c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamSource;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@code StreamTransformation} that represents a Source. It has no inputs and therefore does
+ * not transform anything, but it is the root {@code StreamTransformation} of any topology.
+ *
+ * @param <T> The type of the elements that this source produces
+ */
+public class SourceTransformation<T> extends StreamTransformation<T> {
+
+	private final StreamSource<T> operator;
+
+	/**
+	 * Creates a new {@code SourceTransformation} that wraps the given operator.
+	 *
+	 * @param name The name of the {@code SourceTransformation}, this will be shown in Visualizations and the Log
+	 * @param operator The {@code StreamSource} that is the operator of this Transformation
+	 * @param outputType The type of the elements produced by this {@code SourceTransformation}
+	 * @param parallelism The parallelism of this {@code SourceTransformation}
+	 */
+	public SourceTransformation(
+			String name,
+			StreamSource<T> operator,
+			TypeInformation<T> outputType,
+			int parallelism) {
+		super(name, outputType, parallelism);
+		this.operator = operator;
+	}
+
+	/**
+	 * Gets the {@code StreamSource} that is the operator of this {@code SourceTransformation}.
+	 */
+	public StreamSource<T> getOperator() {
+		return this.operator;
+	}
+
+	@Override
+	public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+		this.operator.setChainingStrategy(strategy);
+	}
+
+	@Override
+	public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+		return Collections.<StreamTransformation<?>>singleton(this);
+	}
+}