Posted to commits@flink.apache.org by gy...@apache.org on 2015/06/26 19:47:15 UTC

flink git commit: [FLINK-2279] [streaming] Add Connected Iteration

Repository: flink
Updated Branches:
  refs/heads/master bc3684e69 -> 128d3bcfe


[FLINK-2279] [streaming] Add Connected Iteration

Closes #870


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/128d3bcf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/128d3bcf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/128d3bcf

Branch: refs/heads/master
Commit: 128d3bcfe27ed3149be61f0481047f8f855e2c2d
Parents: bc3684e
Author: Gyula Fora <gy...@apache.org>
Authored: Fri Jun 26 16:42:24 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Jun 26 19:45:53 2015 +0200

----------------------------------------------------------------------
 docs/apis/streaming_guide.md                    |  38 +++-
 .../common/functions/AbstractRichFunction.java  |   2 +-
 .../api/datastream/ConnectedDataStream.java     |   8 +-
 .../streaming/api/datastream/DataStream.java    |  34 ++--
 .../api/datastream/IterativeDataStream.java     | 176 +++++++++++++++++--
 .../flink/streaming/api/graph/StreamGraph.java  |  15 +-
 .../apache/flink/streaming/api/IterateTest.java |  60 ++++++-
 .../flink/streaming/api/scala/DataStream.scala  |  45 +++--
 .../streaming/api/scala/DataStreamTest.scala    |  29 +++
 9 files changed, 351 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/128d3bcf/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 9218c98..8337b68 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -1108,7 +1108,7 @@ The operator applied on the iteration starting point is the head of the iteratio
 DataStream<Integer> head = iteration.map(new IterationHead());
 {% endhighlight %}
 
-To close an iteration and define the iteration tail, the user calls `closeWith(iterationTail)` method of the `IterativeDataStream`. This iteration tail (the DataStream given to the `closeWith` function) will be fed back to the iteration head. A common pattern is to use [filters](#filter) to separate the output of the iteration from the feedback-stream.
+To close an iteration and define the iteration tail, the user calls `closeWith(feedbackStream)` method of the `IterativeDataStream`. This iteration tail (the DataStream given to the `closeWith` function) will be fed back to the iteration head. A common pattern is to use [filters](#filter) to separate the output of the iteration from the feedback-stream.
 
 {% highlight java %}
 DataStream<Integer> tail = head.map(new IterationTail());
@@ -1116,8 +1116,6 @@ DataStream<Integer> tail = head.map(new IterationTail());
 iteration.closeWith(tail.filter(isFeedback));
 
 DataStream<Integer> output = tail.filter(isOutput);
-
-output.map(…).project(…);
 {% endhighlight %}
 
 In this case, all values passing the `isFeedback` filter will be fed back to the iteration head, and the values passing the `isOutput` filter will produce the output of the iteration that can be transformed further (here with a `map` and a `projection`) outside the iteration.
@@ -1126,6 +1124,22 @@ Because iterative streaming programs do not have a set number of iterations for
 To use this functionality the user needs to add the maxWaitTimeMillis parameter to the `dataStream.iterate(…)` call to control the max wait time.
 
 By default the partitioning of the feedback stream will be automatically set to be the same as the input of the iteration head. To override this the user can set an optional boolean flag in the `closeWith` method. 
+
+#### Iteration head as a co-operator
+The user can also treat the input and feedback stream of a streaming iteration as a `ConnectedDataStream`. This can be used to distinguish the feedback tuples and also to change the type of the iteration feedback. 
+
+To use this feature the user needs to call the `withFeedbackType(type)` method of the iterative data stream and pass the type of the feedback stream:
+
+{% highlight java %}
+ConnectedIterativeDataStream<Integer, String> coiteration = source.iterate(maxWaitTimeMillis).withFeedbackType("String");
+
+DataStream<String> head = coiteration.flatMap(new CoFlatMapFunction<Integer, String, String>(){});
+
+coiteration.closeWith(head);
+
+{% endhighlight %}
+
+In this case the original input of the head operator will be used as the first input to the co-operator and the feedback stream will be used as the second input.
 </div>
 <div data-lang="scala" markdown="1">
 The Flink Streaming API supports implementing iterative stream processing dataflows similarly to the batch Flink API. Iterative streaming programs also implement a step function and embed it into an `IterativeDataStream`.
@@ -1134,19 +1148,31 @@ Unlike in the batch API the user does not define the maximum number of iteration
 A common pattern is to use [filters](#filter) to separate the output from the feedback-stream. In this case all values passing the `isFeedback` filter will be fed back to the iteration head, and the values passing the `isOutput` filter will produce the output of the iteration that can be transformed further (here with a `map` and a `projection`) outside the iteration.
 
 {% highlight scala %}
-val iteratedStream = someDataStream.iterate(maxWaitTime) {
+val iteratedStream = someDataStream.iterate(
   iteration => {
     val head = iteration.map(iterationHead)
     val tail = head.map(iterationTail)
     (tail.filter(isFeedback), tail.filter(isOutput))
-  }
-}.map(…).project(…)
+  }, maxWaitTimeMillis).map(…).project(…)
 {% endhighlight %}
 
 Because iterative streaming programs do not have a set number of iterations for each data element, the streaming program has no information on the end of its input. As a consequence iterative streaming programs run until the user manually stops the program. While this is acceptable under normal circumstances a method is provided to allow iterative programs to shut down automatically if no input received by the iteration head for a predefined number of milliseconds.
 To use this functionality the user needs to add the maxWaitTimeMillis parameter to the `dataStream.iterate(…)` call to control the max wait time. 
 
 By default the partitioning of the feedback stream will be automatically set to be the same as the input of the iteration head. To override this the user can set an optional boolean flag in the `iterate` method. 
+
+#### Iteration head as a co-operator
+The user can also treat the input and feedback stream of a streaming iteration as a `ConnectedDataStream`. This can be used to distinguish the feedback tuples and also to change the type of the iteration feedback. 
+
+To use this feature the user needs to implement a step function that operates on a `ConnectedDataStream` and pass it to the `iterate(…)` call.
+
+{% highlight scala %}
+val iteratedStream = someDataStream.iterate(
+			stepFunction: ConnectedDataStream[T, F] => (DataStream[F], DataStream[R]), 
+			maxWaitTimeMillis)
+{% endhighlight %}
+
+In this case the original input of the head operator will be used as the first input to the co-operator and the feedback stream will be used as the second input.
 </div>
 
 </div>
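
Note on the documentation hunk above: the co-operator head receives the original input on its first callback and the feedback on its second. The following is a minimal plain-Java simulation of that behaviour, not Flink code. The `CoHead` interface, the `run` helper and the in-memory feedback queue are assumptions made for illustration; the two callbacks copy the logic of the `CoFlatMapFunction` in `testCoIteration` from this commit, without parallelism or broadcast.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Plain-Java simulation of a connected iteration head; this is not Flink code.
class CoIterationSketch {

    // Hypothetical stand-in for a co-flat-map: one callback per input type.
    interface CoHead {
        void onInput(Integer value, List<String> out);    // first input: original stream
        void onFeedback(String value, List<String> out);  // second input: feedback stream
    }

    // Feeds every input element through the head, then drains the feedback queue.
    static List<String> run(List<Integer> inputs, CoHead head) {
        List<String> emitted = new ArrayList<>();
        Deque<String> feedback = new ArrayDeque<>();
        for (Integer in : inputs) {
            List<String> out = new ArrayList<>();
            head.onInput(in, out);
            emitted.addAll(out);
            feedback.addAll(out); // the whole head output is fed back
        }
        while (!feedback.isEmpty()) {
            List<String> out = new ArrayList<>();
            head.onFeedback(feedback.poll(), out);
            emitted.addAll(out);
            feedback.addAll(out);
        }
        return emitted;
    }

    // Same element logic as the CoFlatMapFunction in testCoIteration.
    static List<String> demo() {
        return run(Arrays.asList(0, 0), new CoHead() {
            @Override
            public void onInput(Integer value, List<String> out) {
                out.add(String.valueOf(value + 1));
            }

            @Override
            public void onFeedback(String value, List<String> out) {
                int v = Integer.parseInt(value);
                if (v < 2) {
                    out.add(String.valueOf(v + 1));
                }
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 1, 2, 2]
    }
}
```

The loop terminates only because the feedback callback stops emitting once the value reaches 2; in a real streaming job the `maxWaitTimeMillis` timeout plays that role.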

http://git-wip-us.apache.org/repos/asf/flink/blob/128d3bcf/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
index 3bbd1d2..5a019aa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
@@ -1,5 +1,5 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
+c * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file

http://git-wip-us.apache.org/repos/asf/flink/blob/128d3bcf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 2626d9c..6c0c7eb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -61,8 +61,12 @@ public class ConnectedDataStream<IN1, IN2> {
 	protected ConnectedDataStream(DataStream<IN1> input1, DataStream<IN2> input2) {
 		this.jobGraphBuilder = input1.streamGraph;
 		this.environment = input1.environment;
-		this.dataStream1 = input1.copy();
-		this.dataStream2 = input2.copy();
+		if (input1 != null) {
+			this.dataStream1 = input1.copy();
+		}
+		if (input2 != null) {
+			this.dataStream2 = input2.copy();
+		}
 
 		if ((input1 instanceof GroupedDataStream) && (input2 instanceof GroupedDataStream)) {
 			this.isGrouped = true;
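
Note on the hunk above: `input1.streamGraph` and `input1.environment` are read before the new `input1 != null` check, so in practice only `input2` may be null (which is what `ConnectedIterativeDataStream` relies on via `super(input, null)`). A minimal plain-Java sketch of a constructor that states this contract explicitly follows. The `Stream` and `Connected` classes are hypothetical stand-ins, not the Flink types.

```java
import java.util.Objects;

// Plain-Java sketch with hypothetical classes; not the Flink implementation.
class NullableSecondInputSketch {

    static class Stream {
        final String name;

        Stream(String name) {
            this.name = name;
        }

        Stream copy() {
            return new Stream(name);
        }
    }

    static class Connected {
        final Stream first;
        final Stream second; // may stay null, e.g. for an iteration feedback added later

        Connected(Stream input1, Stream input2) {
            // The first input is mandatory: fail early with a clear message.
            Objects.requireNonNull(input1, "input1 must not be null");
            this.first = input1.copy();
            // The second input is optional.
            this.second = (input2 != null) ? input2.copy() : null;
        }
    }

    public static void main(String[] args) {
        Connected c = new Connected(new Stream("input"), null);
        System.out.println(c.first.name + ", second is null: " + (c.second == null));
    }
}
```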

http://git-wip-us.apache.org/repos/asf/flink/blob/128d3bcf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index fc16264..a28ed24 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -539,12 +539,18 @@ public class DataStream<OUT> {
 	 * this IterativeDataStream will be the iteration head. The data stream
 	 * given to the {@link IterativeDataStream#closeWith(DataStream)} method is
 	 * the data stream that will be fed back and used as the input for the
-	 * iteration head. A common usage pattern for streaming iterations is to use
-	 * output splitting to send a part of the closing data stream to the head.
-	 * Refer to {@link #split(OutputSelector)} for more information.
+	 * iteration head. The user can also use a different feedback type than the
+	 * input of the iteration and treat the input and feedback streams as a
+	 * {@link ConnectedDataStream} by calling
+	 * {@link IterativeDataStream#withFeedbackType(TypeInformation)}.
+	 * <p>
+	 * A common usage pattern for streaming iterations is to use output
+	 * splitting to send a part of the closing data stream to the head. Refer to
+	 * {@link #split(OutputSelector)} for more information.
 	 * <p>
 	 * The iteration edge will be partitioned the same way as the first input of
-	 * the iteration head.
+	 * the iteration head unless it is changed in the
+	 * {@link IterativeDataStream#closeWith(DataStream, boolean)} call.
 	 * <p>
 	 * By default a DataStream with iteration will never terminate, but the user
 	 * can use the maxWaitTime parameter to set a max waiting time for the
@@ -564,12 +570,18 @@ public class DataStream<OUT> {
 	 * this IterativeDataStream will be the iteration head. The data stream
 	 * given to the {@link IterativeDataStream#closeWith(DataStream)} method is
 	 * the data stream that will be fed back and used as the input for the
-	 * iteration head. A common usage pattern for streaming iterations is to use
-	 * output splitting to send a part of the closing data stream to the head.
-	 * Refer to {@link #split(OutputSelector)} for more information.
+	 * iteration head. The user can also use a different feedback type than the
+	 * input of the iteration and treat the input and feedback streams as a
+	 * {@link ConnectedDataStream} by calling
+	 * {@link IterativeDataStream#withFeedbackType(TypeInformation)}.
+	 * <p>
+	 * A common usage pattern for streaming iterations is to use output
+	 * splitting to send a part of the closing data stream to the head. Refer to
+	 * {@link #split(OutputSelector)} for more information.
 	 * <p>
 	 * The iteration edge will be partitioned the same way as the first input of
-	 * the iteration head.
+	 * the iteration head unless it is changed in the
+	 * {@link IterativeDataStream#closeWith(DataStream, boolean)} call.
 	 * <p>
 	 * By default a DataStream with iteration will never terminate, but the user
 	 * can use the maxWaitTime parameter to set a max waiting time for the
@@ -1309,15 +1321,15 @@ public class DataStream<OUT> {
 		
 		if (iterationID != null) {
 			//This data stream is an input to some iteration
-			addIterationSource(returnStream);
+			addIterationSource(returnStream, null);
 		}
 
 		return returnStream;
 	}
 	
-	private <X> void addIterationSource(DataStream<X> dataStream) {
+	protected <X> void addIterationSource(DataStream<X> dataStream, TypeInformation<?> feedbackType) {
 		Integer id = ++counter;
-		streamGraph.addIterationHead(id, dataStream.getId(), iterationID, iterationWaitTime);
+		streamGraph.addIterationHead(id, dataStream.getId(), iterationID, iterationWaitTime, feedbackType);
 		streamGraph.setParallelism(id, dataStream.getParallelism());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/128d3bcf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index 6a48b6a..da3d885 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -17,6 +17,12 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+
 /**
  * The iterative data stream represents the start of an iteration in a
  * {@link DataStream}.
@@ -45,16 +51,13 @@ public class IterativeDataStream<IN> extends
 	 * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)}
 	 * for more information.
 	 * 
-	 * 
-	 * 
-	 * 
-	 * @param iterationTail
-	 *            The data stream that is fed back to the next iteration head.
+	 * @param feedbackStream
+	 *            {@link DataStream} that will be used as input to the iteration
+	 *            head.
 	 * @param keepPartitioning
 	 *            If true the feedback partitioning will be kept as it is (not
 	 *            changed to match the input of the iteration head)
-	 * @return Returns the stream that was fed back to the iteration. In most
-	 *         cases no further transformation are applied on this stream.
+	 * @return The feedback stream.
 	 * 
 	 */
 	public DataStream<IN> closeWith(DataStream<IN> iterationTail, boolean keepPartitioning) {
@@ -83,13 +86,164 @@ public class IterativeDataStream<IN> extends
 	 * for more information.
 	 * 
 	 * 
-	 * @param iterationTail
-	 *            The data stream that is fed back to the next iteration head.
-	 * @return Returns the stream that was fed back to the iteration. In most
-	 *         cases no further transformation are applied on this stream.
+	 * @param feedbackStream
+	 *            {@link DataStream} that will be used as input to the
+	 *            iteration head.
+	 * @return The feedback stream.
 	 * 
 	 */
 	public DataStream<IN> closeWith(DataStream<IN> iterationTail) {
 		return closeWith(iterationTail,false);
 	}
+
+	/**
+	 * Changes the feedback type of the iteration and allows the user to apply
+	 * co-transformations on the input and feedback stream, as in a
+	 * {@link ConnectedDataStream}.
+	 * <p>
+	 * For type safety the user needs to define the feedback type
+	 * 
+	 * @param feedbackTypeString
+	 *            String describing the type information of the feedback stream.
+	 * @return A {@link ConnectedIterativeDataStream}.
+	 */
+	public <F> ConnectedIterativeDataStream<IN, F> withFeedbackType(String feedbackTypeString) {
+		return withFeedbackType(TypeInfoParser.<F> parse(feedbackTypeString));
+	}
+
+	/**
+	 * Changes the feedback type of the iteration and allows the user to apply
+	 * co-transformations on the input and feedback stream, as in a
+	 * {@link ConnectedDataStream}.
+	 * <p>
+	 * For type safety the user needs to define the feedback type
+	 * 
+	 * @param feedbackTypeClass
+	 *            Class of the elements in the feedback stream.
+	 * @return A {@link ConnectedIterativeDataStream}.
+	 */
+	public <F> ConnectedIterativeDataStream<IN, F> withFeedbackType(Class<F> feedbackTypeClass) {
+		return withFeedbackType(TypeExtractor.getForClass(feedbackTypeClass));
+	}
+
+	/**
+	 * Changes the feedback type of the iteration and allows the user to apply
+	 * co-transformations on the input and feedback stream, as in a
+	 * {@link ConnectedDataStream}.
+	 * <p>
+	 * For type safety the user needs to define the feedback type
+	 * 
+	 * @param feedbackType
+	 *            The type information of the feedback stream.
+	 * @return A {@link ConnectedIterativeDataStream}.
+	 */
+	public <F> ConnectedIterativeDataStream<IN, F> withFeedbackType(TypeInformation<F> feedbackType) {
+		return new ConnectedIterativeDataStream<IN, F>(this, feedbackType);
+	}
+	
+	/**
+	 * The {@link ConnectedIterativeDataStream} represents the start of an
+	 * iterative part of a streaming program, where the original input of the
+	 * iteration and the feedback of the iteration are connected as in a
+	 * {@link ConnectedDataStream}.
+	 * <p>
+	 * The user can distinguish between the two inputs using co-transformation,
+	 * thus eliminating the need for mapping the inputs and outputs to a common
+	 * type.
+	 * 
+	 * @param <I>
+	 *            Type of the input of the iteration
+	 * @param <F>
+	 *            Type of the feedback of the iteration
+	 */
+	public static class ConnectedIterativeDataStream<I, F> extends ConnectedDataStream<I, F>{
+
+		private IterativeDataStream<I> input;
+		private TypeInformation<F> feedbackType;
+
+		public ConnectedIterativeDataStream(IterativeDataStream<I> input, TypeInformation<F> feedbackType) {
+			super(input, null);
+			this.input = input;
+			this.feedbackType = feedbackType;
+		}
+		
+		@Override
+		public TypeInformation<F> getType2() {
+			return feedbackType;
+		}
+		
+		@Override
+		public <OUT> SingleOutputStreamOperator<OUT, ?> transform(String functionName,
+				TypeInformation<OUT> outTypeInfo, TwoInputStreamOperator<I, F, OUT> operator) {
+
+			@SuppressWarnings({ "unchecked", "rawtypes" })
+			SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
+					input.environment, outTypeInfo, operator);
+
+			input.streamGraph.addCoOperator(returnStream.getId(), operator, input.getType(),
+					feedbackType, outTypeInfo, functionName);
+
+			input.connectGraph(input, returnStream.getId(), 1);
+			
+			input.addIterationSource(returnStream, feedbackType);
+
+			return returnStream;
+		}
+		
+		/**
+		 * Closes the iteration. This method defines the end of the iterative
+		 * program part that will be fed back to the start of the iteration as
+		 * the second input in the {@link ConnectedDataStream}.
+		 * 
+		 * @param feedbackStream
+		 *            {@link DataStream} that will be used as second input to
+		 *            the iteration head.
+		 * @return The feedback stream.
+		 * 
+		 */
+		public DataStream<F> closeWith(DataStream<F> feedbackStream) {
+			DataStream<F> iterationSink = new DataStreamSink<F>(input.environment, "Iteration Sink",
+					null, null);
+			
+			input.streamGraph.addIterationTail(iterationSink.getId(), feedbackStream.getId(), input.iterationID,
+					input.iterationWaitTime);
+
+			input.connectGraph(feedbackStream, iterationSink.getId(), 0);
+			return feedbackStream;
+		}
+		
+		private UnsupportedOperationException groupingException = new UnsupportedOperationException(
+				"Cannot change the input partitioning of an iteration head directly. Apply the partitioning on the input and feedback streams instead.");
+		
+		@Override
+		public ConnectedDataStream<I, F> groupBy(int keyPosition1, int keyPosition2) {throw groupingException;}
+		
+		@Override
+		public ConnectedDataStream<I, F> groupBy(int[] keyPositions1, int[] keyPositions2) {throw groupingException;}
+		
+		@Override
+		public ConnectedDataStream<I, F> groupBy(String field1, String field2) {throw groupingException;}
+		
+		@Override
+		public ConnectedDataStream<I, F> groupBy(String[] fields1, String[] fields2) {throw groupingException;}
+		
+		@Override
+		public ConnectedDataStream<I, F> groupBy(KeySelector<I, ?> keySelector1,KeySelector<F, ?> keySelector2) {throw groupingException;}
+		
+		@Override
+		public ConnectedDataStream<I, F> partitionByHash(int keyPosition1, int keyPosition2) {throw groupingException;}
+		
+		@Override
+		public ConnectedDataStream<I, F> partitionByHash(int[] keyPositions1, int[] keyPositions2) {throw groupingException;}
+		
+		@Override
+		public ConnectedDataStream<I, F> partitionByHash(String field1, String field2) {throw groupingException;}
+		
+		@Override
+		public ConnectedDataStream<I, F> partitionByHash(String[] fields1, String[] fields2) {throw groupingException;}
+		
+		@Override
+		public ConnectedDataStream<I, F> partitionByHash(KeySelector<I, ?> keySelector1, KeySelector<F, ?> keySelector2) {throw groupingException;}
+		
+	}
 }
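
Note on the `groupBy`/`partitionByHash` overrides above: the commit keeps one `UnsupportedOperationException` in a field and throws that same instance from every override. In Java a `Throwable` records its stack trace when it is constructed, so a reused instance reports the construction site rather than the failing call. The sketch below shows a variation that creates the exception per call. It is plain Java with hypothetical class names, not the committed code.

```java
// Plain-Java sketch; the class names are hypothetical, not Flink types.
class GroupingGuardSketch {

    static class Connected {
        Connected groupBy(int key1, int key2) {
            return this; // grouping is allowed on an ordinary connected stream
        }
    }

    static class IterativeConnected extends Connected {
        @Override
        Connected groupBy(int key1, int key2) {
            // A fresh exception per call keeps the stack trace pointing at the caller.
            throw new UnsupportedOperationException(
                    "Cannot change the input partitioning of an iteration head directly.");
        }
    }

    // Returns true if the stream refuses to be grouped.
    static boolean rejectsGrouping(Connected stream) {
        try {
            stream.groupBy(1, 2);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectsGrouping(new Connected()));          // prints false
        System.out.println(rejectsGrouping(new IterativeConnected())); // prints true
    }
}
```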

http://git-wip-us.apache.org/repos/asf/flink/blob/128d3bcf/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index e07d881..cae24be 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -196,9 +196,9 @@ public class StreamGraph extends StreamingPlan {
 		}
 	}
 
-	@SuppressWarnings("rawtypes")
+	@SuppressWarnings({ "rawtypes", "unchecked" })
 	public void addIterationHead(Integer sourceID, Integer iterationHead, Integer iterationID,
-			long timeOut) {
+			long timeOut, TypeInformation<?> feedbackType) {
 
 		StreamNode itSource = addNode(sourceID, StreamIterationHead.class, null, null);
 
@@ -206,12 +206,17 @@ public class StreamGraph extends StreamingPlan {
 		streamLoops.put(iterationID, iteration);
 		vertexIDtoLoop.put(sourceID, iteration);
 
-		setSerializersFrom(iterationHead, sourceID);
 		itSource.setOperatorName("IterationSource-" + sourceID);
 		itSource.setParallelism(getStreamNode(iterationHead).getParallelism());
 		
-
-		addEdge(sourceID, iterationHead, new RebalancePartitioner(true), 0, new ArrayList<String>());
+		if(feedbackType == null){
+			setSerializersFrom(iterationHead, sourceID);
+			addEdge(sourceID, iterationHead, new RebalancePartitioner(true), 0, new ArrayList<String>());
+		}else{
+			itSource.setSerializerOut(new StreamRecordSerializer(feedbackType, executionConfig));
+			addEdge(sourceID, iterationHead, new RebalancePartitioner(true), 2, new ArrayList<String>());
+		}
+		
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("ITERATION SOURCE: {}", sourceID);
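
Note on the `addIterationHead` hunk above: the new branch picks the edge type number from the presence of a feedback type. Reading the diff, 0 is used when the iteration source feeds a single-input head, and 2 when it feeds the second input of a co-operator (the first input is connected with 1 in `ConnectedIterativeDataStream#transform`). The sketch below restates only that decision in plain Java; the method name is an assumption and `Object` stands in for `TypeInformation<?>`.

```java
// Plain-Java restatement of the branch; not the Flink implementation.
class FeedbackEdgeSketch {

    // Hypothetical helper: Object stands in for TypeInformation<?>.
    static int feedbackEdgeTypeNumber(Object feedbackType) {
        // No feedback type: ordinary iteration, head has a single input (0).
        // Feedback type given: feedback arrives as the second input of a co-operator (2).
        return feedbackType == null ? 0 : 2;
    }

    public static void main(String[] args) {
        System.out.println(feedbackEdgeTypeNumber(null));     // prints 0
        System.out.println(feedbackEdgeTypeNumber("String")); // prints 2
    }
}
```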

http://git-wip-us.apache.org/repos/asf/flink/blob/128d3bcf/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index a9c0ee3..c660dbc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -21,15 +21,20 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeDataStream;
+import org.apache.flink.streaming.api.datastream.IterativeDataStream.ConnectedIterativeDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -191,6 +196,47 @@ public class IterateTest {
 		}
 
 	}
+	
+	@Test
+	public void testCoIteration() throws Exception {
+		StreamExecutionEnvironment env = new TestStreamEnvironment(2, MEMORYSIZE);
+		
+		
+		ConnectedIterativeDataStream<Integer, String> coIt =  env.fromElements(0, 0).iterate(2000).withFeedbackType("String");
+		
+		try{
+			coIt.groupBy(1, 2);
+			fail();
+		} catch (UnsupportedOperationException e){}
+		
+		DataStream<String> head = coIt.flatMap(new CoFlatMapFunction<Integer, String, String>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void flatMap1(Integer value, Collector<String> out) throws Exception {
+				out.collect(((Integer) (value + 1)).toString());
+			}
+
+			@Override
+			public void flatMap2(String value, Collector<String> out) throws Exception {
+				Integer intVal = Integer.valueOf(value);
+				if(intVal < 2){
+					out.collect(((Integer) (intVal + 1)).toString());
+				}
+				
+			}
+		});
+		
+		coIt.closeWith(head.broadcast());
+	
+		head.addSink(new TestSink()).setParallelism(1);
+		
+		env.execute();
+		
+		assertEquals(new HashSet<String>(Arrays.asList("1","1","2","2","2","2")), TestSink.collected);
+
+	}
 
 	@Test
 	public void testWithCheckPointing() throws Exception {
@@ -225,5 +271,17 @@ public class IterateTest {
 		env.getStreamGraph().getJobGraph();
 
 	}
+	
+	public static class TestSink implements SinkFunction<String>{
+
+		private static final long serialVersionUID = 1L;
+		public static Set<String> collected = new HashSet<String>();
+		
+		@Override
+		public void invoke(String value) throws Exception {
+			collected.add(value);
+		}
+		
+	}
 
 }
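
Note on `testCoIteration` above: both the expected value and `TestSink.collected` are `HashSet`s, so duplicates are dropped and the assertion only checks which distinct strings reached the sink, not how often. A small plain-Java sketch of that effect follows; the class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Shows that the duplicate-laden expected set equals the two-element set.
class SetAssertionSketch {

    // The expected value exactly as written in the test.
    static Set<String> expectedAsWritten() {
        return new HashSet<String>(Arrays.asList("1", "1", "2", "2", "2", "2"));
    }

    // The same set without the redundant duplicates.
    static Set<String> expectedMinimal() {
        return new HashSet<String>(Arrays.asList("1", "2"));
    }

    public static void main(String[] args) {
        System.out.println(expectedAsWritten().equals(expectedMinimal())); // prints true
        System.out.println(expectedAsWritten().size());                    // prints 2
    }
}
```

If the number of occurrences matters, collecting into a `List` (or counting per value) and comparing after sorting would be one way to check it.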

http://git-wip-us.apache.org/repos/asf/flink/blob/128d3bcf/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 96f951b..5672e65 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -353,39 +353,46 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * the keepPartitioning flag to true
    *
    */
-  def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R])): DataStream[R] =
-    iterate(0)(stepFunction)
-  
+  def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]),
+                    maxWaitTimeMillis:Long = 0,
+                    keepPartitioning: Boolean = false) : DataStream[R] = {
+    val iterativeStream = javaStream.iterate(maxWaitTimeMillis)
 
+    val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
+    iterativeStream.closeWith(feedback.getJavaStream, keepPartitioning)
+    output
+  }
+  
   /**
    * Initiates an iterative part of the program that creates a loop by feeding
    * back data streams. To create a streaming iteration the user needs to define
    * a transformation that creates two DataStreams. The first one is the output
    * that will be fed back to the start of the iteration and the second is the output
    * stream of the iterative part.
+   * 
+   * The input stream of the iterate operator and the feedback stream will be treated
+   * as a ConnectedDataStream where the input is connected with the feedback stream.
+   * 
+   * This allows the user to distinguish standard input from feedback inputs.
+   * 
    * <p>
    * stepfunction: initialStream => (feedback, output)
    * <p>
-   * A common pattern is to use output splitting to create feedback and output DataStream.
-   * Please refer to the .split(...) method of the DataStream
-   * <p>
-   * By default a DataStream with iteration will never terminate, but the user
-   * can use the maxWaitTime parameter to set a max waiting time for the iteration head.
-   * If no data received in the set time the stream terminates.
-   * <p>
-   * By default the feedback partitioning is set to match the input, to override this set 
-   * the keepPartitioning flag to true
+   * The user must set the max waiting time for the iteration head.
+   * If no data is received in the set time, the stream terminates. If this parameter is set
+   * to 0, the iteration sources will run indefinitely, so the job must be killed to stop.
    *
    */
-  def iterate[R](maxWaitTimeMillis:Long = 0)
-                (stepFunction: DataStream[T] => (DataStream[T], DataStream[R]), 
-                    keepPartitioning: Boolean = false) : DataStream[R] = {
-    val iterativeStream = javaStream.iterate(maxWaitTimeMillis)
+  def iterate[R, F: TypeInformation: ClassTag](stepFunction: ConnectedDataStream[T, F] => 
+    (DataStream[F], DataStream[R]), maxWaitTimeMillis:Long): DataStream[R] = {
+    val feedbackType: TypeInformation[F] = implicitly[TypeInformation[F]]
+    val connectedIterativeStream = javaStream.iterate(maxWaitTimeMillis).
+                                   withFeedbackType(feedbackType)
 
-    val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
-    iterativeStream.closeWith(feedback.getJavaStream, keepPartitioning)
+    val (feedback, output) = stepFunction(connectedIterativeStream)
+    connectedIterativeStream.closeWith(feedback.getJavaStream)
     output
-  }
+  }  
 
   /**
    * Applies an aggregation that that gives the current maximum of the data stream at

http://git-wip-us.apache.org/repos/asf/flink/blob/128d3bcf/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 7746bf5..c394228 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -415,6 +415,35 @@ class DataStreamTest {
     assert(globalPartitioner.isInstanceOf[GlobalPartitioner[_]])
   }
 
+  @Test
+  def testIterations {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val source = env.fromElements(1, 2, 3)
+
+    val iterated = source.iterate((input: ConnectedDataStream[Int, String]) => {
+      val head = input.map(i => (i + 1).toString, s => s)
+      (head.filter(_ == "2"), head.filter(_ != "2"))
+    }, 1000)
+
+    val iterated2 = source.iterate((input: DataStream[Int]) => 
+      (input.map(_ + 1), input.map(_.toString)), 2000)
+
+    try {
+      val invalid = source.iterate((input: ConnectedDataStream[Int, String]) => {
+        val head = input.partitionByHash(1, 1).map(i => (i + 1).toString, s => s)
+        (head.filter(_ == "2"), head.filter(_ != "2"))
+      }, 1000)
+      fail
+    } catch {
+      case uoe: UnsupportedOperationException =>
+      case e: Exception => fail
+    }
+
+    val sg = env.getStreamGraph
+
+    assert(sg.getStreamLoops().size() == 2)
+  }
+
   /////////////////////////////////////////////////////////////
   // Utilities
   /////////////////////////////////////////////////////////////