You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original archive page and is not preserved in this text version.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/05/26 09:36:41 UTC

flink git commit: [streaming] Fix iteration colocation + enable chaining for iterative jobs

Repository: flink
Updated Branches:
  refs/heads/master 2a65b6221 -> 4de2353f3


[streaming] Fix iteration colocation + enable chaining for iterative jobs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4de2353f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4de2353f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4de2353f

Branch: refs/heads/master
Commit: 4de2353f312a559802b319a92b09492818f309d0
Parents: 2a65b62
Author: Gyula Fora <gy...@apache.org>
Authored: Tue May 26 08:49:02 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue May 26 09:33:31 2015 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/graph/StreamGraph.java  | 62 ++++++++++----------
 .../flink/streaming/api/graph/StreamNode.java   |  5 ++
 .../api/graph/StreamingJobGraphGenerator.java   |  5 +-
 .../apache/flink/streaming/api/IterateTest.java | 10 ++--
 4 files changed, 43 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4de2353f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index ffc7032..009366c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -46,11 +46,11 @@ import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
+import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
 import org.apache.sling.commons.json.JSONException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -189,56 +189,54 @@ public class StreamGraph extends StreamingPlan {
 		}
 	}
 
-	public void addIterationHead(Integer vertexID, Integer iterationHead, Integer iterationID,
+	public void addIterationHead(Integer sourceID, Integer iterationHead, Integer iterationID,
 			long timeOut) {
 
-		addNode(vertexID, StreamIterationHead.class, null, null);
-
-		chaining = false;
+		addNode(sourceID, StreamIterationHead.class, null, null);
 
-		StreamLoop iteration = new StreamLoop(iterationID, getStreamNode(vertexID), timeOut);
+		StreamLoop iteration = new StreamLoop(iterationID, getStreamNode(sourceID), timeOut);
 		streamLoops.put(iterationID, iteration);
-		vertexIDtoLoop.put(vertexID, iteration);
+		vertexIDtoLoop.put(sourceID, iteration);
 
-		setSerializersFrom(iterationHead, vertexID);
-		getStreamNode(vertexID).setOperatorName("IterationHead-" + iterationHead);
+		setSerializersFrom(iterationHead, sourceID);
+		getStreamNode(sourceID).setOperatorName("IterationHead-" + iterationHead);
 
 		int outpartitionerIndex = getStreamNode(iterationHead).getInEdgeIndices().get(0);
 		StreamPartitioner<?> outputPartitioner = getStreamNode(outpartitionerIndex).getOutEdges()
 				.get(0).getPartitioner();
 
-		addEdge(vertexID, iterationHead, outputPartitioner, 0, new ArrayList<String>());
+		addEdge(sourceID, iterationHead, outputPartitioner, 0, new ArrayList<String>());
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("ITERATION SOURCE: {}", vertexID);
+			LOG.debug("ITERATION SOURCE: {}", sourceID);
 		}
 
-		sources.add(vertexID);
+		sources.add(sourceID);
 	}
 
-	public void addIterationTail(Integer vertexID, Integer iterationTail, Integer iterationID,
+	public void addIterationTail(Integer sinkID, Integer iterationTail, Integer iterationID,
 			long waitTime) {
 
 		if (getStreamNode(iterationTail).getBufferTimeout() == 0) {
 			throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported.");
 		}
 
-		addNode(vertexID, StreamIterationTail.class, null, null).setParallelism(
+		addNode(sinkID, StreamIterationTail.class, null, null).setParallelism(
 				getStreamNode(iterationTail).getParallelism());
 
 		StreamLoop iteration = streamLoops.get(iterationID);
-		iteration.setTail(getStreamNode(iterationTail));
-		vertexIDtoLoop.put(vertexID, iteration);
+		iteration.setSink(getStreamNode(sinkID));
+		vertexIDtoLoop.put(sinkID, iteration);
 
-		setSerializersFrom(iterationTail, vertexID);
-		getStreamNode(vertexID).setOperatorName("IterationTail-" + iterationTail);
+		setSerializersFrom(iterationTail, sinkID);
+		getStreamNode(sinkID).setOperatorName("IterationTail-" + iterationTail);
 
-		setParallelism(iteration.getHead().getID(), getStreamNode(iterationTail).getParallelism());
-		setBufferTimeout(iteration.getHead().getID(), getStreamNode(iterationTail)
+		iteration.getSource().setParallelism(iteration.getSink().getParallelism());
+		setBufferTimeout(iteration.getSource().getID(), getStreamNode(iterationTail)
 				.getBufferTimeout());
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("ITERATION SINK: {}", vertexID);
+			LOG.debug("ITERATION SINK: {}", sinkID);
 		}
 
 	}
@@ -468,14 +466,14 @@ public class StreamGraph extends StreamingPlan {
 
 		private Integer loopID;
 
-		private StreamNode head;
-		private StreamNode tail;
-
+		private StreamNode source;
+		private StreamNode sink;
+		
 		private Long timeout;
 
-		public StreamLoop(Integer loopID, StreamNode head, Long timeout) {
+		public StreamLoop(Integer loopID, StreamNode source, Long timeout) {
 			this.loopID = loopID;
-			this.head = head;
+			this.source = source;
 			this.timeout = timeout;
 		}
 
@@ -487,16 +485,16 @@ public class StreamGraph extends StreamingPlan {
 			return timeout;
 		}
 
-		public void setTail(StreamNode tail) {
-			this.tail = tail;
+		public void setSink(StreamNode sink) {
+			this.sink = sink;
 		}
 
-		public StreamNode getHead() {
-			return head;
+		public StreamNode getSource() {
+			return source;
 		}
 
-		public StreamNode getTail() {
-			return tail;
+		public StreamNode getSink() {
+			return sink;
 		}
 
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/4de2353f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 147ed97..adb07a8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -213,5 +213,10 @@ public class StreamNode implements Serializable {
 	public void isolateSlot() {
 		isolatedSlot = true;
 	}
+	
+	@Override
+	public String toString() {
+		return operatorName + ID;
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4de2353f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index d16ee58..bc1c984 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -323,6 +323,7 @@ public class StreamingJobGraphGenerator {
 
 		return downStreamVertex.getInEdges().size() == 1
 				&& outOperator != null
+				&& headOperator != null
 				&& upStreamVertex.getSlotSharingID() == downStreamVertex.getSlotSharingID()
 				&& upStreamVertex.getSlotSharingID() != -1
 				&& (outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS ||
@@ -357,8 +358,8 @@ public class StreamingJobGraphGenerator {
 
 		for (StreamLoop loop : streamGraph.getStreamLoops()) {
 			CoLocationGroup ccg = new CoLocationGroup();
-			AbstractJobVertex tail = jobVertices.get(loop.getTail().getID());
-			AbstractJobVertex head = jobVertices.get(loop.getHead().getID());
+			AbstractJobVertex tail = jobVertices.get(loop.getSink().getID());
+			AbstractJobVertex head = jobVertices.get(loop.getSource().getID());
 
 			ccg.addVertex(head);
 			ccg.addVertex(tail);

http://git-wip-us.apache.org/repos/asf/flink/blob/4de2353f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 39c3f31..cc59a31 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -17,6 +17,11 @@
 
 package org.apache.flink.streaming.api;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Collections;
+
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeDataStream;
@@ -26,11 +31,6 @@ import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
-import java.util.Collections;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 public class IterateTest {
 
 	private static final long MEMORYSIZE = 32;