Posted to commits@flink.apache.org by gy...@apache.org on 2015/07/11 15:21:03 UTC

flink git commit: [FLINK-2335] [streaming] Lazy iteration construction in StreamGraph

Repository: flink
Updated Branches:
  refs/heads/master ea4f339d7 -> 3b69b2499


[FLINK-2335] [streaming] Lazy iteration construction in StreamGraph

Closes #900
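
For context: this change defers iteration construction. closeWith() now only
records the feedback streams on a StreamLoop, and the iteration source/sink
pairs are materialized in finalizeLoops() when the JobGraph is generated. A
minimal usage sketch of the pattern this enables, assuming the DataStream API
of this branch (class and variable names below are illustrative):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LazyIterationSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		IterativeDataStream<Integer> iteration = env.fromElements(1, 2, 3).iterate(5000);

		// Two heads consume the same iteration; nothing is materialized yet,
		// the loop is only recorded in the StreamGraph.
		DataStream<Integer> head1 = iteration.map(new MapFunction<Integer, Integer>() {
			@Override
			public Integer map(Integer value) {
				return value + 1;
			}
		});
		DataStream<Integer> head2 = iteration.map(new MapFunction<Integer, Integer>() {
			@Override
			public Integer map(Integer value) {
				return value * 2;
			}
		});

		// closeWith() may now be called only once; multiple feedback streams
		// are expressed as a union. The iteration source/sink pairs are
		// created lazily in finalizeLoops() when getJobGraph() runs.
		iteration.closeWith(head1.union(head2));

		head1.print();
		env.execute();
	}
}

A second closeWith() on the same iteration now fails fast with an
IllegalStateException, as exercised in IterateTest#testException below.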


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b69b249
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b69b249
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b69b249

Branch: refs/heads/master
Commit: 3b69b249991c23995dddc3b5182415f5c7df332a
Parents: ea4f339
Author: Gyula Fora <gy...@apache.org>
Authored: Fri Jul 10 20:03:22 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Sat Jul 11 14:23:59 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |  48 +-
 .../api/datastream/IterativeDataStream.java     |  43 +-
 .../api/datastream/SplitDataStream.java         |   4 +-
 .../flink/streaming/api/graph/StreamConfig.java |   8 +-
 .../flink/streaming/api/graph/StreamEdge.java   |   6 +-
 .../flink/streaming/api/graph/StreamGraph.java  | 275 ++++++----
 .../flink/streaming/api/graph/StreamLoop.java   | 122 +++++
 .../api/graph/StreamingJobGraphGenerator.java   |  27 +-
 .../partitioner/RebalancePartitioner.java       |   5 +
 .../runtime/partitioner/StreamPartitioner.java  |   5 +
 .../runtime/tasks/StreamIterationHead.java      |   4 +-
 .../runtime/tasks/StreamIterationTail.java      |   4 +-
 .../apache/flink/streaming/api/IterateTest.java | 519 ++++++++++++++-----
 .../flink/streaming/api/scala/DataStream.scala  |   4 +
 14 files changed, 804 insertions(+), 270 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index c9c1f49..7896169 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -95,11 +95,11 @@ public class DataStream<OUT> {
 	protected final StreamExecutionEnvironment environment;
 	protected final Integer id;
 	protected int parallelism;
-	protected List<String> userDefinedNames;
+	protected List<String> selectedNames;
 	protected StreamPartitioner<OUT> partitioner;
 	@SuppressWarnings("rawtypes")
 	protected TypeInformation typeInfo;
-	protected List<DataStream<OUT>> unionizedStreams;
+	protected List<DataStream<OUT>> unionedStreams;
 	
 	protected Integer iterationID = null;
 	protected Long iterationWaitTime = null;
@@ -126,11 +126,11 @@ public class DataStream<OUT> {
 		this.environment = environment;
 		this.parallelism = environment.getParallelism();
 		this.streamGraph = environment.getStreamGraph();
-		this.userDefinedNames = new ArrayList<String>();
+		this.selectedNames = new ArrayList<String>();
 		this.partitioner = new RebalancePartitioner<OUT>(true);
 		this.typeInfo = typeInfo;
-		this.unionizedStreams = new ArrayList<DataStream<OUT>>();
-		this.unionizedStreams.add(this);
+		this.unionedStreams = new ArrayList<DataStream<OUT>>();
+		this.unionedStreams.add(this);
 	}
 
 	/**
@@ -143,17 +143,17 @@ public class DataStream<OUT> {
 		this.environment = dataStream.environment;
 		this.id = dataStream.id;
 		this.parallelism = dataStream.parallelism;
-		this.userDefinedNames = new ArrayList<String>(dataStream.userDefinedNames);
+		this.selectedNames = new ArrayList<String>(dataStream.selectedNames);
 		this.partitioner = dataStream.partitioner.copy();
 		this.streamGraph = dataStream.streamGraph;
 		this.typeInfo = dataStream.typeInfo;
 		this.iterationID = dataStream.iterationID;
 		this.iterationWaitTime = dataStream.iterationWaitTime;
-		this.unionizedStreams = new ArrayList<DataStream<OUT>>();
-		this.unionizedStreams.add(this);
-		if (dataStream.unionizedStreams.size() > 1) {
-			for (int i = 1; i < dataStream.unionizedStreams.size(); i++) {
-				this.unionizedStreams.add(new DataStream<OUT>(dataStream.unionizedStreams.get(i)));
+		this.unionedStreams = new ArrayList<DataStream<OUT>>();
+		this.unionedStreams.add(this);
+		if (dataStream.unionedStreams.size() > 1) {
+			for (int i = 1; i < dataStream.unionedStreams.size(); i++) {
+				this.unionedStreams.add(new DataStream<OUT>(dataStream.unionedStreams.get(i)));
 			}
 		}
 
@@ -176,6 +176,14 @@ public class DataStream<OUT> {
 	public int getParallelism() {
 		return this.parallelism;
 	}
+	
+	public StreamPartitioner<OUT> getPartitioner() {
+		return this.partitioner;
+	}
+	
+	public List<String> getSelectedNames(){
+		return selectedNames;
+	}
 
 	/**
 	 * Gets the type of the stream.
@@ -248,9 +256,9 @@ public class DataStream<OUT> {
 		DataStream<OUT> returnStream = this.copy();
 
 		for (DataStream<OUT> stream : streams) {
-			for (DataStream<OUT> ds : stream.unionizedStreams) {
+			for (DataStream<OUT> ds : stream.unionedStreams) {
 				validateUnion(ds.getId());
-				returnStream.unionizedStreams.add(ds.copy());
+				returnStream.unionedStreams.add(ds.copy());
 			}
 		}
 		return returnStream;
@@ -268,7 +276,7 @@ public class DataStream<OUT> {
 	 * @return The {@link SplitDataStream}
 	 */
 	public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
-		for (DataStream<OUT> ds : this.unionizedStreams) {
+		for (DataStream<OUT> ds : this.unionedStreams) {
 			streamGraph.addOutputSelector(ds.getId(), clean(outputSelector));
 		}
 
@@ -1103,9 +1111,7 @@ public class DataStream<OUT> {
 	}
 	
 	protected <X> void addIterationSource(DataStream<X> dataStream, TypeInformation<?> feedbackType) {
-		Integer id = ++counter;
-		streamGraph.addIterationHead(id, dataStream.getId(), iterationID, iterationWaitTime, feedbackType);
-		streamGraph.setParallelism(id, dataStream.getParallelism());
+		streamGraph.addIterationHead(dataStream.getId(), iterationID, iterationWaitTime, feedbackType);
 	}
 
 	/**
@@ -1118,7 +1124,7 @@ public class DataStream<OUT> {
 	protected DataStream<OUT> setConnectionType(StreamPartitioner<OUT> partitioner) {
 		DataStream<OUT> returnStream = this.copy();
 
-		for (DataStream<OUT> stream : returnStream.unionizedStreams) {
+		for (DataStream<OUT> stream : returnStream.unionedStreams) {
 			stream.partitioner = partitioner;
 		}
 
@@ -1139,9 +1145,9 @@ public class DataStream<OUT> {
 	 *            Number of the type (used at co-functions)
 	 */
 	protected <X> void connectGraph(DataStream<X> inputStream, Integer outputID, int typeNumber) {
-		for (DataStream<X> stream : inputStream.unionizedStreams) {
+		for (DataStream<X> stream : inputStream.unionedStreams) {
 			streamGraph.addEdge(stream.getId(), outputID, stream.partitioner, typeNumber,
-					inputStream.userDefinedNames);
+					inputStream.selectedNames);
 		}
 
 	}
@@ -1170,7 +1176,7 @@ public class DataStream<OUT> {
 	}
 
 	private void validateUnion(Integer id) {
-		for (DataStream<OUT> ds : this.unionizedStreams) {
+		for (DataStream<OUT> ds : this.unionedStreams) {
 			if (ds.getId().equals(id)) {
 				throw new RuntimeException("A DataStream cannot be merged with itself");
 			}
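
The two new accessors above are what lets StreamGraph.addIterationTail() read
back each feedback stream's wiring. A small sketch of what they expose
(illustrative names, assuming the API of this branch):

import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;

public class AccessorSketch {
	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Integer> shuffled = env.fromElements(1, 2, 3).shuffle();

		// addIterationTail() reads these back for every feedback stream so the
		// tail -> iteration-sink edge can reuse the user's partitioning.
		StreamPartitioner<Integer> partitioner = shuffled.getPartitioner();
		List<String> selected = shuffled.getSelectedNames(); // empty unless select() was used

		System.out.println(partitioner + " / " + selected);
	}
}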

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index da3d885..4de368c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import java.util.List;
+
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -32,6 +34,8 @@ import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
  */
 public class IterativeDataStream<IN> extends
 		SingleOutputStreamOperator<IN, IterativeDataStream<IN>> {
+	
+	protected boolean closed = false;
 
 	static Integer iterationCount = 0;
 	
@@ -60,20 +64,18 @@ public class IterativeDataStream<IN> extends
 	 * @return The feedback stream.
 	 * 
 	 */
+	@SuppressWarnings({ "unchecked", "rawtypes" })
 	public DataStream<IN> closeWith(DataStream<IN> iterationTail, boolean keepPartitioning) {
-		DataStream<IN> iterationSink = new DataStreamSink<IN>(environment, "Iteration Sink", null,
-				null);
-
-		// We add an iteration sink to the tail which will send tuples to the
-		// iteration head
-		streamGraph.addIterationTail(iterationSink.getId(), iterationTail.getId(), iterationID,
-				iterationWaitTime);
-
-		if (keepPartitioning) {
-			connectGraph(iterationTail, iterationSink.getId(), 0);
-		} else {
-			connectGraph(iterationTail.forward(), iterationSink.getId(), 0);
+		
+		if (closed) {
+			throw new IllegalStateException(
+					"An iterative data stream can only be closed once. Use union to close with multiple streams.");
 		}
+		closed = true;
+		
+		streamGraph.addIterationTail((List) iterationTail.unionedStreams, iterationID,
+				keepPartitioning);
+
 		return iterationTail;
 	}
 	
@@ -138,7 +140,8 @@ public class IterativeDataStream<IN> extends
 	 * @return A {@link ConnectedIterativeDataStream}.
 	 */
 	public <F> ConnectedIterativeDataStream<IN, F> withFeedbackType(TypeInformation<F> feedbackType) {
-		return new ConnectedIterativeDataStream<IN, F>(this, feedbackType);
+		return new ConnectedIterativeDataStream<IN, F>(new IterativeDataStream<IN>(this,
+				iterationWaitTime), feedbackType);
 	}
 	
 	/**
@@ -201,14 +204,16 @@ public class IterativeDataStream<IN> extends
 		 * @return The feedback stream.
 		 * 
 		 */
+		@SuppressWarnings({ "rawtypes", "unchecked" })
 		public DataStream<F> closeWith(DataStream<F> feedbackStream) {
-			DataStream<F> iterationSink = new DataStreamSink<F>(input.environment, "Iteration Sink",
-					null, null);
+			if (input.closed) {
+				throw new IllegalStateException(
+						"An iterative data stream can only be closed once. Use union to close with multiple streams.");
+			}
+			input.closed = true;
 			
-			input.streamGraph.addIterationTail(iterationSink.getId(), feedbackStream.getId(), input.iterationID,
-					input.iterationWaitTime);
-
-			input.connectGraph(feedbackStream, iterationSink.getId(), 0);
+			input.streamGraph.addIterationTail((List) feedbackStream.unionedStreams,
+					input.iterationID, true);
 			return feedbackStream;
 		}
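
A sketch of the co-iteration path above: withFeedbackType() now wraps a fresh
IterativeDataStream, and closeWith() again only records the feedback streams.
Assuming the API of this branch; the CoMapFunction body and names are
illustrative:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.datastream.IterativeDataStream.ConnectedIterativeDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class CoIterationSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		IterativeDataStream<Integer> it = env.fromElements(1, 2, 3).iterate(5000);

		// Forward channel carries Integers, feedback channel carries Strings.
		ConnectedIterativeDataStream<Integer, String> coIt = it.withFeedbackType(String.class);

		DataStream<String> feedback = coIt.map(new CoMapFunction<Integer, String, String>() {
			@Override
			public String map1(Integer value) {
				return value.toString();
			}

			@Override
			public String map2(String value) {
				return value;
			}
		});

		// Closing only records the feedback streams on the StreamLoop;
		// the source/sink pair is built when the job graph is generated.
		coIt.closeWith(feedback);

		feedback.print();
		env.execute();
	}
}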
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
index 36a94c7..6b95fe7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -57,8 +57,8 @@ public class SplitDataStream<OUT> extends DataStream<OUT> {
 
 		DataStream<OUT> returnStream = copy();
 
-		for (DataStream<OUT> ds : returnStream.unionizedStreams) {
-			ds.userDefinedNames = Arrays.asList(outputNames);
+		for (DataStream<OUT> ds : returnStream.unionedStreams) {
+			ds.selectedNames = Arrays.asList(outputNames);
 		}
 		return returnStream;
 	}
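
The selected names recorded here travel through getSelectedNames() into the
loop's tail bookkeeping. A sketch of split/select, mirroring the pattern used
in IterateTest below (illustrative class name):

import java.util.Arrays;

import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SplitSelectSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		SplitDataStream<Integer> split = env.fromElements(1, 2, 3, 4, 5).split(
				new OutputSelector<Integer>() {
					@Override
					public Iterable<String> select(Integer value) {
						return value % 2 == 0 ? Arrays.asList("even") : Arrays.asList("odd");
					}
				});

		// select() copies the stream and sets selectedNames on every unioned
		// constituent; feeding the result into closeWith() carries the
		// selection onto the tail -> iteration-sink edge.
		DataStream<Integer> evens = split.select("even");

		evens.print();
		env.execute();
	}
}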

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 0784582..6a44104 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -206,12 +206,12 @@ public class StreamConfig implements Serializable {
 		}
 	}
 
-	public void setIterationId(Integer iterationId) {
-		config.setInteger(ITERATION_ID, iterationId);
+	public void setIterationId(String iterationId) {
+		config.setString(ITERATION_ID, iterationId);
 	}
 
-	public Integer getIterationId() {
-		return config.getInteger(ITERATION_ID, 0);
+	public String getIterationId() {
+		return config.getString(ITERATION_ID, "");
 	}
 
 	public void setIterationWaitTime(long time) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
index 293f5e0..47d97df 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
@@ -46,7 +46,7 @@ public class StreamEdge implements Serializable {
 	 * output selection).
 	 */
 	final private List<String> selectedNames;
-	final private StreamPartitioner<?> outputPartitioner;
+	private StreamPartitioner<?> outputPartitioner;
 
 	public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
 			List<String> selectedNames, StreamPartitioner<?> outputPartitioner) {
@@ -87,6 +87,10 @@ public class StreamEdge implements Serializable {
 	public StreamPartitioner<?> getPartitioner() {
 		return outputPartitioner;
 	}
+	
+	public void setPartitioner(StreamPartitioner<?> partitioner) {
+		this.outputPartitioner = partitioner;
+	}
 
 	@Override
 	public int hashCode() {

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index cae24be..64c349e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -28,6 +28,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -41,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
@@ -79,6 +81,7 @@ public class StreamGraph extends StreamingPlan {
 
 	private Map<Integer, StreamLoop> streamLoops;
 	protected Map<Integer, StreamLoop> vertexIDtoLoop;
+	protected Map<Integer, String> vertexIDtoBrokerID;
 	private StateHandleProvider<?> stateHandleProvider;
 	private boolean forceCheckpoint = false;
 
@@ -97,7 +100,8 @@ public class StreamGraph extends StreamingPlan {
 	public void clear() {
 		streamNodes = new HashMap<Integer, StreamNode>();
 		streamLoops = new HashMap<Integer, StreamLoop>();
-		vertexIDtoLoop = new HashMap<Integer, StreamGraph.StreamLoop>();
+		vertexIDtoLoop = new HashMap<Integer, StreamLoop>();
+		vertexIDtoBrokerID = new HashMap<Integer, String>();
 		sources = new HashSet<Integer>();
 	}
 
@@ -120,9 +124,9 @@ public class StreamGraph extends StreamingPlan {
 	public void setCheckpointingInterval(long checkpointingInterval) {
 		this.checkpointingInterval = checkpointingInterval;
 	}
-	
+
 	public void forceCheckpoint() {
-		this.forceCheckpoint = true;	
+		this.forceCheckpoint = true;
 	}
 
 	public void setStateHandleProvider(StateHandleProvider<?> provider) {
@@ -179,8 +183,9 @@ public class StreamGraph extends StreamingPlan {
 	}
 
 	public <IN1, IN2, OUT> void addCoOperator(Integer vertexID,
-			TwoInputStreamOperator<IN1, IN2, OUT> taskoperatorObject, TypeInformation<IN1> in1TypeInfo,
-			TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
+			TwoInputStreamOperator<IN1, IN2, OUT> taskoperatorObject,
+			TypeInformation<IN1> in1TypeInfo, TypeInformation<IN2> in2TypeInfo,
+			TypeInformation<OUT> outTypeInfo, String operatorName) {
 
 		addNode(vertexID, TwoInputStreamTask.class, taskoperatorObject, operatorName);
 
@@ -196,59 +201,192 @@ public class StreamGraph extends StreamingPlan {
 		}
 	}
 
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public void addIterationHead(Integer sourceID, Integer iterationHead, Integer iterationID,
-			long timeOut, TypeInformation<?> feedbackType) {
+	public void addIterationHead(Integer iterationHead, Integer iterationID, long timeOut,
+			TypeInformation<?> feedbackType) {
+		// If there is no loop object created for this iteration, create one
+		StreamLoop loop = streamLoops.get(iterationID);
+		if (loop == null) {
+			loop = new StreamLoop(iterationID, timeOut, feedbackType);
+			streamLoops.put(iterationID, loop);
+		}
 
-		StreamNode itSource = addNode(sourceID, StreamIterationHead.class, null, null);
+		loop.addHeadOperator(getStreamNode(iterationHead));
+	}
 
-		StreamLoop iteration = new StreamLoop(iterationID, getStreamNode(sourceID), timeOut);
-		streamLoops.put(iterationID, iteration);
-		vertexIDtoLoop.put(sourceID, iteration);
+	public void addIterationTail(List<DataStream<?>> feedbackStreams, Integer iterationID,
+			boolean keepPartitioning) {
 
-		itSource.setOperatorName("IterationSource-" + sourceID);
-		itSource.setParallelism(getStreamNode(iterationHead).getParallelism());
-		
-		if(feedbackType == null){
-			setSerializersFrom(iterationHead, sourceID);
-			addEdge(sourceID, iterationHead, new RebalancePartitioner(true), 0, new ArrayList<String>());
-		}else{
-			itSource.setSerializerOut(new StreamRecordSerializer(feedbackType, executionConfig));
-			addEdge(sourceID, iterationHead, new RebalancePartitioner(true), 2, new ArrayList<String>());
+		if (!streamLoops.containsKey(iterationID)) {
+			throw new RuntimeException("Cannot close iteration without head operator.");
 		}
-		
 
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("ITERATION SOURCE: {}", sourceID);
+		StreamLoop loop = streamLoops.get(iterationID);
+
+		for (DataStream<?> stream : feedbackStreams) {
+			loop.addTailOperator(getStreamNode(stream.getId()), stream.getPartitioner(),
+					stream.getSelectedNames());
 		}
 
-		sources.add(sourceID);
+		if (keepPartitioning) {
+			loop.applyTailPartitioning();
+		}
 	}
 
-	public void addIterationTail(Integer sinkID, Integer iterationTail, Integer iterationID,
-			long waitTime) {
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	public void finalizeLoops() {
+		
+		// We create each loop separately; the order does not matter, as sinks
+		// and sources don't interact
+		for (StreamLoop loop : streamLoops.values()) {
+
+			// We make sure not to re-create the loops if the method is called
+			// multiple times
+			if (loop.getSourceSinkPairs().isEmpty()) {
+
+				List<StreamNode> headOps = loop.getHeads();
+				List<StreamNode> tailOps = loop.getTails();
+
+				// This means that the iteration was not closed. It should
+				// not be allowed.
+				if (tailOps.isEmpty()) {
+					throw new RuntimeException("Cannot execute job with empty iterations.");
+				}
+
+				// Check whether we keep the feedback partitioning
+				if (loop.keepsPartitioning()) {
+					// This is the complicated case as we need to enforce
+					// partitioning on the tail -> sink side, which
+					// requires strict forward connections at source -> head
+
+					// We need one source/sink pair per distinct head
+					// parallelism, as we depend on strict forward connections
+					Map<Integer, List<StreamNode>> parallelismToHeads = new HashMap<Integer, List<StreamNode>>();
+
+					// Group head operators by parallelism
+					for (StreamNode head : headOps) {
+						int p = head.getParallelism();
+						if (!parallelismToHeads.containsKey(p)) {
+							parallelismToHeads.put(p, new ArrayList<StreamNode>());
+						}
+						parallelismToHeads.get(p).add(head);
+					}
+
+					// We create the sink/source pair for each parallelism
+					// group; tails will forward to all sinks, but each head
+					// operator will only receive from one source
+					// (corresponding to its parallelism)
+					int c = 0;
+					for (Entry<Integer, List<StreamNode>> headGroup : parallelismToHeads.entrySet()) {
+						List<StreamNode> headOpsInGroup = headGroup.getValue();
+
+						Tuple2<StreamNode, StreamNode> sourceSinkPair = createItSourceAndSink(loop,
+								c);
+						StreamNode source = sourceSinkPair.f0;
+						StreamNode sink = sourceSinkPair.f1;
+
+						// We connect the source to the heads in this group
+						// (forward), setting the type to 2 in case we have a
+						// coIteration (this sets the input as the second
+						// input of the co-operator)
+						for (StreamNode head : headOpsInGroup) {
+							int inputType = loop.isCoIteration() ? 2 : 0;
+							addEdge(source.getId(), head.getId(), new RebalancePartitioner(true),
+									inputType, new ArrayList<String>());
+						}
+
+						// We connect all the tails to the sink keeping the
+						// partitioner
+						for (int i = 0; i < tailOps.size(); i++) {
+							StreamNode tail = tailOps.get(i);
+							StreamPartitioner<?> partitioner = loop.getTailPartitioners().get(i);
+							addEdge(tail.getId(), sink.getId(), partitioner.copy(), 0, loop
+									.getTailSelectedNames().get(i));
+						}
+
+						// We set the sink/source parallelism to the group
+						// parallelism
+						source.setParallelism(headGroup.getKey());
+						sink.setParallelism(source.getParallelism());
+
+						// We set the proper serializers for the sink/source
+						setSerializersFrom(tailOps.get(0).getId(), sink.getId());
+						if (loop.isCoIteration()) {
+							source.setSerializerOut(new StreamRecordSerializer(loop
+									.getFeedbackType(), executionConfig));
+						} else {
+							setSerializersFrom(headOpsInGroup.get(0).getId(), source.getId());
+						}
+
+						c++;
+					}
+
+				} else {
+					// This is the simplest case: we add one iteration
+					// sink/source pair with the parallelism of the first tail
+					// operator. Tail operators will forward the records and
+					// partitioning will be enforced from source -> head
+
+					Tuple2<StreamNode, StreamNode> sourceSinkPair = createItSourceAndSink(loop, 0);
+					StreamNode source = sourceSinkPair.f0;
+					StreamNode sink = sourceSinkPair.f1;
+
+					// We get the feedback partitioner from the first input of
+					// the first head.
+					StreamPartitioner<?> partitioner = headOps.get(0).getInEdges().get(0)
+							.getPartitioner();
+
+					// Connect the sources to heads using this partitioner
+					for (StreamNode head : headOps) {
+						addEdge(source.getId(), head.getId(), partitioner.copy(), 0,
+								new ArrayList<String>());
+					}
+
+					// The tails are connected to the sink with forward
+					// partitioning
+					for (int i = 0; i < tailOps.size(); i++) {
+						StreamNode tail = tailOps.get(i);
+						addEdge(tail.getId(), sink.getId(), new RebalancePartitioner(true), 0, loop
+								.getTailSelectedNames().get(i));
+					}
+
+					// We set the parallelism to match the first tail op to
+					// make the forward more efficient
+					sink.setParallelism(tailOps.get(0).getParallelism());
+					source.setParallelism(sink.getParallelism());
+
+					// We set the proper serializers
+					setSerializersFrom(headOps.get(0).getId(), source.getId());
+					setSerializersFrom(tailOps.get(0).getId(), sink.getId());
+				}
 
-		if (getStreamNode(iterationTail).getBufferTimeout() == 0) {
-			throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported.");
-		}
+			}
 
-		StreamNode itSink = addNode(sinkID, StreamIterationTail.class, null, null);
+		}
 
-		StreamLoop iteration = streamLoops.get(iterationID);
-		iteration.setSink(getStreamNode(sinkID));
-		vertexIDtoLoop.put(sinkID, iteration);
-		
-		itSink.setParallelism(iteration.getSource().getParallelism());
+	}
 
-		setSerializersFrom(iterationTail, sinkID);
-		getStreamNode(sinkID).setOperatorName("IterationSink-" + sinkID);
+	private Tuple2<StreamNode, StreamNode> createItSourceAndSink(StreamLoop loop, int c) {
+		StreamNode source = addNode(-1 * streamNodes.size(), StreamIterationHead.class, null, null);
+		sources.add(source.getId());
 
-		setBufferTimeout(iteration.getSource().getId(), getStreamNode(iterationTail).getBufferTimeout());
+		StreamNode sink = addNode(-1 * streamNodes.size(), StreamIterationTail.class, null, null);
 
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("ITERATION SINK: {}", sinkID);
-		}
+		source.setOperatorName("IterationSource-" + loop.getID() + "_" + c);
+		sink.setOperatorName("IterationSink-" + loop.getID() + "_" + c);
+		vertexIDtoBrokerID.put(source.getId(), loop.getID() + "_" + c);
+		vertexIDtoBrokerID.put(sink.getId(), loop.getID() + "_" + c);
+		vertexIDtoLoop.put(source.getId(), loop);
+		vertexIDtoLoop.put(sink.getId(), loop);
+		loop.addSourceSinkPair(source, sink);
 
+		return new Tuple2<StreamNode, StreamNode>(source, sink);
 	}
 
 	protected StreamNode addNode(Integer vertexID, Class<? extends AbstractInvokable> vertexClass,
@@ -284,7 +422,7 @@ public class StreamGraph extends StreamingPlan {
 		getStreamNode(vertexID).setParallelism(parallelism);
 	}
 
-	public void setKey(Integer vertexID, KeySelector<?,?> key) {
+	public void setKey(Integer vertexID, KeySelector<?, ?> key) {
 		getStreamNode(vertexID).setStatePartitioner(key);
 	}
 
@@ -382,6 +520,10 @@ public class StreamGraph extends StreamingPlan {
 		return vertexIDtoLoop.get(vertexID).getID();
 	}
 
+	public String getBrokerID(Integer vertexID) {
+		return vertexIDtoBrokerID.get(vertexID);
+	}
+
 	public long getLoopTimeout(Integer vertexID) {
 		return vertexIDtoLoop.get(vertexID).getTimeout();
 	}
@@ -421,13 +563,13 @@ public class StreamGraph extends StreamingPlan {
 	 *            name of the jobGraph
 	 */
 	public JobGraph getJobGraph(String jobGraphName) {
-
+		finalizeLoops();
 		// temporarily forbid checkpointing for iterative jobs
 		if (isIterative() && isCheckpointingEnabled() && !forceCheckpoint) {
 			throw new UnsupportedOperationException(
 					"Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. "
-					+ "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. "
-					+ "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
+							+ "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. "
+							+ "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
 		}
 
 		setJobName(jobGraphName);
@@ -474,45 +616,4 @@ public class StreamGraph extends StreamingPlan {
 		DEFAULT, ISOLATE, NEWGROUP
 	}
 
-	/**
-	 * Object for representing loops in streaming programs.
-	 * 
-	 */
-	public static class StreamLoop {
-
-		private Integer loopID;
-
-		private StreamNode source;
-		private StreamNode sink;
-		
-		private Long timeout;
-
-		public StreamLoop(Integer loopID, StreamNode source, Long timeout) {
-			this.loopID = loopID;
-			this.source = source;
-			this.timeout = timeout;
-		}
-
-		public Integer getID() {
-			return loopID;
-		}
-
-		public Long getTimeout() {
-			return timeout;
-		}
-
-		public void setSink(StreamNode sink) {
-			this.sink = sink;
-		}
-
-		public StreamNode getSource() {
-			return source;
-		}
-
-		public StreamNode getSink() {
-			return sink;
-		}
-
-	}
-
 }
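
The core of finalizeLoops() above is the grouping of head operators by
parallelism, one iteration source/sink pair per group. A standalone sketch of
just that step, with plain integers standing in for StreamNodes (illustrative
names, not part of the commit):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HeadGroupingSketch {
	public static void main(String[] args) {
		// Parallelisms of four hypothetical head operators.
		List<Integer> headParallelisms = Arrays.asList(4, 2, 2, 4);

		Map<Integer, List<Integer>> parallelismToHeads = new HashMap<Integer, List<Integer>>();
		for (int i = 0; i < headParallelisms.size(); i++) {
			int p = headParallelisms.get(i);
			if (!parallelismToHeads.containsKey(p)) {
				parallelismToHeads.put(p, new ArrayList<Integer>());
			}
			parallelismToHeads.get(p).add(i);
		}

		// One iteration source/sink pair is created per group, so strict
		// forward (source -> head) connections line up subtask-by-subtask.
		System.out.println(parallelismToHeads); // {2=[1, 2], 4=[0, 3]}
	}
}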

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamLoop.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamLoop.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamLoop.java
new file mode 100644
index 0000000..ba987ef
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamLoop.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+
+/**
+ * Object for representing loops in streaming programs.
+ * 
+ */
+public class StreamLoop {
+
+	private Integer loopID;
+
+	private List<StreamNode> headOperators = new ArrayList<StreamNode>();
+	private List<StreamNode> tailOperators = new ArrayList<StreamNode>();
+	private List<StreamPartitioner<?>> tailPartitioners = new ArrayList<StreamPartitioner<?>>();
+	private List<List<String>> tailSelectedNames = new ArrayList<List<String>>();
+
+	private boolean coIteration = false;
+	private TypeInformation<?> feedbackType = null;
+
+	private long timeout;
+	private boolean tailPartitioning = false;
+
+	private List<Tuple2<StreamNode, StreamNode>> sourcesAndSinks = new ArrayList<Tuple2<StreamNode, StreamNode>>();
+
+	public StreamLoop(Integer loopID, long timeout, TypeInformation<?> feedbackType) {
+		this.loopID = loopID;
+		this.timeout = timeout;
+		if (feedbackType != null) {
+			this.feedbackType = feedbackType;
+			coIteration = true;
+			tailPartitioning = true;
+		}
+	}
+
+	public Integer getID() {
+		return loopID;
+	}
+
+	public long getTimeout() {
+		return timeout;
+	}
+
+	public boolean isCoIteration() {
+		return coIteration;
+	}
+
+	public TypeInformation<?> getFeedbackType() {
+		return feedbackType;
+	}
+
+	public void addSourceSinkPair(StreamNode source, StreamNode sink) {
+		this.sourcesAndSinks.add(new Tuple2<StreamNode, StreamNode>(source, sink));
+	}
+
+	public List<Tuple2<StreamNode, StreamNode>> getSourceSinkPairs() {
+		return this.sourcesAndSinks;
+	}
+
+	public void addHeadOperator(StreamNode head) {
+		this.headOperators.add(head);
+	}
+
+	public void addTailOperator(StreamNode tail, StreamPartitioner<?> partitioner,
+			List<String> selectedNames) {
+		this.tailOperators.add(tail);
+		this.tailPartitioners.add(partitioner);
+		this.tailSelectedNames.add(selectedNames);
+	}
+
+	public void applyTailPartitioning() {
+		this.tailPartitioning = true;
+	}
+
+	public boolean keepsPartitioning() {
+		return tailPartitioning;
+	}
+
+	public List<StreamNode> getHeads() {
+		return headOperators;
+	}
+
+	public List<StreamNode> getTails() {
+		return tailOperators;
+	}
+
+	public List<StreamPartitioner<?>> getTailPartitioners() {
+		return tailPartitioners;
+	}
+
+	public List<List<String>> getTailSelectedNames() {
+		return tailSelectedNames;
+	}
+
+	@Override
+	public String toString() {
+		return "ID: " + loopID + "\n" + "Head: " + headOperators + "\n" + "Tail: " + tailOperators
+				+ "\n" + "TP: " + tailPartitioners + "\n" + "TSN: " + tailSelectedNames;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index eb34e3f..4d541bc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -30,17 +30,17 @@ import java.util.Map.Entry;
 import org.apache.commons.lang.StringUtils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.streaming.api.graph.StreamGraph.StreamLoop;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -250,6 +250,7 @@ public class StreamingJobGraphGenerator {
 		return retConfig;
 	}
 
+	@SuppressWarnings("unchecked")
 	private void setVertexConfig(Integer vertexID, StreamConfig config,
 			List<StreamEdge> chainableOutputs, List<StreamEdge> nonChainableOutputs) {
 
@@ -276,7 +277,7 @@ public class StreamingJobGraphGenerator {
 
 		if (vertexClass.equals(StreamIterationHead.class)
 				|| vertexClass.equals(StreamIterationTail.class)) {
-			config.setIterationId(streamGraph.getLoopID(vertexID));
+			config.setIterationId(streamGraph.getBrokerID(vertexID));
 			config.setIterationWaitTime(streamGraph.getLoopTimeout(vertexID));
 		}
 
@@ -360,13 +361,19 @@ public class StreamingJobGraphGenerator {
 		}
 
 		for (StreamLoop loop : streamGraph.getStreamLoops()) {
-			CoLocationGroup ccg = new CoLocationGroup();
-			JobVertex tail = jobVertices.get(loop.getSink().getId());
-			JobVertex head = jobVertices.get(loop.getSource().getId());
-			ccg.addVertex(head);
-			ccg.addVertex(tail);
-			tail.updateCoLocationGroup(ccg);
-			head.updateCoLocationGroup(ccg);
+			for (Tuple2<StreamNode, StreamNode> pair : loop.getSourceSinkPairs()) {
+				
+				CoLocationGroup ccg = new CoLocationGroup();
+				
+				JobVertex source = jobVertices.get(pair.f0.getId());
+				JobVertex sink = jobVertices.get(pair.f1.getId());
+				
+				ccg.addVertex(source);
+				ccg.addVertex(sink);
+				source.updateCoLocationGroup(ccg);
+				sink.updateCoLocationGroup(ccg);
+			}
+
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
index 70d9c6b..e6ad821 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
@@ -49,4 +49,9 @@ public class RebalancePartitioner<T> extends StreamPartitioner<T> {
 	public StreamPartitioner<T> copy() {
 		return new RebalancePartitioner<T>(forward);
 	}
+	
+	@Override
+	public String toString() {
+		return forward ? "ForwardPartitioner" : "RebalancePartitioner";
+	}
 }
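
For reference, RebalancePartitioner doubles as the forward partitioner,
selected by its constructor flag; the new toString() makes the two modes
distinguishable in StreamLoop's debug output. A tiny sketch (illustrative
class name):

import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;

public class PartitionerToStringSketch {
	public static void main(String[] args) {
		// The boolean flag selects forward semantics, as used on the
		// source -> head and tail -> sink edges built in finalizeLoops().
		StreamPartitioner<Long> forward = new RebalancePartitioner<Long>(true);
		StreamPartitioner<Long> rebalance = new RebalancePartitioner<Long>(false);

		System.out.println(forward);   // ForwardPartitioner
		System.out.println(rebalance); // RebalancePartitioner
	}
}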

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
index ef598c6..b37655b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
@@ -45,4 +45,9 @@ public abstract class StreamPartitioner<T> implements
 	public StreamPartitioner<T> copy() {
 		return this;
 	}
+	
+	@Override
+	public String toString() {
+		return this.getClass().getSimpleName();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index 4952cdf..25fe83d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -48,12 +48,12 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
 		super.registerInputOutput();
 		outputHandler = new OutputHandler<OUT>(this);
 
-		Integer iterationId = configuration.getIterationId();
+		String iterationId = configuration.getIterationId();
 		iterationWaitTime = configuration.getIterationWaitTime();
 		shouldWait = iterationWaitTime > 0;
 
 		try {
-			BlockingQueueBroker.instance().handIn(iterationId.toString()+"-" 
+			BlockingQueueBroker.instance().handIn(iterationId+"-" 
 					+getEnvironment().getIndexInSubtaskGroup(), dataChannel);
 		} catch (Exception e) {
 			throw new RuntimeException(e);
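
Head and tail exchange records through an in-JVM queue registered at the
BlockingQueueBroker, which is why the job graph generator co-locates each
source/sink pair. A sketch of the key scheme, matching the broker ids
assigned in createItSourceAndSink() (the helper below is illustrative, not
part of the commit):

public class BrokerKeySketch {
	// Mirrors the "loopId_pairIndex" broker id from createItSourceAndSink()
	// plus the subtask suffix appended in StreamIterationHead/Tail.
	static String brokerKey(int loopId, int pairIndex, int subtaskIndex) {
		return loopId + "_" + pairIndex + "-" + subtaskIndex;
	}

	public static void main(String[] args) {
		// Both ends of the loop resolve the same queue under this key, so
		// they must run in the same JVM (hence the CoLocationGroup).
		System.out.println(brokerKey(1, 0, 3)); // 1_0-3
	}
}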

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
index 5bbae06..b6e3889 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
@@ -30,7 +30,7 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationTail.class);
 
-	private Integer iterationId;
+	private String iterationId;
 
 	@SuppressWarnings("rawtypes")
 	private BlockingQueue<StreamRecord> dataChannel;
@@ -47,7 +47,7 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 			iterationId = configuration.getIterationId();
 			iterationWaitTime = configuration.getIterationWaitTime();
 			shouldWait = iterationWaitTime > 0;
-			dataChannel = BlockingQueueBroker.instance().get(iterationId.toString()+"-"
+			dataChannel = BlockingQueueBroker.instance().get(iterationId+"-"
 					+getEnvironment().getIndexInSubtaskGroup());
 		} catch (Exception e) {
 			throw new StreamTaskException(String.format(

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 3021abb..2a88a32 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -18,175 +18,318 @@
 package org.apache.flink.streaming.api;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.List;
 
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeDataStream;
 import org.apache.flink.streaming.api.datastream.IterativeDataStream.ConnectedIterativeDataStream;
+import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamGraph.StreamLoop;
+import org.apache.flink.streaming.api.graph.StreamLoop;
+import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
-public class IterateTest {
+@SuppressWarnings({ "unchecked", "unused", "serial" })
+public class IterateTest extends StreamingMultipleProgramsTestBase {
 
 	private static final long MEMORYSIZE = 32;
 	private static boolean iterated[];
 	private static int PARALLELISM = 2;
 
-	public static final class IterationHead extends RichFlatMapFunction<Boolean, Boolean> {
-
-		private static final long serialVersionUID = 1L;
+	@Test
+	public void testException() throws Exception {
 
-		@Override
-		public void flatMap(Boolean value, Collector<Boolean> out) throws Exception {
-			int indx = getRuntimeContext().getIndexOfThisSubtask();
-			if (value) {
-				iterated[indx] = true;
-			} else {
-				out.collect(value);
-			}
+		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+		DataStream<Integer> source = env.fromElements(1, 10);
+		IterativeDataStream<Integer> iter1 = source.iterate();
+		IterativeDataStream<Integer> iter2 = source.iterate();
 
+		iter1.closeWith(iter1.map(NoOpIntMap));
+		// Check for double closing
+		try {
+			iter1.closeWith(iter1.map(NoOpIntMap));
+			fail();
+		} catch (Exception e) {
 		}
 
-	}
+		// Check for closing iteration without head
+		try {
+			iter2.closeWith(iter1.map(NoOpIntMap));
+			fail();
+		} catch (Exception e) {
+		}
 
-	public static final class IterationTail extends RichFlatMapFunction<Boolean, Boolean> {
+		iter2.map(NoOpIntMap);
 
-		private static final long serialVersionUID = 1L;
+		// Check for executing with empty iteration
+		try {
+			env.execute();
+			fail();
+		} catch (Exception e) {
+		}
+	}
 
-		@Override
-		public void flatMap(Boolean value, Collector<Boolean> out) throws Exception {
-			out.collect(true);
+	@Test
+	public void testImmutabilityWithCoiteration() {
+		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+		DataStream<Integer> source = env.fromElements(1, 10);
 
-		}
+		IterativeDataStream<Integer> iter1 = source.iterate();
+		// Calling withFeedbackType should create a new iteration
+		ConnectedIterativeDataStream<Integer, String> iter2 = iter1.withFeedbackType(String.class);
 
-	}
+		iter1.closeWith(iter1.map(NoOpIntMap));
+		iter2.closeWith(iter2.map(NoOpCoMap));
 
-	public static final class MySink implements SinkFunction<Boolean> {
+		StreamGraph graph = env.getStreamGraph();
 
-		private static final long serialVersionUID = 1L;
+		graph.getJobGraph();
 
-		@Override
-		public void invoke(Boolean tuple) {
+		assertEquals(2, graph.getStreamLoops().size());
+		for (StreamLoop loop : graph.getStreamLoops()) {
+			assertEquals(loop.getHeads(), loop.getTails());
+			List<Tuple2<StreamNode, StreamNode>> sourceSinkPairs = loop.getSourceSinkPairs();
+			assertEquals(1, sourceSinkPairs.size());
 		}
 	}
 
-	public static final class NoOpMap implements MapFunction<Boolean, Boolean> {
+	@Test
+	public void testmultipleHeadsTailsSimple() {
+		StreamExecutionEnvironment env = new TestStreamEnvironment(4, MEMORYSIZE);
+		DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5).shuffle();
+		DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5);
 
-		private static final long serialVersionUID = 1L;
+		IterativeDataStream<Integer> iter1 = source1.union(source2).iterate();
 
-		@Override
-		public Boolean map(Boolean value) throws Exception {
-			return value;
-		}
+		DataStream<Integer> head1 = iter1.map(NoOpIntMap);
+		DataStream<Integer> head2 = iter1.map(NoOpIntMap).setParallelism(2);
+		DataStream<Integer> head3 = iter1.map(NoOpIntMap).setParallelism(2)
+				.addSink(new NoOpSink<Integer>());
+		DataStream<Integer> head4 = iter1.map(NoOpIntMap).addSink(new NoOpSink<Integer>());
 
-	}
+		SplitDataStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5).split(
+				new OutputSelector<Integer>() {
 
-	public StreamExecutionEnvironment constructIterativeJob(StreamExecutionEnvironment env) {
-		env.setBufferTimeout(10);
+					@Override
+					public Iterable<String> select(Integer value) {
+						return value % 2 == 0 ? Arrays.asList("even") : Arrays.asList("odd");
+					}
+				});
 
-		DataStream<Boolean> source = env.fromCollection(Collections.nCopies(PARALLELISM, false));
+		iter1.closeWith(source3.select("even").union(
+				head1.map(NoOpIntMap).broadcast().setParallelism(1), head2.shuffle()));
 
-		IterativeDataStream<Boolean> iteration = source.iterate(3000);
+		StreamGraph graph = env.getStreamGraph();
 
-		DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).flatMap(
-				new IterationTail());
+		JobGraph jg = graph.getJobGraph();
 
-		iteration.closeWith(increment).addSink(new MySink());
-		return env;
-	}
+		assertEquals(1, graph.getStreamLoops().size());
+		StreamLoop loop = new ArrayList<StreamLoop>(graph.getStreamLoops()).get(0);
 
-	@Test
-	public void testColocation() throws Exception {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(4, MEMORYSIZE);
+		assertEquals(4, loop.getHeads().size());
+		assertEquals(3, loop.getTails().size());
 
-		IterativeDataStream<Boolean> it = env.fromElements(true).rebalance().map(new NoOpMap())
-				.iterate();
+		assertEquals(1, loop.getSourceSinkPairs().size());
+		Tuple2<StreamNode, StreamNode> pair = loop.getSourceSinkPairs().get(0);
 
-		DataStream<Boolean> head = it.map(new NoOpMap()).setParallelism(2).name("HeadOperator");
+		assertEquals(pair.f0.getParallelism(), pair.f1.getParallelism());
+		assertEquals(4, pair.f0.getOutEdges().size());
+		assertEquals(3, pair.f1.getInEdges().size());
 
-		it.closeWith(head.map(new NoOpMap()).setParallelism(3).name("TailOperator")).print();
+		for (StreamEdge edge : pair.f0.getOutEdges()) {
+			assertTrue(edge.getPartitioner() instanceof ShufflePartitioner);
+		}
+		for (StreamEdge edge : pair.f1.getInEdges()) {
+			assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
+		}
 
-		JobGraph graph = env.getStreamGraph().getJobGraph();
+		assertTrue(loop.getTailSelectedNames().contains(Arrays.asList("even")));
 
-		JobVertex itSource = null;
-		JobVertex itSink = null;
-		JobVertex headOp = null;
-		JobVertex tailOp = null;
+		// Test co-location
 
-		for (JobVertex vertex : graph.getVertices()) {
+		JobVertex itSource1 = null;
+		JobVertex itSink1 = null;
+
+		for (JobVertex vertex : jg.getVertices()) {
 			if (vertex.getName().contains("IterationSource")) {
-				itSource = vertex;
+				itSource1 = vertex;
 			} else if (vertex.getName().contains("IterationSink")) {
-				itSink = vertex;
-			} else if (vertex.getName().contains("HeadOperator")) {
-				headOp = vertex;
-			} else if (vertex.getName().contains("TailOp")) {
-				tailOp = vertex;
+
+				itSink1 = vertex;
+
 			}
 		}
 
-		assertTrue(itSource.getCoLocationGroup() != null);
-		assertEquals(itSource.getCoLocationGroup(), itSink.getCoLocationGroup());
-		assertEquals(headOp.getParallelism(), 2);
-		assertEquals(tailOp.getParallelism(), 3);
-		assertEquals(itSource.getParallelism(), itSink.getParallelism());
+		assertTrue(itSource1.getCoLocationGroup() != null);
+		assertEquals(itSource1.getCoLocationGroup(), itSink1.getCoLocationGroup());
 	}
 
-	@SuppressWarnings("unchecked")
 	@Test
-	public void testPartitioning() throws Exception {
+	public void testmultipleHeadsTailsWithTailPartitioning() {
 		StreamExecutionEnvironment env = new TestStreamEnvironment(4, MEMORYSIZE);
+		DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5).shuffle();
+		DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5);
 
-		IterativeDataStream<Boolean> it = env.fromElements(true).iterate();
+		IterativeDataStream<Integer> iter1 = source1.union(source2).iterate();
 
-		IterativeDataStream<Boolean> it2 = env.fromElements(true).iterate();
+		DataStream<Integer> head1 = iter1.map(NoOpIntMap);
+		DataStream<Integer> head2 = iter1.map(NoOpIntMap).setParallelism(2).name("shuffle");
+		DataStream<Integer> head3 = iter1.map(NoOpIntMap).setParallelism(2)
+				.addSink(new NoOpSink<Integer>());
+		DataStream<Integer> head4 = iter1.map(NoOpIntMap).addSink(new NoOpSink<Integer>());
 
-		DataStream<Boolean> head = it.map(new NoOpMap()).name("Head1").broadcast();
-		DataStream<Boolean> head2 = it2.map(new NoOpMap()).name("Head2").broadcast();
+		SplitDataStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5).name("split")
+				.split(new OutputSelector<Integer>() {
 
-		it.closeWith(head.union(head.map(new NoOpMap()).shuffle()), true);
-		it2.closeWith(head2, false);
+					@Override
+					public Iterable<String> select(Integer value) {
+						return value % 2 == 0 ? Arrays.asList("even") : Arrays.asList("odd");
+					}
+				});
+
+		iter1.closeWith(
+				source3.select("even").union(
+						head1.map(NoOpIntMap).broadcast().setParallelism(1).name("bc"),
+						head2.shuffle()), true);
 
 		StreamGraph graph = env.getStreamGraph();
 
-		for (StreamLoop loop : graph.getStreamLoops()) {
-			StreamEdge tailToSink = loop.getSink().getInEdges().get(0);
-			if (tailToSink.getSourceVertex().getOperatorName().contains("Head1")) {
-				assertTrue(tailToSink.getPartitioner() instanceof BroadcastPartitioner);
-				assertTrue(loop.getSink().getInEdges().get(1).getPartitioner() instanceof ShufflePartitioner);
-			} else {
-				assertTrue(tailToSink.getPartitioner() instanceof RebalancePartitioner);
+		JobGraph jg = graph.getJobGraph();
+
+		assertEquals(1, graph.getStreamLoops().size());
+
+		StreamLoop loop = new ArrayList<StreamLoop>(graph.getStreamLoops()).get(0);
+
+		assertEquals(4, loop.getHeads().size());
+		assertEquals(3, loop.getTails().size());
+
+		assertEquals(2, loop.getSourceSinkPairs().size());
+		List<Tuple2<StreamNode, StreamNode>> pairs = loop.getSourceSinkPairs();
+		Tuple2<StreamNode, StreamNode> pair1 = pairs.get(0).f0.getParallelism() == 2 ? pairs.get(0)
+				: pairs.get(1);
+		Tuple2<StreamNode, StreamNode> pair2 = pairs.get(0).f0.getParallelism() == 4 ? pairs.get(0)
+				: pairs.get(1);
+
+		assertEquals(pair1.f0.getParallelism(), pair1.f1.getParallelism());
+		assertEquals(2, pair1.f0.getParallelism());
+		assertEquals(2, pair1.f0.getOutEdges().size());
+		assertEquals(3, pair1.f1.getInEdges().size());
+
+		for (StreamEdge edge : pair1.f0.getOutEdges()) {
+			assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
+			assertEquals(2, edge.getTargetVertex().getParallelism());
+		}
+		for (StreamEdge edge : pair1.f1.getInEdges()) {
+			String tailName = edge.getSourceVertex().getOperatorName();
+			if (tailName.equals("split")) {
+				assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
+			} else if (tailName.equals("bc")) {
+				assertTrue(edge.getPartitioner() instanceof BroadcastPartitioner);
+			} else if (tailName.equals("shuffle")) {
+				assertTrue(edge.getPartitioner() instanceof ShufflePartitioner);
+			}
+		}
+
+		assertEquals(pair2.f0.getParallelism(), pair2.f1.getParallelism());
+		assertEquals(4, pair2.f0.getParallelism());
+		assertEquals(2, pair2.f0.getOutEdges().size());
+		assertEquals(3, pair2.f1.getInEdges().size());
+
+		for (StreamEdge edge : pair2.f0.getOutEdges()) {
+			assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
+			assertEquals(4, edge.getTargetVertex().getParallelism());
+		}
+		for (StreamEdge edge : pair2.f1.getInEdges()) {
+			String tailName = edge.getSourceVertex().getOperatorName();
+			if (tailName.equals("split")) {
+				assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
+			} else if (tailName.equals("bc")) {
+				assertTrue(edge.getPartitioner() instanceof BroadcastPartitioner);
+			} else if (tailName.equals("shuffle")) {
+				assertTrue(edge.getPartitioner() instanceof ShufflePartitioner);
 			}
 		}
 
+		assertTrue(loop.getTailSelectedNames().contains(Arrays.asList("even")));
+
+		// Test co-location
+
+		JobVertex itSource1 = null;
+		JobVertex itSource2 = null;
+		JobVertex itSink1 = null;
+		JobVertex itSink2 = null;
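+		// The generated iteration source/sink names carry _0 / _1 suffixes,
+		// one per source/sink pair of the loop.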
+
+		for (JobVertex vertex : jg.getVertices()) {
+			if (vertex.getName().contains("IterationSource")) {
+				if (vertex.getName().contains("_0")) {
+					itSource1 = vertex;
+				} else if (vertex.getName().contains("_1")) {
+					itSource2 = vertex;
+				}
+			} else if (vertex.getName().contains("IterationSink")) {
+				if (vertex.getName().contains("_0")) {
+					itSink1 = vertex;
+				} else if (vertex.getName().contains("_1")) {
+					itSink2 = vertex;
+				}
+			}
+		}
+
+		assertTrue(itSource1.getCoLocationGroup() != null);
+		assertTrue(itSource2.getCoLocationGroup() != null);
+
+		assertEquals(itSource1.getCoLocationGroup(), itSink1.getCoLocationGroup());
+		assertEquals(itSource2.getCoLocationGroup(), itSink2.getCoLocationGroup());
+		assertNotEquals(itSource1.getCoLocationGroup(), itSource2.getCoLocationGroup());
 	}
 
+	@SuppressWarnings("rawtypes")
 	@Test
-	public void test() throws Exception {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+	public void testSimpleIteration() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
 		iterated = new boolean[PARALLELISM];
 
-		env = constructIterativeJob(env);
+		DataStream<Boolean> source = env
+				.fromCollection(Collections.nCopies(PARALLELISM * 2, false));
+
+		IterativeDataStream<Boolean> iteration = source.iterate(3000);
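+		// 3000 ms is the maximum wait time: the iteration sources shut down if no
+		// feedback arrives within it, which lets the job terminate.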
+
+		DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).map(NoOpBoolMap);
+
+		iteration.map(NoOpBoolMap).addSink(new NoOpSink());
+
+		iteration.closeWith(increment).addSink(new NoOpSink());
 
 		env.execute();
 
@@ -195,55 +338,135 @@ public class IterateTest {
 		}
 
 	}
-	
+
 	@Test
 	public void testCoIteration() throws Exception {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(2, MEMORYSIZE);
-		
-		
-		ConnectedIterativeDataStream<Integer, String> coIt =  env.fromElements(0, 0).iterate(2000).withFeedbackType("String");
-		
-		try{
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(2);
+
+		ConnectedIterativeDataStream<Integer, String> coIt = env.fromElements(0, 0).iterate(2000)
+				.withFeedbackType("String");
+
+		try {
 			coIt.groupBy(1, 2);
 			fail();
-		} catch (UnsupportedOperationException e){}
-		
-		DataStream<String> head = coIt.flatMap(new CoFlatMapFunction<Integer, String, String>() {
+		} catch (UnsupportedOperationException e) {
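+			// expected behaviour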
+		}
+
+		DataStream<String> head = coIt
+				.flatMap(new RichCoFlatMapFunction<Integer, String, String>() {
+
+					private static final long serialVersionUID = 1L;
+					boolean seenFromSource = false;
+
+					@Override
+					public void flatMap1(Integer value, Collector<String> out) throws Exception {
+						out.collect(((Integer) (value + 1)).toString());
+					}
+
+					@Override
+					public void flatMap2(String value, Collector<String> out) throws Exception {
+						Integer intVal = Integer.valueOf(value);
+						if (intVal < 2) {
+							out.collect(((Integer) (intVal + 1)).toString());
+						}
+						if (intVal == 1000 || intVal == 2000) {
+							seenFromSource = true;
+						}
+					}
 
-			private static final long serialVersionUID = 1L;
+					@Override
+					public void close() {
+						assertTrue(seenFromSource);
+					}
+				});
+
+		coIt.map(new CoMapFunction<Integer, String, String>() {
 
 			@Override
-			public void flatMap1(Integer value, Collector<String> out) throws Exception {
-				out.collect(((Integer) (value + 1)).toString());
+			public String map1(Integer value) throws Exception {
+				return value.toString();
 			}
 
 			@Override
-			public void flatMap2(String value, Collector<String> out) throws Exception {
-				Integer intVal = Integer.valueOf(value);
-				if(intVal < 2){
-					out.collect(((Integer) (intVal + 1)).toString());
-				}
-				
+			public String map2(String value) throws Exception {
+				return value;
 			}
-		});
-		
-		coIt.closeWith(head.broadcast());
-	
+		}).setParallelism(1).addSink(new NoOpSink<String>());
+
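+		// Feed back the head's output together with an out-of-loop stream; close()
+		// above verifies that the injected "1000"/"2000" values arrive as feedback.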
+		coIt.closeWith(head.broadcast().union(env.fromElements("1000", "2000").rebalance()));
+
 		head.addSink(new TestSink()).setParallelism(1);
-		
+
 		env.execute();
-		
-		assertEquals(new HashSet<String>(Arrays.asList("1","1","2","2","2","2")), TestSink.collected);
+
+		Collections.sort(TestSink.collected);
+		assertEquals(Arrays.asList("1", "1", "2", "2", "2", "2"), TestSink.collected);
+		assertEquals(2, new ArrayList<StreamLoop>(env.getStreamGraph().getStreamLoops()).get(0)
+				.getSourceSinkPairs().size());
 
 	}
 
 	@Test
+	public void testGroupByFeedback() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(3);
+
+		KeySelector<Integer, Integer> key = new KeySelector<Integer, Integer>() {
+
+			@Override
+			public Integer getKey(Integer value) throws Exception {
+				return value % 3;
+			}
+		};
+
+		DataStream<Integer> source = env.fromElements(1, 2, 3);
+
+		IterativeDataStream<Integer> it = source.groupBy(key).iterate(3000);
+
+		DataStream<Integer> head = it.flatMap(new RichFlatMapFunction<Integer, Integer>() {
+
+			int received = 0;
+			int expectedKey = -1;
+
+			@Override
+			public void flatMap(Integer value, Collector<Integer> out) throws Exception {
+				received++;
+				if (expectedKey == -1) {
+					expectedKey = value % 3;
+				} else {
+					assertEquals(expectedKey, value % 3);
+				}
+				if (value > 0) {
+					out.collect(value - 1);
+				}
+			}
+
+			@Override
+			public void close() {
+				assertTrue(received > 1);
+			}
+		});
+
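+		// keepPartitioning = true preserves the groupBy on the feedback edges, so
+		// every subtask keeps receiving a single key (asserted in flatMap above).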
+		it.closeWith(head.groupBy(key).union(head.map(NoOpIntMap).setParallelism(2).groupBy(key)),
+				true).addSink(new NoOpSink<Integer>());
+
+		env.execute();
+	}
+
+	@SuppressWarnings("deprecation")
+	@Test
 	public void testWithCheckPointing() throws Exception {
 		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+		env.enableCheckpointing();
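+		// Checkpointing is rejected for iterative jobs unless it is explicitly
+		// forced, which is exercised at the end of this test.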
 
-		env = constructIterativeJob(env);
+		DataStream<Boolean> source = env
+				.fromCollection(Collections.nCopies(PARALLELISM * 2, false));
+
+		IterativeDataStream<Boolean> iteration = source.iterate(3000);
+
+		iteration.closeWith(iteration.flatMap(new IterationHead()));
 
-		env.enableCheckpointing();
 		try {
 			env.execute();
 
@@ -252,8 +475,7 @@ public class IterateTest {
 		} catch (UnsupportedOperationException e) {
 			// expected behaviour
 		}
-		
-		
+
 		// Test force checkpointing
 
 		try {
@@ -265,22 +487,75 @@ public class IterateTest {
 		} catch (UnsupportedOperationException e) {
 			// expected behaviour
 		}
-		
+
 		env.enableCheckpointing(1, true);
 		env.getStreamGraph().getJobGraph();
+	}
+
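+	// Emits true for every false it receives; once the flipped record returns
+	// through the feedback edge, marks this subtask as having iterated.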
+	public static final class IterationHead extends RichFlatMapFunction<Boolean, Boolean> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(Boolean value, Collector<Boolean> out) throws Exception {
+			int indx = getRuntimeContext().getIndexOfThisSubtask();
+			if (value) {
+				iterated[indx] = true;
+			} else {
+				out.collect(true);
+			}
+		}
+	}
+
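+	// Collects everything it receives and fails on close() if a subtask saw no data.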
+	public static final class NoOpSink<T> extends RichSinkFunction<T> {
+
+		private static final long serialVersionUID = 1L;
+
+		private List<T> received;
+
+		@Override
+		public void invoke(T tuple) {
+			received.add(tuple);
+		}
+
+		@Override
+		public void open(Configuration conf) {
+			received = new ArrayList<T>();
+		}
+
+		@Override
+		public void close() {
+			assertTrue(received.size() > 0);
+		}
 	}
-	
-	public static class TestSink implements SinkFunction<String>{
+
+	public static CoMapFunction<Integer, String, String> NoOpCoMap = new CoMapFunction<Integer, String, String>() {
+
+		public String map1(Integer value) throws Exception {
+			return value.toString();
+		}
+
+		public String map2(String value) throws Exception {
+			return value;
+		}
+	};
+
+	public static MapFunction<Integer, Integer> NoOpIntMap = new MapFunction<Integer, Integer>() {
+
+		public Integer map(Integer value) throws Exception {
+			return value;
+		}
+
+	};
+
+	public static MapFunction<Boolean, Boolean> NoOpBoolMap = new MapFunction<Boolean, Boolean>() {
+
+		public Boolean map(Boolean value) throws Exception {
+			return value;
+		}
+
+	};
+
+	public static class TestSink implements SinkFunction<String> {
 
 		private static final long serialVersionUID = 1L;
-		public static Set<String> collected = new HashSet<String>();
-		
+		public static List<String> collected = new ArrayList<String>();
+
 		@Override
 		public void invoke(String value) throws Exception {
 			collected.add(value);
 		}
-		
+
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index fbd6502..2b0f60e 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -78,6 +78,10 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * Returns the parallelism of this operation.
    */
   def getParallelism = javaStream.getParallelism
+
+  /**
+   * Returns the partitioner applied to this stream.
+   */
+  def getPartitioner = javaStream.getPartitioner
+
+  /**
+   * Returns the output names selected on this stream, if any.
+   */
+  def getSelectedNames = javaStream.getSelectedNames
 
   /**
    * Returns the execution config.