You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/05/26 09:36:41 UTC
flink git commit: [streaming] Fix iteration colocation + enable chaining for iterative jobs
Repository: flink
Updated Branches:
refs/heads/master 2a65b6221 -> 4de2353f3
[streaming] Fix iteration colocation + enable chaining for iterative jobs
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4de2353f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4de2353f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4de2353f
Branch: refs/heads/master
Commit: 4de2353f312a559802b319a92b09492818f309d0
Parents: 2a65b62
Author: Gyula Fora <gy...@apache.org>
Authored: Tue May 26 08:49:02 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue May 26 09:33:31 2015 +0200
----------------------------------------------------------------------
.../flink/streaming/api/graph/StreamGraph.java | 62 ++++++++++----------
.../flink/streaming/api/graph/StreamNode.java | 5 ++
.../api/graph/StreamingJobGraphGenerator.java | 5 +-
.../apache/flink/streaming/api/IterateTest.java | 10 ++--
4 files changed, 43 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4de2353f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index ffc7032..009366c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -46,11 +46,11 @@ import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
+import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.sling.commons.json.JSONException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -189,56 +189,54 @@ public class StreamGraph extends StreamingPlan {
}
}
- public void addIterationHead(Integer vertexID, Integer iterationHead, Integer iterationID,
+ public void addIterationHead(Integer sourceID, Integer iterationHead, Integer iterationID,
long timeOut) {
- addNode(vertexID, StreamIterationHead.class, null, null);
-
- chaining = false;
+ addNode(sourceID, StreamIterationHead.class, null, null);
- StreamLoop iteration = new StreamLoop(iterationID, getStreamNode(vertexID), timeOut);
+ StreamLoop iteration = new StreamLoop(iterationID, getStreamNode(sourceID), timeOut);
streamLoops.put(iterationID, iteration);
- vertexIDtoLoop.put(vertexID, iteration);
+ vertexIDtoLoop.put(sourceID, iteration);
- setSerializersFrom(iterationHead, vertexID);
- getStreamNode(vertexID).setOperatorName("IterationHead-" + iterationHead);
+ setSerializersFrom(iterationHead, sourceID);
+ getStreamNode(sourceID).setOperatorName("IterationHead-" + iterationHead);
int outpartitionerIndex = getStreamNode(iterationHead).getInEdgeIndices().get(0);
StreamPartitioner<?> outputPartitioner = getStreamNode(outpartitionerIndex).getOutEdges()
.get(0).getPartitioner();
- addEdge(vertexID, iterationHead, outputPartitioner, 0, new ArrayList<String>());
+ addEdge(sourceID, iterationHead, outputPartitioner, 0, new ArrayList<String>());
if (LOG.isDebugEnabled()) {
- LOG.debug("ITERATION SOURCE: {}", vertexID);
+ LOG.debug("ITERATION SOURCE: {}", sourceID);
}
- sources.add(vertexID);
+ sources.add(sourceID);
}
- public void addIterationTail(Integer vertexID, Integer iterationTail, Integer iterationID,
+ public void addIterationTail(Integer sinkID, Integer iterationTail, Integer iterationID,
long waitTime) {
if (getStreamNode(iterationTail).getBufferTimeout() == 0) {
throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported.");
}
- addNode(vertexID, StreamIterationTail.class, null, null).setParallelism(
+ addNode(sinkID, StreamIterationTail.class, null, null).setParallelism(
getStreamNode(iterationTail).getParallelism());
StreamLoop iteration = streamLoops.get(iterationID);
- iteration.setTail(getStreamNode(iterationTail));
- vertexIDtoLoop.put(vertexID, iteration);
+ iteration.setSink(getStreamNode(sinkID));
+ vertexIDtoLoop.put(sinkID, iteration);
- setSerializersFrom(iterationTail, vertexID);
- getStreamNode(vertexID).setOperatorName("IterationTail-" + iterationTail);
+ setSerializersFrom(iterationTail, sinkID);
+ getStreamNode(sinkID).setOperatorName("IterationTail-" + iterationTail);
- setParallelism(iteration.getHead().getID(), getStreamNode(iterationTail).getParallelism());
- setBufferTimeout(iteration.getHead().getID(), getStreamNode(iterationTail)
+ iteration.getSource().setParallelism(iteration.getSink().getParallelism());
+ setBufferTimeout(iteration.getSource().getID(), getStreamNode(iterationTail)
.getBufferTimeout());
if (LOG.isDebugEnabled()) {
- LOG.debug("ITERATION SINK: {}", vertexID);
+ LOG.debug("ITERATION SINK: {}", sinkID);
}
}
@@ -468,14 +466,14 @@ public class StreamGraph extends StreamingPlan {
private Integer loopID;
- private StreamNode head;
- private StreamNode tail;
-
+ private StreamNode source;
+ private StreamNode sink;
+
private Long timeout;
- public StreamLoop(Integer loopID, StreamNode head, Long timeout) {
+ public StreamLoop(Integer loopID, StreamNode source, Long timeout) {
this.loopID = loopID;
- this.head = head;
+ this.source = source;
this.timeout = timeout;
}
@@ -487,16 +485,16 @@ public class StreamGraph extends StreamingPlan {
return timeout;
}
- public void setTail(StreamNode tail) {
- this.tail = tail;
+ public void setSink(StreamNode sink) {
+ this.sink = sink;
}
- public StreamNode getHead() {
- return head;
+ public StreamNode getSource() {
+ return source;
}
- public StreamNode getTail() {
- return tail;
+ public StreamNode getSink() {
+ return sink;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4de2353f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 147ed97..adb07a8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -213,5 +213,10 @@ public class StreamNode implements Serializable {
public void isolateSlot() {
isolatedSlot = true;
}
+
+ @Override
+ public String toString() {
+ return operatorName + ID;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4de2353f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index d16ee58..bc1c984 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -323,6 +323,7 @@ public class StreamingJobGraphGenerator {
return downStreamVertex.getInEdges().size() == 1
&& outOperator != null
+ && headOperator != null
&& upStreamVertex.getSlotSharingID() == downStreamVertex.getSlotSharingID()
&& upStreamVertex.getSlotSharingID() != -1
&& (outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS ||
@@ -357,8 +358,8 @@ public class StreamingJobGraphGenerator {
for (StreamLoop loop : streamGraph.getStreamLoops()) {
CoLocationGroup ccg = new CoLocationGroup();
- AbstractJobVertex tail = jobVertices.get(loop.getTail().getID());
- AbstractJobVertex head = jobVertices.get(loop.getHead().getID());
+ AbstractJobVertex tail = jobVertices.get(loop.getSink().getID());
+ AbstractJobVertex head = jobVertices.get(loop.getSource().getID());
ccg.addVertex(head);
ccg.addVertex(tail);
http://git-wip-us.apache.org/repos/asf/flink/blob/4de2353f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 39c3f31..cc59a31 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -17,6 +17,11 @@
package org.apache.flink.streaming.api;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Collections;
+
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
@@ -26,11 +31,6 @@ import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Test;
-import java.util.Collections;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
public class IterateTest {
private static final long MEMORYSIZE = 32;