Posted to commits@flink.apache.org by gy...@apache.org on 2015/06/09 21:05:20 UTC

flink git commit: [streaming] Optional iteration feedback partitioning added

Repository: flink
Updated Branches:
  refs/heads/master 31d3eaa74 -> 1481647d5


[streaming] Optional iteration feedback partitioning added

Closes #810


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1481647d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1481647d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1481647d

Branch: refs/heads/master
Commit: 1481647d5a23bf4be9ecef9c435d3926f776c46a
Parents: 31d3eaa
Author: Gyula Fora <gy...@apache.org>
Authored: Tue Jun 9 20:13:13 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue Jun 9 21:04:12 2015 +0200

----------------------------------------------------------------------
 docs/apis/streaming_guide.md                    |  6 ++-
 .../api/datastream/IterativeDataStream.java     | 32 +++++++++++-
 .../flink/streaming/api/graph/StreamGraph.java  | 25 +++++----
 .../apache/flink/streaming/api/IterateTest.java | 54 ++++++++++++++++----
 .../flink/streaming/api/scala/DataStream.scala  | 17 +++---
 5 files changed, 103 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1481647d/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index c365dc3..de6340e 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -1123,7 +1123,9 @@ output.map(…).project(…);
 In this case, all values passing the `isFeedback` filter will be fed back to the iteration head, and the values passing the `isOutput` filter will produce the output of the iteration that can be transformed further (here with a `map` and a `projection`) outside the iteration.
 
 Because iterative streaming programs do not have a set number of iterations for each data element, the streaming program has no information on the end of its input. As a consequence iterative streaming programs run until the user manually stops the program. While this is acceptable under normal circumstances, a method is provided to allow iterative programs to shut down automatically if no input is received by the iteration head for a predefined number of milliseconds.
-To use this functionality the user needs to add the maxWaitTimeMillis parameter to the `dataStream.iterate(…)` call to control the max wait time. 
+To use this functionality the user needs to add the maxWaitTimeMillis parameter to the `dataStream.iterate(…)` call to control the max wait time.
+
+By default, the partitioning of the feedback stream is automatically set to match the input of the iteration head. To override this, set the optional boolean `keepPartitioning` flag of the `closeWith` method to `true`.
 </div>
 <div data-lang="scala" markdown="1">
 The Flink Streaming API supports implementing iterative stream processing dataflows similarly to the batch Flink API. Iterative streaming programs also implement a step function and embed it into an `IterativeDataStream`.
@@ -1143,6 +1145,8 @@ val iteratedStream = someDataStream.iterate(maxWaitTime) {
 
 Because iterative streaming programs do not have a set number of iterations for each data element, the streaming program has no information on the end of its input. As a consequence iterative streaming programs run until the user manually stops the program. While this is acceptable under normal circumstances a method is provided to allow iterative programs to shut down automatically if no input received by the iteration head for a predefined number of milliseconds.
 To use this functionality the user needs to add the maxWaitTimeMillis parameter to the `dataStream.iterate(…)` call to control the max wait time. 
+
+By default, the partitioning of the feedback stream is automatically set to match the input of the iteration head. To override this, set the optional boolean `keepPartitioning` parameter of the `iterate` method to `true`.
 </div>
 
 </div>
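
The documentation hunks describe the iteration semantics in prose: values passing the feedback filter go back to the iteration head, values passing the output filter leave the loop, and the job can stop once the head sees no input. The following is a minimal in-memory Java sketch of those semantics, assuming a single-threaded queue in place of Flink's distributed feedback channel. `IterationSketch` and `run` are hypothetical names, not Flink API, and "head queue is empty" stands in for the `maxWaitTimeMillis` timeout.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.IntPredicate;
import java.util.function.IntUnaryOperator;

/**
 * Hypothetical in-memory model (not Flink code) of a streaming iteration.
 * Each element passes through the step function; results matching isFeedback
 * are fed back to the head, results matching isOutput leave the iteration.
 */
final class IterationSketch {

    static List<Integer> run(List<Integer> input,
                             IntUnaryOperator step,
                             IntPredicate isFeedback,
                             IntPredicate isOutput) {
        Deque<Integer> head = new ArrayDeque<>(input); // pending input at the iteration head
        List<Integer> output = new ArrayList<>();
        // An empty head plays the role of "no input for maxWaitTimeMillis".
        while (!head.isEmpty()) {
            int value = step.applyAsInt(head.poll());
            if (isFeedback.test(value)) {
                head.add(value);   // fed back to the iteration head
            }
            if (isOutput.test(value)) {
                output.add(value); // leaves the iteration as output
            }
        }
        return output;
    }
}
```

For example, `run(List.of(1, 5), x -> x + 1, x -> x < 3, x -> x >= 3)` returns `[6, 3]`: 5 becomes 6 and leaves immediately, while 1 becomes 2, is fed back once and leaves as 3. A feedback predicate that never turns false would loop forever, much like an iterative streaming job without a wait time.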

http://git-wip-us.apache.org/repos/asf/flink/blob/1481647d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index 581087d..6a48b6a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -46,13 +46,18 @@ public class IterativeDataStream<IN> extends
 	 * for more information.
 	 * 
 	 * 
+	 * 
+	 * 
 	 * @param iterationTail
 	 *            The data stream that is fed back to the next iteration head.
+	 * @param keepPartitioning
+	 *            If true the feedback partitioning will be kept as it is (not
+	 *            changed to match the input of the iteration head)
 	 * @return Returns the stream that was fed back to the iteration. In most
 	 *         cases no further transformation are applied on this stream.
 	 * 
 	 */
-	public DataStream<IN> closeWith(DataStream<IN> iterationTail) {
+	public DataStream<IN> closeWith(DataStream<IN> iterationTail, boolean keepPartitioning) {
 		DataStream<IN> iterationSink = new DataStreamSink<IN>(environment, "Iteration Sink", null,
 				null);
 
@@ -61,7 +66,30 @@ public class IterativeDataStream<IN> extends
 		streamGraph.addIterationTail(iterationSink.getId(), iterationTail.getId(), iterationID,
 				iterationWaitTime);
 
-		connectGraph(iterationTail.forward(), iterationSink.getId(), 0);
+		if (keepPartitioning) {
+			connectGraph(iterationTail, iterationSink.getId(), 0);
+		} else {
+			connectGraph(iterationTail.forward(), iterationSink.getId(), 0);
+		}
 		return iterationTail;
 	}
+	
+	/**
+	 * Closes the iteration. This method defines the end of the iterative
+	 * program part that will be fed back to the start of the iteration. <br>
+	 * <br>A common usage pattern for streaming iterations is to use output
+	 * splitting to send a part of the closing data stream to the head. Refer to
+	 * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)}
+	 * for more information.
+	 * 
+	 * 
+	 * @param iterationTail
+	 *            The data stream that is fed back to the next iteration head.
+	 * @return Returns the stream that was fed back to the iteration. In most
+	 *         cases no further transformations are applied to this stream.
+	 * 
+	 */
+	public DataStream<IN> closeWith(DataStream<IN> iterationTail) {
+		return closeWith(iterationTail, false);
+	}
 }
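
The new `closeWith(iterationTail, keepPartitioning)` either connects the tail to the iteration sink with the tail's own partitioner or forces `forward()`, and the one-argument overload delegates with `false`. A hedged Java model of only that decision follows; the `Partitioner` enum and `FeedbackPartitioningSketch` are hypothetical stand-ins for Flink's `StreamPartitioner` classes. Note that `testPartitioning` expects a `RebalancePartitioner` on the loop closed with `false`, which suggests `forward()` is backed by that class in this version; the model simply labels it `FORWARD`.

```java
/**
 * Hypothetical model (not the Flink API) of the partitioner chosen for the
 * edge from the iteration tail to the iteration sink.
 */
final class FeedbackPartitioningSketch {

    enum Partitioner { FORWARD, BROADCAST, SHUFFLE, REBALANCE }

    /** Mirrors the if/else in closeWith: keep the tail's partitioner or force forward. */
    static Partitioner feedbackEdge(Partitioner tailPartitioner, boolean keepPartitioning) {
        return keepPartitioning ? tailPartitioner : Partitioner.FORWARD;
    }

    /** Mirrors the one-argument closeWith, which delegates with keepPartitioning = false. */
    static Partitioner feedbackEdge(Partitioner tailPartitioner) {
        return feedbackEdge(tailPartitioner, false);
    }
}
```

Keeping the one-argument overload means existing callers of `closeWith(tail)` get the old behaviour unchanged, while new callers opt in explicitly.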

http://git-wip-us.apache.org/repos/asf/flink/blob/1481647d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index ade1c6e..d559ed3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -44,6 +44,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
@@ -189,23 +190,22 @@ public class StreamGraph extends StreamingPlan {
 		}
 	}
 
+	@SuppressWarnings("rawtypes")
 	public void addIterationHead(Integer sourceID, Integer iterationHead, Integer iterationID,
 			long timeOut) {
 
-		addNode(sourceID, StreamIterationHead.class, null, null);
+		StreamNode itSource = addNode(sourceID, StreamIterationHead.class, null, null);
 
 		StreamLoop iteration = new StreamLoop(iterationID, getStreamNode(sourceID), timeOut);
 		streamLoops.put(iterationID, iteration);
 		vertexIDtoLoop.put(sourceID, iteration);
 
 		setSerializersFrom(iterationHead, sourceID);
-		getStreamNode(sourceID).setOperatorName("IterationHead-" + iterationHead);
-
-		int outpartitionerIndex = getStreamNode(iterationHead).getInEdgeIndices().get(0);
-		StreamPartitioner<?> outputPartitioner = getStreamNode(outpartitionerIndex).getOutEdges()
-				.get(0).getPartitioner();
+		itSource.setOperatorName("IterationSource-" + sourceID);
+		itSource.setParallelism(getStreamNode(iterationHead).getParallelism());
+		
 
-		addEdge(sourceID, iterationHead, outputPartitioner, 0, new ArrayList<String>());
+		addEdge(sourceID, iterationHead, new RebalancePartitioner(true), 0, new ArrayList<String>());
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("ITERATION SOURCE: {}", sourceID);
@@ -221,19 +221,18 @@ public class StreamGraph extends StreamingPlan {
 			throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported.");
 		}
 
-		addNode(sinkID, StreamIterationTail.class, null, null).setParallelism(
-				getStreamNode(iterationTail).getParallelism());
+		StreamNode itSink = addNode(sinkID, StreamIterationTail.class, null, null);
 
 		StreamLoop iteration = streamLoops.get(iterationID);
 		iteration.setSink(getStreamNode(sinkID));
 		vertexIDtoLoop.put(sinkID, iteration);
+		
+		itSink.setParallelism(iteration.getSource().getParallelism());
 
 		setSerializersFrom(iterationTail, sinkID);
-		getStreamNode(sinkID).setOperatorName("IterationTail-" + iterationTail);
+		getStreamNode(sinkID).setOperatorName("IterationSink-" + sinkID);
 
-		iteration.getSource().setParallelism(iteration.getSink().getParallelism());
-		setBufferTimeout(iteration.getSource().getId(), getStreamNode(iterationTail)
-				.getBufferTimeout());
+		setBufferTimeout(iteration.getSource().getId(), getStreamNode(iterationTail).getBufferTimeout());
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("ITERATION SINK: {}", sinkID);
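
After this change `addIterationHead` gives the iteration source the head operator's parallelism, and `addIterationTail` gives the iteration sink the source's parallelism (previously the sink copied the tail and the source then copied the sink). The following is a simplified Java model under that reading. `IterationGraphSketch`, `Node` and the integer ids are hypothetical; edges and partitioners, including the `RebalancePartitioner` edge from source to head, are deliberately left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, stripped-down model of iteration wiring in a stream graph. */
final class IterationGraphSketch {

    /** Minimal stand-in for a stream node: id, operator name, parallelism. */
    static final class Node {
        final int id;
        final String name;
        final int parallelism;

        Node(int id, String name, int parallelism) {
            this.id = id;
            this.name = name;
            this.parallelism = parallelism;
        }
    }

    private final Map<Integer, Node> nodes = new HashMap<>();
    // Iteration id -> iteration source, playing the role of StreamLoop.getSource().
    private final Map<Integer, Node> loopSources = new HashMap<>();

    Node addNode(int id, String name, int parallelism) {
        Node node = new Node(id, name, parallelism);
        nodes.put(id, node);
        return node;
    }

    /** The iteration source copies the parallelism of the head operator it feeds. */
    Node addIterationHead(int sourceId, int headId, int iterationId) {
        Node head = nodes.get(headId);
        if (head == null) {
            throw new IllegalArgumentException("unknown head operator " + headId);
        }
        Node source = addNode(sourceId, "IterationSource-" + sourceId, head.parallelism);
        loopSources.put(iterationId, source);
        return source;
    }

    /** The iteration sink copies the parallelism of the matching iteration source. */
    Node addIterationTail(int sinkId, int iterationId) {
        Node source = loopSources.get(iterationId);
        if (source == null) {
            throw new IllegalStateException("no iteration head registered for " + iterationId);
        }
        return addNode(sinkId, "IterationSink-" + sinkId, source.parallelism);
    }
}
```

With a head operator of parallelism 3, both the iteration source and sink end up with parallelism 3, which is the property `testColocation` checks via `assertEquals(itSource.getParallelism(), itSink.getParallelism())`.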

http://git-wip-us.apache.org/repos/asf/flink/blob/1481647d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 3e8a2c8..ebc4c93 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -31,6 +31,12 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamGraph.StreamLoop;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
@@ -41,7 +47,7 @@ public class IterateTest {
 	private static boolean iterated[];
 	private static int PARALLELISM = 2;
 
-	public static final class IterationHead extends RichFlatMapFunction<Boolean,Boolean> {
+	public static final class IterationHead extends RichFlatMapFunction<Boolean, Boolean> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -58,7 +64,7 @@ public class IterateTest {
 
 	}
 
-	public static final class IterationTail extends RichFlatMapFunction<Boolean,Boolean> {
+	public static final class IterationTail extends RichFlatMapFunction<Boolean, Boolean> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -78,8 +84,8 @@ public class IterateTest {
 		public void invoke(Boolean tuple) {
 		}
 	}
-	
-	public static final class NoOpMap implements MapFunction<Boolean, Boolean>{
+
+	public static final class NoOpMap implements MapFunction<Boolean, Boolean> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -87,10 +93,10 @@ public class IterateTest {
 		public Boolean map(Boolean value) throws Exception {
 			return value;
 		}
-		
+
 	}
 
-	public StreamExecutionEnvironment constructIterativeJob(StreamExecutionEnvironment env){
+	public StreamExecutionEnvironment constructIterativeJob(StreamExecutionEnvironment env) {
 		env.setBufferTimeout(10);
 
 		DataStream<Boolean> source = env.fromCollection(Collections.nCopies(PARALLELISM, false));
@@ -103,7 +109,7 @@ public class IterateTest {
 		iteration.closeWith(increment).addSink(new MySink());
 		return env;
 	}
-	
+
 	@Test
 	public void testColocation() throws Exception {
 		StreamExecutionEnvironment env = new TestStreamEnvironment(4, MEMORYSIZE);
@@ -123,9 +129,9 @@ public class IterateTest {
 		AbstractJobVertex tailOp = null;
 
 		for (AbstractJobVertex vertex : graph.getVertices()) {
-			if (vertex.getName().contains("IterationHead")) {
+			if (vertex.getName().contains("IterationSource")) {
 				itSource = vertex;
-			} else if (vertex.getName().contains("IterationTail")) {
+			} else if (vertex.getName().contains("IterationSink")) {
 				itSink = vertex;
 			} else if (vertex.getName().contains("HeadOperator")) {
 				headOp = vertex;
@@ -141,6 +147,36 @@ public class IterateTest {
 		assertEquals(itSource.getParallelism(), itSink.getParallelism());
 	}
 
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testPartitioning() throws Exception {
+		StreamExecutionEnvironment env = new TestStreamEnvironment(4, MEMORYSIZE);
+
+		IterativeDataStream<Boolean> it = env.fromElements(true).iterate();
+
+		IterativeDataStream<Boolean> it2 = env.fromElements(true).iterate();
+
+		DataStream<Boolean> head = it.map(new NoOpMap()).name("Head1").broadcast();
+		DataStream<Boolean> head2 = it2.map(new NoOpMap()).name("Head2").broadcast();
+
+		it.closeWith(head.union(head.map(new NoOpMap()).shuffle()), true);
+		it2.closeWith(head2, false);
+
+		System.out.println(env.getExecutionPlan());
+		StreamGraph graph = env.getStreamGraph();
+
+		for (StreamLoop loop : graph.getStreamLoops()) {
+			StreamEdge tailToSink = loop.getSink().getInEdges().get(0);
+			if (tailToSink.getSourceVertex().getOperatorName().contains("Head1")) {
+				assertTrue(tailToSink.getPartitioner() instanceof BroadcastPartitioner);
+				assertTrue(loop.getSink().getInEdges().get(1).getPartitioner() instanceof ShufflePartitioner);
+			} else {
+				assertTrue(tailToSink.getPartitioner() instanceof RebalancePartitioner);
+			}
+		}
+
+	}
+
 	@Test
 	public void test() throws Exception {
 		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
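
`testPartitioning` tells the two loops apart by the partitioner types on their feedback edges. As a rough illustration of what those types mean for record routing, here is a Java sketch of three channel-selection strategies: broadcast to every channel, shuffle to one random channel, and rebalance in round-robin order. These are hypothetical helpers, not the `BroadcastPartitioner`, `ShufflePartitioner` or `RebalancePartitioner` implementations, and the `true` constructor argument passed to `RebalancePartitioner` in the diff is not modeled.

```java
import java.util.Random;

/**
 * Hypothetical channel selectors. Each method returns the indices of the
 * downstream channels that a single record is sent to.
 */
final class ChannelSelectionSketch {

    /** Broadcast: every downstream channel receives the record. */
    static int[] broadcast(int numChannels) {
        int[] all = new int[numChannels];
        for (int i = 0; i < numChannels; i++) {
            all[i] = i;
        }
        return all;
    }

    /** Shuffle: exactly one channel, chosen uniformly at random. */
    static int[] shuffle(int numChannels, Random random) {
        return new int[] { random.nextInt(numChannels) };
    }

    // Round-robin position, kept per instance like per-writer partitioner state.
    private int nextChannel = 0;

    /** Rebalance: exactly one channel, cycling through all channels in order. */
    int[] rebalance(int numChannels) {
        int[] selected = { nextChannel % numChannels };
        nextChannel = (nextChannel + 1) % numChannels;
        return selected;
    }
}
```

Seen this way, `keepPartitioning = true` preserves a broadcast or shuffle fan-out on the feedback edge, whereas `false` replaces it with the single forward connection.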

http://git-wip-us.apache.org/repos/asf/flink/blob/1481647d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 20b937e..5fae85f 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -317,12 +317,14 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * By default a DataStream with iteration will never terminate, but the user
    * can use the maxWaitTime parameter to set a max waiting time for the iteration head.
    * If no data received in the set time the stream terminates.
-   *
+   * <p>
+   * By default the feedback partitioning is set to match the input. To override this,
+   * use the overload that takes the keepPartitioning flag and set it to true.
    *
    */
-  def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R])): DataStream[R] = {
+  def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R])): DataStream[R] =
     iterate(0)(stepFunction)
-  }
+  
 
   /**
    * Initiates an iterative part of the program that creates a loop by feeding
@@ -339,15 +341,18 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * By default a DataStream with iteration will never terminate, but the user
    * can use the maxWaitTime parameter to set a max waiting time for the iteration head.
    * If no data received in the set time the stream terminates.
-   *
+   * <p>
+   * By default the feedback partitioning is set to match the input. To override this,
+   * set the keepPartitioning flag to true.
    *
    */
   def iterate[R](maxWaitTimeMillis:Long = 0)
-                (stepFunction: DataStream[T] => (DataStream[T], DataStream[R])) : DataStream[R] = {
+                (stepFunction: DataStream[T] => (DataStream[T], DataStream[R]), 
+                    keepPartitioning: Boolean = false) : DataStream[R] = {
     val iterativeStream = javaStream.iterate(maxWaitTimeMillis)
 
     val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
-    iterativeStream.closeWith(feedback.getJavaStream)
+    iterativeStream.closeWith(feedback.getJavaStream, keepPartitioning)
     output
   }