You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/07/11 15:21:03 UTC
flink git commit: [FLINK-2335] [streaming] Lazy iteration construction in StreamGraph
Repository: flink
Updated Branches:
refs/heads/master ea4f339d7 -> 3b69b2499
[FLINK-2335] [streaming] Lazy iteration construction in StreamGraph
Closes #900
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b69b249
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b69b249
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b69b249
Branch: refs/heads/master
Commit: 3b69b249991c23995dddc3b5182415f5c7df332a
Parents: ea4f339
Author: Gyula Fora <gy...@apache.org>
Authored: Fri Jul 10 20:03:22 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Sat Jul 11 14:23:59 2015 +0200
----------------------------------------------------------------------
.../streaming/api/datastream/DataStream.java | 48 +-
.../api/datastream/IterativeDataStream.java | 43 +-
.../api/datastream/SplitDataStream.java | 4 +-
.../flink/streaming/api/graph/StreamConfig.java | 8 +-
.../flink/streaming/api/graph/StreamEdge.java | 6 +-
.../flink/streaming/api/graph/StreamGraph.java | 275 ++++++----
.../flink/streaming/api/graph/StreamLoop.java | 122 +++++
.../api/graph/StreamingJobGraphGenerator.java | 27 +-
.../partitioner/RebalancePartitioner.java | 5 +
.../runtime/partitioner/StreamPartitioner.java | 5 +
.../runtime/tasks/StreamIterationHead.java | 4 +-
.../runtime/tasks/StreamIterationTail.java | 4 +-
.../apache/flink/streaming/api/IterateTest.java | 519 ++++++++++++++-----
.../flink/streaming/api/scala/DataStream.scala | 4 +
14 files changed, 804 insertions(+), 270 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index c9c1f49..7896169 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -95,11 +95,11 @@ public class DataStream<OUT> {
protected final StreamExecutionEnvironment environment;
protected final Integer id;
protected int parallelism;
- protected List<String> userDefinedNames;
+ protected List<String> selectedNames;
protected StreamPartitioner<OUT> partitioner;
@SuppressWarnings("rawtypes")
protected TypeInformation typeInfo;
- protected List<DataStream<OUT>> unionizedStreams;
+ protected List<DataStream<OUT>> unionedStreams;
protected Integer iterationID = null;
protected Long iterationWaitTime = null;
@@ -126,11 +126,11 @@ public class DataStream<OUT> {
this.environment = environment;
this.parallelism = environment.getParallelism();
this.streamGraph = environment.getStreamGraph();
- this.userDefinedNames = new ArrayList<String>();
+ this.selectedNames = new ArrayList<String>();
this.partitioner = new RebalancePartitioner<OUT>(true);
this.typeInfo = typeInfo;
- this.unionizedStreams = new ArrayList<DataStream<OUT>>();
- this.unionizedStreams.add(this);
+ this.unionedStreams = new ArrayList<DataStream<OUT>>();
+ this.unionedStreams.add(this);
}
/**
@@ -143,17 +143,17 @@ public class DataStream<OUT> {
this.environment = dataStream.environment;
this.id = dataStream.id;
this.parallelism = dataStream.parallelism;
- this.userDefinedNames = new ArrayList<String>(dataStream.userDefinedNames);
+ this.selectedNames = new ArrayList<String>(dataStream.selectedNames);
this.partitioner = dataStream.partitioner.copy();
this.streamGraph = dataStream.streamGraph;
this.typeInfo = dataStream.typeInfo;
this.iterationID = dataStream.iterationID;
this.iterationWaitTime = dataStream.iterationWaitTime;
- this.unionizedStreams = new ArrayList<DataStream<OUT>>();
- this.unionizedStreams.add(this);
- if (dataStream.unionizedStreams.size() > 1) {
- for (int i = 1; i < dataStream.unionizedStreams.size(); i++) {
- this.unionizedStreams.add(new DataStream<OUT>(dataStream.unionizedStreams.get(i)));
+ this.unionedStreams = new ArrayList<DataStream<OUT>>();
+ this.unionedStreams.add(this);
+ if (dataStream.unionedStreams.size() > 1) {
+ for (int i = 1; i < dataStream.unionedStreams.size(); i++) {
+ this.unionedStreams.add(new DataStream<OUT>(dataStream.unionedStreams.get(i)));
}
}
@@ -176,6 +176,14 @@ public class DataStream<OUT> {
public int getParallelism() {
return this.parallelism;
}
+
+ public StreamPartitioner<OUT> getPartitioner() {
+ return this.partitioner;
+ }
+
+ public List<String> getSelectedNames(){
+ return selectedNames;
+ }
/**
* Gets the type of the stream.
@@ -248,9 +256,9 @@ public class DataStream<OUT> {
DataStream<OUT> returnStream = this.copy();
for (DataStream<OUT> stream : streams) {
- for (DataStream<OUT> ds : stream.unionizedStreams) {
+ for (DataStream<OUT> ds : stream.unionedStreams) {
validateUnion(ds.getId());
- returnStream.unionizedStreams.add(ds.copy());
+ returnStream.unionedStreams.add(ds.copy());
}
}
return returnStream;
@@ -268,7 +276,7 @@ public class DataStream<OUT> {
* @return The {@link SplitDataStream}
*/
public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
- for (DataStream<OUT> ds : this.unionizedStreams) {
+ for (DataStream<OUT> ds : this.unionedStreams) {
streamGraph.addOutputSelector(ds.getId(), clean(outputSelector));
}
@@ -1103,9 +1111,7 @@ public class DataStream<OUT> {
}
protected <X> void addIterationSource(DataStream<X> dataStream, TypeInformation<?> feedbackType) {
- Integer id = ++counter;
- streamGraph.addIterationHead(id, dataStream.getId(), iterationID, iterationWaitTime, feedbackType);
- streamGraph.setParallelism(id, dataStream.getParallelism());
+ streamGraph.addIterationHead(dataStream.getId(), iterationID, iterationWaitTime, feedbackType);
}
/**
@@ -1118,7 +1124,7 @@ public class DataStream<OUT> {
protected DataStream<OUT> setConnectionType(StreamPartitioner<OUT> partitioner) {
DataStream<OUT> returnStream = this.copy();
- for (DataStream<OUT> stream : returnStream.unionizedStreams) {
+ for (DataStream<OUT> stream : returnStream.unionedStreams) {
stream.partitioner = partitioner;
}
@@ -1139,9 +1145,9 @@ public class DataStream<OUT> {
* Number of the type (used at co-functions)
*/
protected <X> void connectGraph(DataStream<X> inputStream, Integer outputID, int typeNumber) {
- for (DataStream<X> stream : inputStream.unionizedStreams) {
+ for (DataStream<X> stream : inputStream.unionedStreams) {
streamGraph.addEdge(stream.getId(), outputID, stream.partitioner, typeNumber,
- inputStream.userDefinedNames);
+ inputStream.selectedNames);
}
}
@@ -1170,7 +1176,7 @@ public class DataStream<OUT> {
}
private void validateUnion(Integer id) {
- for (DataStream<OUT> ds : this.unionizedStreams) {
+ for (DataStream<OUT> ds : this.unionedStreams) {
if (ds.getId().equals(id)) {
throw new RuntimeException("A DataStream cannot be merged with itself");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index da3d885..4de368c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -17,6 +17,8 @@
package org.apache.flink.streaming.api.datastream;
+import java.util.List;
+
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -32,6 +34,8 @@ import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
*/
public class IterativeDataStream<IN> extends
SingleOutputStreamOperator<IN, IterativeDataStream<IN>> {
+
+ protected boolean closed = false;
static Integer iterationCount = 0;
@@ -60,20 +64,18 @@ public class IterativeDataStream<IN> extends
* @return The feedback stream.
*
*/
+ @SuppressWarnings({ "unchecked", "rawtypes" })
public DataStream<IN> closeWith(DataStream<IN> iterationTail, boolean keepPartitioning) {
- DataStream<IN> iterationSink = new DataStreamSink<IN>(environment, "Iteration Sink", null,
- null);
-
- // We add an iteration sink to the tail which will send tuples to the
- // iteration head
- streamGraph.addIterationTail(iterationSink.getId(), iterationTail.getId(), iterationID,
- iterationWaitTime);
-
- if (keepPartitioning) {
- connectGraph(iterationTail, iterationSink.getId(), 0);
- } else {
- connectGraph(iterationTail.forward(), iterationSink.getId(), 0);
+
+ if (closed) {
+ throw new IllegalStateException(
+ "An iterative data stream can only be closed once. Use union to close with multiple stream.");
}
+ closed = true;
+
+ streamGraph.addIterationTail((List) iterationTail.unionedStreams, iterationID,
+ keepPartitioning);
+
return iterationTail;
}
@@ -138,7 +140,8 @@ public class IterativeDataStream<IN> extends
* @return A {@link ConnectedIterativeDataStream}.
*/
public <F> ConnectedIterativeDataStream<IN, F> withFeedbackType(TypeInformation<F> feedbackType) {
- return new ConnectedIterativeDataStream<IN, F>(this, feedbackType);
+ return new ConnectedIterativeDataStream<IN, F>(new IterativeDataStream<IN>(this,
+ iterationWaitTime), feedbackType);
}
/**
@@ -201,14 +204,16 @@ public class IterativeDataStream<IN> extends
* @return The feedback stream.
*
*/
+ @SuppressWarnings({ "rawtypes", "unchecked" })
public DataStream<F> closeWith(DataStream<F> feedbackStream) {
- DataStream<F> iterationSink = new DataStreamSink<F>(input.environment, "Iteration Sink",
- null, null);
+ if (input.closed) {
+ throw new IllegalStateException(
+ "An iterative data stream can only be closed once. Use union to close with multiple stream.");
+ }
+ input.closed = true;
- input.streamGraph.addIterationTail(iterationSink.getId(), feedbackStream.getId(), input.iterationID,
- input.iterationWaitTime);
-
- input.connectGraph(feedbackStream, iterationSink.getId(), 0);
+ input.streamGraph.addIterationTail((List) feedbackStream.unionedStreams,
+ input.iterationID, true);
return feedbackStream;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
index 36a94c7..6b95fe7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -57,8 +57,8 @@ public class SplitDataStream<OUT> extends DataStream<OUT> {
DataStream<OUT> returnStream = copy();
- for (DataStream<OUT> ds : returnStream.unionizedStreams) {
- ds.userDefinedNames = Arrays.asList(outputNames);
+ for (DataStream<OUT> ds : returnStream.unionedStreams) {
+ ds.selectedNames = Arrays.asList(outputNames);
}
return returnStream;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 0784582..6a44104 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -206,12 +206,12 @@ public class StreamConfig implements Serializable {
}
}
- public void setIterationId(Integer iterationId) {
- config.setInteger(ITERATION_ID, iterationId);
+ public void setIterationId(String iterationId) {
+ config.setString(ITERATION_ID, iterationId);
}
- public Integer getIterationId() {
- return config.getInteger(ITERATION_ID, 0);
+ public String getIterationId() {
+ return config.getString(ITERATION_ID, "");
}
public void setIterationWaitTime(long time) {
http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
index 293f5e0..47d97df 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
@@ -46,7 +46,7 @@ public class StreamEdge implements Serializable {
* output selection).
*/
final private List<String> selectedNames;
- final private StreamPartitioner<?> outputPartitioner;
+ private StreamPartitioner<?> outputPartitioner;
public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
List<String> selectedNames, StreamPartitioner<?> outputPartitioner) {
@@ -87,6 +87,10 @@ public class StreamEdge implements Serializable {
public StreamPartitioner<?> getPartitioner() {
return outputPartitioner;
}
+
+ public void setPartitioner(StreamPartitioner<?> partitioner) {
+ this.outputPartitioner = partitioner;
+ }
@Override
public int hashCode() {
http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index cae24be..64c349e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -28,6 +28,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import org.apache.flink.api.common.ExecutionConfig;
@@ -41,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
@@ -79,6 +81,7 @@ public class StreamGraph extends StreamingPlan {
private Map<Integer, StreamLoop> streamLoops;
protected Map<Integer, StreamLoop> vertexIDtoLoop;
+ protected Map<Integer, String> vertexIDtoBrokerID;
private StateHandleProvider<?> stateHandleProvider;
private boolean forceCheckpoint = false;
@@ -97,7 +100,8 @@ public class StreamGraph extends StreamingPlan {
public void clear() {
streamNodes = new HashMap<Integer, StreamNode>();
streamLoops = new HashMap<Integer, StreamLoop>();
- vertexIDtoLoop = new HashMap<Integer, StreamGraph.StreamLoop>();
+ vertexIDtoLoop = new HashMap<Integer, StreamLoop>();
+ vertexIDtoBrokerID = new HashMap<Integer, String>();
sources = new HashSet<Integer>();
}
@@ -120,9 +124,9 @@ public class StreamGraph extends StreamingPlan {
public void setCheckpointingInterval(long checkpointingInterval) {
this.checkpointingInterval = checkpointingInterval;
}
-
+
public void forceCheckpoint() {
- this.forceCheckpoint = true;
+ this.forceCheckpoint = true;
}
public void setStateHandleProvider(StateHandleProvider<?> provider) {
@@ -179,8 +183,9 @@ public class StreamGraph extends StreamingPlan {
}
public <IN1, IN2, OUT> void addCoOperator(Integer vertexID,
- TwoInputStreamOperator<IN1, IN2, OUT> taskoperatorObject, TypeInformation<IN1> in1TypeInfo,
- TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
+ TwoInputStreamOperator<IN1, IN2, OUT> taskoperatorObject,
+ TypeInformation<IN1> in1TypeInfo, TypeInformation<IN2> in2TypeInfo,
+ TypeInformation<OUT> outTypeInfo, String operatorName) {
addNode(vertexID, TwoInputStreamTask.class, taskoperatorObject, operatorName);
@@ -196,59 +201,192 @@ public class StreamGraph extends StreamingPlan {
}
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public void addIterationHead(Integer sourceID, Integer iterationHead, Integer iterationID,
- long timeOut, TypeInformation<?> feedbackType) {
+ public void addIterationHead(Integer iterationHead, Integer iterationID, long timeOut,
+ TypeInformation<?> feedbackType) {
+ // If there is no loop object created for this iteration create one
+ StreamLoop loop = streamLoops.get(iterationID);
+ if (loop == null) {
+ loop = new StreamLoop(iterationID, timeOut, feedbackType);
+ streamLoops.put(iterationID, loop);
+ }
- StreamNode itSource = addNode(sourceID, StreamIterationHead.class, null, null);
+ loop.addHeadOperator(getStreamNode(iterationHead));
+ }
- StreamLoop iteration = new StreamLoop(iterationID, getStreamNode(sourceID), timeOut);
- streamLoops.put(iterationID, iteration);
- vertexIDtoLoop.put(sourceID, iteration);
+ public void addIterationTail(List<DataStream<?>> feedbackStreams, Integer iterationID,
+ boolean keepPartitioning) {
- itSource.setOperatorName("IterationSource-" + sourceID);
- itSource.setParallelism(getStreamNode(iterationHead).getParallelism());
-
- if(feedbackType == null){
- setSerializersFrom(iterationHead, sourceID);
- addEdge(sourceID, iterationHead, new RebalancePartitioner(true), 0, new ArrayList<String>());
- }else{
- itSource.setSerializerOut(new StreamRecordSerializer(feedbackType, executionConfig));
- addEdge(sourceID, iterationHead, new RebalancePartitioner(true), 2, new ArrayList<String>());
+ if (!streamLoops.containsKey(iterationID)) {
+ throw new RuntimeException("Cannot close iteration without head operator.");
}
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("ITERATION SOURCE: {}", sourceID);
+ StreamLoop loop = streamLoops.get(iterationID);
+
+ for (DataStream<?> stream : feedbackStreams) {
+ loop.addTailOperator(getStreamNode(stream.getId()), stream.getPartitioner(),
+ stream.getSelectedNames());
}
- sources.add(sourceID);
+ if (keepPartitioning) {
+ loop.applyTailPartitioning();
+ }
}
- public void addIterationTail(Integer sinkID, Integer iterationTail, Integer iterationID,
- long waitTime) {
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public void finalizeLoops() {
+
+ // We create each loop separately, the order does not matter as sinks
+ // and sources don't interact
+ for (StreamLoop loop : streamLoops.values()) {
+
+ // We make sure not to re-create the loops if the method is called
+ // multiple times
+ if (loop.getSourceSinkPairs().isEmpty()) {
+
+ List<StreamNode> headOps = loop.getHeads();
+ List<StreamNode> tailOps = loop.getTails();
+
+ // This means that the iteration was not closed. It should not
+ // be
+ // allowed.
+ if (tailOps.isEmpty()) {
+ throw new RuntimeException("Cannot execute job with empty iterations.");
+ }
+
+ // Check whether we keep the feedback partitioning
+ if (loop.keepsPartitioning()) {
+ // This is the complicated case as we need to enforce
+ // partitioning on the tail -> sink side, which
+ // requires strict forward connections at source -> head
+
+ // We need one source/sink pair per different head
+ // parallelism
+ // as we depend on strict forwards connections
+ Map<Integer, List<StreamNode>> parallelismToHeads = new HashMap<Integer, List<StreamNode>>();
+
+ // Group head operators by parallelism
+ for (StreamNode head : headOps) {
+ int p = head.getParallelism();
+ if (!parallelismToHeads.containsKey(p)) {
+ parallelismToHeads.put(p, new ArrayList<StreamNode>());
+ }
+ parallelismToHeads.get(p).add(head);
+ }
+
+ // We create the sink/source pair for each parallelism
+ // group,
+ // tails will forward to all sinks but each head operator
+ // will
+ // only receive from one source (corresponding to its
+ // parallelism)
+ int c = 0;
+ for (Entry<Integer, List<StreamNode>> headGroup : parallelismToHeads.entrySet()) {
+ List<StreamNode> headOpsInGroup = headGroup.getValue();
+
+ Tuple2<StreamNode, StreamNode> sourceSinkPair = createItSourceAndSink(loop,
+ c);
+ StreamNode source = sourceSinkPair.f0;
+ StreamNode sink = sourceSinkPair.f1;
+
+ // We connect the source to the heads in this group
+ // (forward), setting
+ // type to 2 in case we have a coIteration (this sets
+ // the
+ // input as the second input of the co-operator)
+ for (StreamNode head : headOpsInGroup) {
+ int inputType = loop.isCoIteration() ? 2 : 0;
+ addEdge(source.getId(), head.getId(), new RebalancePartitioner(true),
+ inputType, new ArrayList<String>());
+ }
+
+ // We connect all the tails to the sink keeping the
+ // partitioner
+ for (int i = 0; i < tailOps.size(); i++) {
+ StreamNode tail = tailOps.get(i);
+ StreamPartitioner<?> partitioner = loop.getTailPartitioners().get(i);
+ addEdge(tail.getId(), sink.getId(), partitioner.copy(), 0, loop
+ .getTailSelectedNames().get(i));
+ }
+
+ // We set the sink/source parallelism to the group
+ // parallelism
+ source.setParallelism(headGroup.getKey());
+ sink.setParallelism(source.getParallelism());
+
+ // We set the proper serializers for the sink/source
+ setSerializersFrom(tailOps.get(0).getId(), sink.getId());
+ if (loop.isCoIteration()) {
+ source.setSerializerOut(new StreamRecordSerializer(loop
+ .getFeedbackType(), executionConfig));
+ } else {
+ setSerializersFrom(headOpsInGroup.get(0).getId(), source.getId());
+ }
+
+ c++;
+ }
+
+ } else {
+ // This is the most simple case, we add one iteration
+ // sink/source pair with the parallelism of the first tail
+ // operator. Tail operators will forward the records and
+ // partitioning will be enforced from source -> head
+
+ Tuple2<StreamNode, StreamNode> sourceSinkPair = createItSourceAndSink(loop, 0);
+ StreamNode source = sourceSinkPair.f0;
+ StreamNode sink = sourceSinkPair.f1;
+
+ // We get the feedback partitioner from the first input of
+ // the
+ // first head.
+ StreamPartitioner<?> partitioner = headOps.get(0).getInEdges().get(0)
+ .getPartitioner();
+
+ // Connect the sources to heads using this partitioner
+ for (StreamNode head : headOps) {
+ addEdge(source.getId(), head.getId(), partitioner.copy(), 0,
+ new ArrayList<String>());
+ }
+
+ // The tails are connected to the sink with forward
+ // partitioning
+ for (int i = 0; i < tailOps.size(); i++) {
+ StreamNode tail = tailOps.get(i);
+ addEdge(tail.getId(), sink.getId(), new RebalancePartitioner(true), 0, loop
+ .getTailSelectedNames().get(i));
+ }
+
+ // We set the parallelism to match the first tail op to make
+ // the
+ // forward more efficient
+ sink.setParallelism(tailOps.get(0).getParallelism());
+ source.setParallelism(sink.getParallelism());
+
+ // We set the proper serializers
+ setSerializersFrom(headOps.get(0).getId(), source.getId());
+ setSerializersFrom(tailOps.get(0).getId(), sink.getId());
+ }
- if (getStreamNode(iterationTail).getBufferTimeout() == 0) {
- throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported.");
- }
+ }
- StreamNode itSink = addNode(sinkID, StreamIterationTail.class, null, null);
+ }
- StreamLoop iteration = streamLoops.get(iterationID);
- iteration.setSink(getStreamNode(sinkID));
- vertexIDtoLoop.put(sinkID, iteration);
-
- itSink.setParallelism(iteration.getSource().getParallelism());
+ }
- setSerializersFrom(iterationTail, sinkID);
- getStreamNode(sinkID).setOperatorName("IterationSink-" + sinkID);
+ private Tuple2<StreamNode, StreamNode> createItSourceAndSink(StreamLoop loop, int c) {
+ StreamNode source = addNode(-1 * streamNodes.size(), StreamIterationHead.class, null, null);
+ sources.add(source.getId());
- setBufferTimeout(iteration.getSource().getId(), getStreamNode(iterationTail).getBufferTimeout());
+ StreamNode sink = addNode(-1 * streamNodes.size(), StreamIterationTail.class, null, null);
- if (LOG.isDebugEnabled()) {
- LOG.debug("ITERATION SINK: {}", sinkID);
- }
+ source.setOperatorName("IterationSource-" + loop.getID() + "_" + c);
+ sink.setOperatorName("IterationSink-" + loop.getID() + "_" + c);
+ vertexIDtoBrokerID.put(source.getId(), loop.getID() + "_" + c);
+ vertexIDtoBrokerID.put(sink.getId(), loop.getID() + "_" + c);
+ vertexIDtoLoop.put(source.getId(), loop);
+ vertexIDtoLoop.put(sink.getId(), loop);
+ loop.addSourceSinkPair(source, sink);
+ return new Tuple2<StreamNode, StreamNode>(source, sink);
}
protected StreamNode addNode(Integer vertexID, Class<? extends AbstractInvokable> vertexClass,
@@ -284,7 +422,7 @@ public class StreamGraph extends StreamingPlan {
getStreamNode(vertexID).setParallelism(parallelism);
}
- public void setKey(Integer vertexID, KeySelector<?,?> key) {
+ public void setKey(Integer vertexID, KeySelector<?, ?> key) {
getStreamNode(vertexID).setStatePartitioner(key);
}
@@ -382,6 +520,10 @@ public class StreamGraph extends StreamingPlan {
return vertexIDtoLoop.get(vertexID).getID();
}
+ public String getBrokerID(Integer vertexID) {
+ return vertexIDtoBrokerID.get(vertexID);
+ }
+
public long getLoopTimeout(Integer vertexID) {
return vertexIDtoLoop.get(vertexID).getTimeout();
}
@@ -421,13 +563,13 @@ public class StreamGraph extends StreamingPlan {
* name of the jobGraph
*/
public JobGraph getJobGraph(String jobGraphName) {
-
+ finalizeLoops();
// temporarily forbid checkpointing for iterative jobs
if (isIterative() && isCheckpointingEnabled() && !forceCheckpoint) {
throw new UnsupportedOperationException(
"Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. "
- + "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. "
- + "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
+ + "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. "
+ + "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
}
setJobName(jobGraphName);
@@ -474,45 +616,4 @@ public class StreamGraph extends StreamingPlan {
DEFAULT, ISOLATE, NEWGROUP
}
- /**
- * Object for representing loops in streaming programs.
- *
- */
- public static class StreamLoop {
-
- private Integer loopID;
-
- private StreamNode source;
- private StreamNode sink;
-
- private Long timeout;
-
- public StreamLoop(Integer loopID, StreamNode source, Long timeout) {
- this.loopID = loopID;
- this.source = source;
- this.timeout = timeout;
- }
-
- public Integer getID() {
- return loopID;
- }
-
- public Long getTimeout() {
- return timeout;
- }
-
- public void setSink(StreamNode sink) {
- this.sink = sink;
- }
-
- public StreamNode getSource() {
- return source;
- }
-
- public StreamNode getSink() {
- return sink;
- }
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamLoop.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamLoop.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamLoop.java
new file mode 100644
index 0000000..ba987ef
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamLoop.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+
+/**
+ * Object for representing loops in streaming programs.
+ *
+ * <p>A loop is identified by its iteration ID and records the head operators
+ * (registered through {@link #addHeadOperator}) and the tail operators
+ * (registered through {@link #addTailOperator}) of the iteration. The
+ * iteration source/sink node pairs that implement the feedback edge are
+ * attached separately through {@link #addSourceSinkPair}.
+ *
+ * <p>All list getters return the internal, mutable lists rather than copies,
+ * so modifications made by callers are visible to this object.
+ */
+public class StreamLoop {
+
+ /** Identifier of the iteration this loop represents. */
+ private Integer loopID;
+
+ /** Head operators of the iteration, in registration order. */
+ private List<StreamNode> headOperators = new ArrayList<StreamNode>();
+ // The three tail lists below are parallel: index i of each one describes the
+ // same tail, because addTailOperator appends to all three together.
+ /** Tail operators of the iteration, in registration order. */
+ private List<StreamNode> tailOperators = new ArrayList<StreamNode>();
+ /** Partitioner of each tail, aligned by index with {@link #tailOperators}. */
+ private List<StreamPartitioner<?>> tailPartitioners = new ArrayList<StreamPartitioner<?>>();
+ /** Selected output names of each tail, aligned by index with {@link #tailOperators}. */
+ private List<List<String>> tailSelectedNames = new ArrayList<List<String>>();
+
+ /** True when a non-null feedback type was supplied to the constructor. */
+ private boolean coIteration = false;
+ /** Type of the feedback records; null unless this is a co-iteration. */
+ private TypeInformation<?> feedbackType = null;
+
+ /** Wait time supplied at construction (units not visible here -- TODO confirm). */
+ private long timeout;
+ /** Whether the partitioning of the tail operators is kept on the feedback edge. */
+ private boolean tailPartitioning = false;
+
+ /** Iteration node pairs created for this loop, stored as (source, sink). */
+ private List<Tuple2<StreamNode, StreamNode>> sourcesAndSinks = new ArrayList<Tuple2<StreamNode, StreamNode>>();
+
+ /**
+ * Creates a loop description.
+ *
+ * @param loopID identifier of the iteration
+ * @param timeout wait time stored for the iteration, returned by {@link #getTimeout()}
+ * @param feedbackType type of the feedback records, or null for a regular
+ * iteration; a non-null value marks the loop as a co-iteration and
+ * also turns on tail partitioning
+ */
+ public StreamLoop(Integer loopID, long timeout, TypeInformation<?> feedbackType) {
+ this.loopID = loopID;
+ this.timeout = timeout;
+ if (feedbackType != null) {
+ this.feedbackType = feedbackType;
+ coIteration = true;
+ // A co-iteration always keeps the partitioning of its tails.
+ tailPartitioning = true;
+ }
+ }
+
+ /** Returns the identifier of this iteration. */
+ public Integer getID() {
+ return loopID;
+ }
+
+ /** Returns the wait time supplied at construction. */
+ public long getTimeout() {
+ return timeout;
+ }
+
+ /** Returns true if a non-null feedback type was supplied at construction. */
+ public boolean isCoIteration() {
+ return coIteration;
+ }
+
+ /** Returns the feedback type, or null if this is not a co-iteration. */
+ public TypeInformation<?> getFeedbackType() {
+ return feedbackType;
+ }
+
+ /** Records an iteration source/sink node pair created for this loop. */
+ public void addSourceSinkPair(StreamNode source, StreamNode sink) {
+ this.sourcesAndSinks.add(new Tuple2<StreamNode, StreamNode>(source, sink));
+ }
+
+ /** Returns the recorded (source, sink) pairs; this is the internal list, not a copy. */
+ public List<Tuple2<StreamNode, StreamNode>> getSourceSinkPairs() {
+ return this.sourcesAndSinks;
+ }
+
+ /** Registers a head operator of the iteration. */
+ public void addHeadOperator(StreamNode head) {
+ this.headOperators.add(head);
+ }
+
+ /**
+ * Registers a tail operator together with its partitioner and selected
+ * output names. The three values are stored at the same index of their
+ * respective lists.
+ *
+ * @param tail the tail operator node
+ * @param partitioner the partitioner associated with this tail
+ * @param selectedNames the selected output names associated with this tail
+ */
+ public void addTailOperator(StreamNode tail, StreamPartitioner<?> partitioner,
+ List<String> selectedNames) {
+ this.tailOperators.add(tail);
+ this.tailPartitioners.add(partitioner);
+ this.tailSelectedNames.add(selectedNames);
+ }
+
+ /** Marks the loop as keeping the partitioning of its tails; there is no way to unset it. */
+ public void applyTailPartitioning() {
+ this.tailPartitioning = true;
+ }
+
+ /**
+ * Returns whether the tail partitioning is kept, either because it was
+ * requested via {@link #applyTailPartitioning()} or because this is a co-iteration.
+ */
+ public boolean keepsPartitioning() {
+ return tailPartitioning;
+ }
+
+ /** Returns the head operators (internal list, not a copy). */
+ public List<StreamNode> getHeads() {
+ return headOperators;
+ }
+
+ /** Returns the tail operators (internal list, not a copy). */
+ public List<StreamNode> getTails() {
+ return tailOperators;
+ }
+
+ /** Returns the tail partitioners, index-aligned with {@link #getTails()}. */
+ public List<StreamPartitioner<?>> getTailPartitioners() {
+ return tailPartitioners;
+ }
+
+ /** Returns the tail selected names, index-aligned with {@link #getTails()}. */
+ public List<List<String>> getTailSelectedNames() {
+ return tailSelectedNames;
+ }
+
+ /** Debug representation; omits the feedback type, timeout and source/sink pairs. */
+ @Override
+ public String toString() {
+ return "ID: " + loopID + "\n" + "Head: " + headOperators + "\n" + "Tail: " + tailOperators
+ + "\n" + "TP: " + tailPartitioners + "\n" + "TSN: " + tailSelectedNames;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index eb34e3f..4d541bc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -30,17 +30,17 @@ import java.util.Map.Entry;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.streaming.api.graph.StreamGraph.StreamLoop;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -250,6 +250,7 @@ public class StreamingJobGraphGenerator {
return retConfig;
}
+ @SuppressWarnings("unchecked")
private void setVertexConfig(Integer vertexID, StreamConfig config,
List<StreamEdge> chainableOutputs, List<StreamEdge> nonChainableOutputs) {
@@ -276,7 +277,7 @@ public class StreamingJobGraphGenerator {
if (vertexClass.equals(StreamIterationHead.class)
|| vertexClass.equals(StreamIterationTail.class)) {
- config.setIterationId(streamGraph.getLoopID(vertexID));
+ config.setIterationId(streamGraph.getBrokerID(vertexID));
config.setIterationWaitTime(streamGraph.getLoopTimeout(vertexID));
}
@@ -360,13 +361,19 @@ public class StreamingJobGraphGenerator {
}
for (StreamLoop loop : streamGraph.getStreamLoops()) {
- CoLocationGroup ccg = new CoLocationGroup();
- JobVertex tail = jobVertices.get(loop.getSink().getId());
- JobVertex head = jobVertices.get(loop.getSource().getId());
- ccg.addVertex(head);
- ccg.addVertex(tail);
- tail.updateCoLocationGroup(ccg);
- head.updateCoLocationGroup(ccg);
+ for (Tuple2<StreamNode, StreamNode> pair : loop.getSourceSinkPairs()) {
+
+ CoLocationGroup ccg = new CoLocationGroup();
+
+ JobVertex source = jobVertices.get(pair.f0.getId());
+ JobVertex sink = jobVertices.get(pair.f1.getId());
+
+ ccg.addVertex(source);
+ ccg.addVertex(sink);
+ source.updateCoLocationGroup(ccg);
+ sink.updateCoLocationGroup(ccg);
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
index 70d9c6b..e6ad821 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
@@ -49,4 +49,9 @@ public class RebalancePartitioner<T> extends StreamPartitioner<T> {
public StreamPartitioner<T> copy() {
return new RebalancePartitioner<T>(forward);
}
+
+ /** Names this partitioner after its mode: forward when {@code forward} is set, rebalance otherwise. */
+ @Override
+ public String toString() {
+ if (forward) {
+ return "ForwardPartitioner";
+ }
+ return "RebalancePartitioner";
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
index ef598c6..b37655b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
@@ -45,4 +45,9 @@ public abstract class StreamPartitioner<T> implements
public StreamPartitioner<T> copy() {
return this;
}
+
+ /** Returns the simple name of the concrete (runtime) partitioner class. */
+ @Override
+ public String toString() {
+ return getClass().getSimpleName();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index 4952cdf..25fe83d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -48,12 +48,12 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
super.registerInputOutput();
outputHandler = new OutputHandler<OUT>(this);
- Integer iterationId = configuration.getIterationId();
+ String iterationId = configuration.getIterationId();
iterationWaitTime = configuration.getIterationWaitTime();
shouldWait = iterationWaitTime > 0;
try {
- BlockingQueueBroker.instance().handIn(iterationId.toString()+"-"
+ BlockingQueueBroker.instance().handIn(iterationId+"-"
+getEnvironment().getIndexInSubtaskGroup(), dataChannel);
} catch (Exception e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
index 5bbae06..b6e3889 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
@@ -30,7 +30,7 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
private static final Logger LOG = LoggerFactory.getLogger(StreamIterationTail.class);
- private Integer iterationId;
+ private String iterationId;
@SuppressWarnings("rawtypes")
private BlockingQueue<StreamRecord> dataChannel;
@@ -47,7 +47,7 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
iterationId = configuration.getIterationId();
iterationWaitTime = configuration.getIterationWaitTime();
shouldWait = iterationWaitTime > 0;
- dataChannel = BlockingQueueBroker.instance().get(iterationId.toString()+"-"
+ dataChannel = BlockingQueueBroker.instance().get(iterationId+"-"
+getEnvironment().getIndexInSubtaskGroup());
} catch (Exception e) {
throw new StreamTaskException(String.format(
http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 3021abb..2a88a32 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -18,175 +18,318 @@
package org.apache.flink.streaming.api;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.List;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.datastream.IterativeDataStream.ConnectedIterativeDataStream;
+import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamGraph.StreamLoop;
+import org.apache.flink.streaming.api.graph.StreamLoop;
+import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Test;
-public class IterateTest {
+@SuppressWarnings({ "unchecked", "unused", "serial" })
+public class IterateTest extends StreamingMultipleProgramsTestBase {
private static final long MEMORYSIZE = 32;
private static boolean iterated[];
private static int PARALLELISM = 2;
- public static final class IterationHead extends RichFlatMapFunction<Boolean, Boolean> {
-
- private static final long serialVersionUID = 1L;
+ /**
+ * Checks that invalid uses of the iteration API are rejected with an exception: closing an
+ * iteration twice, closing iter2 with a stream built from iter1, and executing a job in which
+ * iter2 has a head operator but is never closed.
+ * fail() throws AssertionError, which is not an Exception, so the catch blocks below do not
+ * swallow a missing rejection.
+ */
+ @Test
+ public void testException() throws Exception {
- @Override
- public void flatMap(Boolean value, Collector<Boolean> out) throws Exception {
- int indx = getRuntimeContext().getIndexOfThisSubtask();
- if (value) {
- iterated[indx] = true;
- } else {
- out.collect(value);
- }
+ StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+ DataStream<Integer> source = env.fromElements(1, 10);
+ IterativeDataStream<Integer> iter1 = source.iterate();
+ IterativeDataStream<Integer> iter2 = source.iterate();
+ iter1.closeWith(iter1.map(NoOpIntMap));
+ // Check for double closing
+ try {
+ iter1.closeWith(iter1.map(NoOpIntMap));
+ fail();
+ } catch (Exception e) {
+ // expected: iter1 has already been closed above
}
- }
+ // Check for closing iteration without head
+ try {
+ iter2.closeWith(iter1.map(NoOpIntMap));
+ fail();
+ } catch (Exception e) {
+ // expected: no operator consumes iter2 yet, and the feedback stream comes from iter1
+ }
- public static final class IterationTail extends RichFlatMapFunction<Boolean, Boolean> {
+ iter2.map(NoOpIntMap);
- private static final long serialVersionUID = 1L;
+ // Check for executing with empty iteration
+ try {
+ env.execute();
+ fail();
+ } catch (Exception e) {
+ // expected: iter2 now has a head operator but was never closed
+ }
+ }
- @Override
- public void flatMap(Boolean value, Collector<Boolean> out) throws Exception {
- out.collect(true);
+ /**
+ * Checks that withFeedbackType returns a separate iteration: closing iter1 and the connected
+ * iter2 independently must yield two stream loops with one source/sink pair each.
+ */
+ @Test
+ public void testImmutabilityWithCoiteration() {
+ StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+ DataStream<Integer> source = env.fromElements(1, 10);
- }
+ IterativeDataStream<Integer> iter1 = source.iterate();
+ // Calling withFeedbackType should create a new iteration
+ ConnectedIterativeDataStream<Integer, String> iter2 = iter1.withFeedbackType(String.class);
- }
+ iter1.closeWith(iter1.map(NoOpIntMap));
+ iter2.closeWith(iter2.map(NoOpCoMap));
- public static final class MySink implements SinkFunction<Boolean> {
+ StreamGraph graph = env.getStreamGraph();
- private static final long serialVersionUID = 1L;
+ // The returned JobGraph is discarded; presumably the call is made for its effect on graph
+ // (the loops are built lazily according to the commit title) -- confirm in StreamGraph.
+ graph.getJobGraph();
- @Override
- public void invoke(Boolean tuple) {
+ assertEquals(2, graph.getStreamLoops().size());
+ for (StreamLoop loop : graph.getStreamLoops()) {
+ // Each iteration is closed with the output of its only operator, so heads and tails
+ // are expected to coincide.
+ assertEquals(loop.getHeads(), loop.getTails());
+ List<Tuple2<StreamNode, StreamNode>> sourceSinkPairs = loop.getSourceSinkPairs();
+ assertEquals(1, sourceSinkPairs.size());
}
}
- public static final class NoOpMap implements MapFunction<Boolean, Boolean> {
+ /**
+ * Builds one iteration with four heads (two at the environment parallelism of 4, two at
+ * parallelism 2) and three tails, closed without tail partitioning, and checks the loop: a
+ * single iteration source/sink pair of equal parallelism, shuffle edges out of the source,
+ * rebalance edges into the sink, the "even" selection recorded for the split tail, and a
+ * shared co-location group for the iteration source and sink job vertices.
+ */
+ @Test
+ public void testmultipleHeadsTailsSimple() {
+ StreamExecutionEnvironment env = new TestStreamEnvironment(4, MEMORYSIZE);
+ DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5).shuffle();
+ DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5);
- private static final long serialVersionUID = 1L;
+ IterativeDataStream<Integer> iter1 = source1.union(source2).iterate();
- @Override
- public Boolean map(Boolean value) throws Exception {
- return value;
- }
+ DataStream<Integer> head1 = iter1.map(NoOpIntMap);
+ DataStream<Integer> head2 = iter1.map(NoOpIntMap).setParallelism(2);
+ DataStream<Integer> head3 = iter1.map(NoOpIntMap).setParallelism(2)
+ .addSink(new NoOpSink<Integer>());
+ DataStream<Integer> head4 = iter1.map(NoOpIntMap).addSink(new NoOpSink<Integer>());
- }
+ SplitDataStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5).split(
+ new OutputSelector<Integer>() {
- public StreamExecutionEnvironment constructIterativeJob(StreamExecutionEnvironment env) {
- env.setBufferTimeout(10);
+ @Override
+ public Iterable<String> select(Integer value) {
+ return value % 2 == 0 ? Arrays.asList("even") : Arrays.asList("odd");
+ }
+ });
- DataStream<Boolean> source = env.fromCollection(Collections.nCopies(PARALLELISM, false));
+ iter1.closeWith(source3.select("even").union(
+ head1.map(NoOpIntMap).broadcast().setParallelism(1), head2.shuffle()));
- IterativeDataStream<Boolean> iteration = source.iterate(3000);
+ StreamGraph graph = env.getStreamGraph();
- DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).flatMap(
- new IterationTail());
+ JobGraph jg = graph.getJobGraph();
- iteration.closeWith(increment).addSink(new MySink());
- return env;
- }
+ assertEquals(1, graph.getStreamLoops().size());
+ StreamLoop loop = new ArrayList<StreamLoop>(graph.getStreamLoops()).get(0);
- @Test
- public void testColocation() throws Exception {
- StreamExecutionEnvironment env = new TestStreamEnvironment(4, MEMORYSIZE);
+ // Heads: the four maps applied to iter1. Tails: the three streams unioned in closeWith.
+ assertEquals(4, loop.getHeads().size());
+ assertEquals(3, loop.getTails().size());
- IterativeDataStream<Boolean> it = env.fromElements(true).rebalance().map(new NoOpMap())
- .iterate();
+ // Without tail partitioning a single (source, sink) pair is expected for the whole loop.
+ assertEquals(1, loop.getSourceSinkPairs().size());
+ Tuple2<StreamNode, StreamNode> pair = loop.getSourceSinkPairs().get(0);
- DataStream<Boolean> head = it.map(new NoOpMap()).setParallelism(2).name("HeadOperator");
+ assertEquals(pair.f0.getParallelism(), pair.f1.getParallelism());
+ assertEquals(4, pair.f0.getOutEdges().size());
+ assertEquals(3, pair.f1.getInEdges().size());
- it.closeWith(head.map(new NoOpMap()).setParallelism(3).name("TailOperator")).print();
+ // Source -> heads edges are shuffles; tails -> sink edges are rebalances.
+ for (StreamEdge edge : pair.f0.getOutEdges()) {
+ assertTrue(edge.getPartitioner() instanceof ShufflePartitioner);
+ }
+ for (StreamEdge edge : pair.f1.getInEdges()) {
+ assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
+ }
- JobGraph graph = env.getStreamGraph().getJobGraph();
+ assertTrue(loop.getTailSelectedNames().contains(Arrays.asList("even")));
- JobVertex itSource = null;
- JobVertex itSink = null;
- JobVertex headOp = null;
- JobVertex tailOp = null;
+ // Test co-location
- for (JobVertex vertex : graph.getVertices()) {
+ JobVertex itSource1 = null;
+ JobVertex itSink1 = null;
+
+ for (JobVertex vertex : jg.getVertices()) {
if (vertex.getName().contains("IterationSource")) {
- itSource = vertex;
+ itSource1 = vertex;
} else if (vertex.getName().contains("IterationSink")) {
- itSink = vertex;
- } else if (vertex.getName().contains("HeadOperator")) {
- headOp = vertex;
- } else if (vertex.getName().contains("TailOp")) {
- tailOp = vertex;
+
+ itSink1 = vertex;
+
}
}
- assertTrue(itSource.getCoLocationGroup() != null);
- assertEquals(itSource.getCoLocationGroup(), itSink.getCoLocationGroup());
- assertEquals(headOp.getParallelism(), 2);
- assertEquals(tailOp.getParallelism(), 3);
- assertEquals(itSource.getParallelism(), itSink.getParallelism());
+ assertTrue(itSource1.getCoLocationGroup() != null);
+ assertEquals(itSource1.getCoLocationGroup(), itSink1.getCoLocationGroup());
}
- @SuppressWarnings("unchecked")
+ /**
+ * Builds one iteration with four heads (parallelism 4 and 2) and three named tails ("split",
+ * "bc", "shuffle"), closed with tail partitioning (the {@code true} argument of closeWith).
+ * The loop is expected to get two source/sink pairs, one with parallelism 2 and one with 4.
+ * Each source feeds only heads of matching parallelism via rebalance edges. Each sink keeps
+ * the tails' own partitioners (split -> rebalance, bc -> broadcast, shuffle -> shuffle).
+ * Each pair has its own co-location group.
+ */
@Test
- public void testPartitioning() throws Exception {
+ public void testmultipleHeadsTailsWithTailPartitioning() {
StreamExecutionEnvironment env = new TestStreamEnvironment(4, MEMORYSIZE);
+ DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5).shuffle();
+ DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5);
- IterativeDataStream<Boolean> it = env.fromElements(true).iterate();
+ IterativeDataStream<Integer> iter1 = source1.union(source2).iterate();
- IterativeDataStream<Boolean> it2 = env.fromElements(true).iterate();
+ DataStream<Integer> head1 = iter1.map(NoOpIntMap);
+ DataStream<Integer> head2 = iter1.map(NoOpIntMap).setParallelism(2).name("shuffle");
+ DataStream<Integer> head3 = iter1.map(NoOpIntMap).setParallelism(2)
+ .addSink(new NoOpSink<Integer>());
+ DataStream<Integer> head4 = iter1.map(NoOpIntMap).addSink(new NoOpSink<Integer>());
- DataStream<Boolean> head = it.map(new NoOpMap()).name("Head1").broadcast();
- DataStream<Boolean> head2 = it2.map(new NoOpMap()).name("Head2").broadcast();
+ SplitDataStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5).name("split")
+ .split(new OutputSelector<Integer>() {
- it.closeWith(head.union(head.map(new NoOpMap()).shuffle()), true);
- it2.closeWith(head2, false);
+ @Override
+ public Iterable<String> select(Integer value) {
+ return value % 2 == 0 ? Arrays.asList("even") : Arrays.asList("odd");
+ }
+ });
+
+ iter1.closeWith(
+ source3.select("even").union(
+ head1.map(NoOpIntMap).broadcast().setParallelism(1).name("bc"),
+ head2.shuffle()), true);
StreamGraph graph = env.getStreamGraph();
- for (StreamLoop loop : graph.getStreamLoops()) {
- StreamEdge tailToSink = loop.getSink().getInEdges().get(0);
- if (tailToSink.getSourceVertex().getOperatorName().contains("Head1")) {
- assertTrue(tailToSink.getPartitioner() instanceof BroadcastPartitioner);
- assertTrue(loop.getSink().getInEdges().get(1).getPartitioner() instanceof ShufflePartitioner);
- } else {
- assertTrue(tailToSink.getPartitioner() instanceof RebalancePartitioner);
+ JobGraph jg = graph.getJobGraph();
+
+ assertEquals(1, graph.getStreamLoops().size());
+
+ StreamLoop loop = new ArrayList<StreamLoop>(graph.getStreamLoops()).get(0);
+
+ assertEquals(4, loop.getHeads().size());
+ assertEquals(3, loop.getTails().size());
+
+ assertEquals(2, loop.getSourceSinkPairs().size());
+ List<Tuple2<StreamNode, StreamNode>> pairs = loop.getSourceSinkPairs();
+ Tuple2<StreamNode, StreamNode> pair1 = pairs.get(0).f0.getParallelism() == 2 ? pairs.get(0)
+ : pairs.get(1);
+ Tuple2<StreamNode, StreamNode> pair2 = pairs.get(0).f0.getParallelism() == 4 ? pairs.get(0)
+ : pairs.get(1);
+ // pair1: the pair whose iteration source has parallelism 2; pair2: the one with 4.
+
+ assertEquals(pair1.f0.getParallelism(), pair1.f1.getParallelism());
+ assertEquals(2, pair1.f0.getParallelism());
+ assertEquals(2, pair1.f0.getOutEdges().size());
+ assertEquals(3, pair1.f1.getInEdges().size());
+
+ for (StreamEdge edge : pair1.f0.getOutEdges()) {
+ assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
+ assertEquals(2, edge.getTargetVertex().getParallelism());
+ }
+ for (StreamEdge edge : pair1.f1.getInEdges()) {
+ String tailName = edge.getSourceVertex().getOperatorName();
+ if (tailName.equals("split")) {
+ assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
+ } else if (tailName.equals("bc")) {
+ assertTrue(edge.getPartitioner() instanceof BroadcastPartitioner);
+ } else if (tailName.equals("shuffle")) {
+ assertTrue(edge.getPartitioner() instanceof ShufflePartitioner);
+ }
+
+ }
+
+ assertEquals(pair2.f0.getParallelism(), pair2.f1.getParallelism());
+ assertEquals(4, pair2.f0.getParallelism());
+ assertEquals(2, pair2.f0.getOutEdges().size());
+ assertEquals(3, pair2.f1.getInEdges().size());
+
+ for (StreamEdge edge : pair2.f0.getOutEdges()) {
+ assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
+ assertEquals(4, edge.getTargetVertex().getParallelism());
+ }
+ for (StreamEdge edge : pair2.f1.getInEdges()) {
+ String tailName = edge.getSourceVertex().getOperatorName();
+ if (tailName.equals("split")) {
+ assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
+ } else if (tailName.equals("bc")) {
+ assertTrue(edge.getPartitioner() instanceof BroadcastPartitioner);
+ } else if (tailName.equals("shuffle")) {
+ assertTrue(edge.getPartitioner() instanceof ShufflePartitioner);
}
+
}
+ assertTrue(loop.getTailSelectedNames().contains(Arrays.asList("even")));
+
+ // Test co-location
+ // NOTE(review): job vertices are matched by "_0"/"_1" in their names, which relies on the
+ // naming of iteration sources/sinks per pair and breaks silently if that naming changes.
+
+ JobVertex itSource1 = null;
+ JobVertex itSource2 = null;
+ JobVertex itSink1 = null;
+ JobVertex itSink2 = null;
+
+ for (JobVertex vertex : jg.getVertices()) {
+ if (vertex.getName().contains("IterationSource")) {
+ if (vertex.getName().contains("_0")) {
+ itSource1 = vertex;
+ } else if (vertex.getName().contains("_1")) {
+ itSource2 = vertex;
+ }
+ } else if (vertex.getName().contains("IterationSink")) {
+ if (vertex.getName().contains("_0")) {
+ itSink1 = vertex;
+ } else if (vertex.getName().contains("_1")) {
+ itSink2 = vertex;
+ }
+ }
+ }
+
+ assertTrue(itSource1.getCoLocationGroup() != null);
+ assertTrue(itSource2.getCoLocationGroup() != null);
+
+ assertEquals(itSource1.getCoLocationGroup(), itSink1.getCoLocationGroup());
+ assertEquals(itSource2.getCoLocationGroup(), itSink2.getCoLocationGroup());
+ assertNotEquals(itSource1.getCoLocationGroup(), itSource2.getCoLocationGroup());
}
+ @SuppressWarnings("rawtypes")
@Test
- public void test() throws Exception {
- StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+ public void testSimpleIteration() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
iterated = new boolean[PARALLELISM];
- env = constructIterativeJob(env);
+ DataStream<Boolean> source = env
+ .fromCollection(Collections.nCopies(PARALLELISM * 2, false));
+
+ IterativeDataStream<Boolean> iteration = source.iterate(3000);
+
+ DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).map(NoOpBoolMap);
+
+ iteration.map(NoOpBoolMap).addSink(new NoOpSink());
+
+ iteration.closeWith(increment).addSink(new NoOpSink());
env.execute();
@@ -195,55 +338,135 @@ public class IterateTest {
}
}
-
+
+ /**
+ * Co-iteration test. The integer inputs 0, 0 reach flatMap1 and produce "1", "1". These are
+ * broadcast back to both flatMap2 instances, which emit "2" for each of them (four in total).
+ * A fed-back "2" produces no further output. The extra feedback inputs "1000" and "2000" only
+ * set seenFromSource, which close() requires in every flatMap instance.
+ */
@Test
public void testCoIteration() throws Exception {
- StreamExecutionEnvironment env = new TestStreamEnvironment(2, MEMORYSIZE);
-
-
- ConnectedIterativeDataStream<Integer, String> coIt = env.fromElements(0, 0).iterate(2000).withFeedbackType("String");
-
- try{
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+
+ ConnectedIterativeDataStream<Integer, String> coIt = env.fromElements(0, 0).iterate(2000)
+ .withFeedbackType("String");
+
+ try {
coIt.groupBy(1, 2);
fail();
- } catch (UnsupportedOperationException e){}
-
- DataStream<String> head = coIt.flatMap(new CoFlatMapFunction<Integer, String, String>() {
+ } catch (UnsupportedOperationException e) {
+ // expected: grouping the connected iterative stream is rejected
+ }
+
+ DataStream<String> head = coIt
+ .flatMap(new RichCoFlatMapFunction<Integer, String, String>() {
+
+ private static final long serialVersionUID = 1L;
+ boolean seenFromSource = false;
+
+ @Override
+ public void flatMap1(Integer value, Collector<String> out) throws Exception {
+ out.collect(((Integer) (value + 1)).toString());
+ }
+
+ @Override
+ public void flatMap2(String value, Collector<String> out) throws Exception {
+ Integer intVal = Integer.valueOf(value);
+ if (intVal < 2) {
+ out.collect(((Integer) (intVal + 1)).toString());
+ }
+ if (intVal == 1000 || intVal == 2000) {
+ seenFromSource = true;
+ }
+ }
- private static final long serialVersionUID = 1L;
+ @Override
+ public void close() {
+ assertTrue(seenFromSource);
+ }
+ });
+
+ coIt.map(new CoMapFunction<Integer, String, String>() {
@Override
- public void flatMap1(Integer value, Collector<String> out) throws Exception {
- out.collect(((Integer) (value + 1)).toString());
+ public String map1(Integer value) throws Exception {
+ return value.toString();
}
@Override
- public void flatMap2(String value, Collector<String> out) throws Exception {
- Integer intVal = Integer.valueOf(value);
- if(intVal < 2){
- out.collect(((Integer) (intVal + 1)).toString());
- }
-
+ public String map2(String value) throws Exception {
+ return value;
}
- });
-
- coIt.closeWith(head.broadcast());
-
+ }).setParallelism(1).addSink(new NoOpSink<String>());
+
+ coIt.closeWith(head.broadcast().union(env.fromElements("1000", "2000").rebalance()));
+
head.addSink(new TestSink()).setParallelism(1);
-
+
env.execute();
-
- assertEquals(new HashSet<String>(Arrays.asList("1","1","2","2","2","2")), TestSink.collected);
+
+ // NOTE(review): TestSink.collected is static and not cleared before execute(), so this
+ // assumes no other job in the same JVM has written to it.
+ Collections.sort(TestSink.collected);
+ assertEquals(Arrays.asList("1", "1", "2", "2", "2", "2"), TestSink.collected);
+ // The loop has heads with parallelism 2 (flatMap) and 1 (map); two pairs are expected.
+ assertEquals(2, new ArrayList<StreamLoop>(env.getStreamGraph().getStreamLoops()).get(0)
+ .getSourceSinkPairs().size());
}
+ /**
+ * Checks that a keyed iteration keeps its feedback keyed. Every flatMap instance may only
+ * ever see values of a single key (value % 3), from the source and from both grouped
+ * feedback streams. Each instance must also receive more than one value in total.
+ */
@Test
+ public void testGroupByFeedback() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(3);
+
+ KeySelector<Integer, Integer> modThree = new KeySelector<Integer, Integer>() {
+
+ @Override
+ public Integer getKey(Integer value) throws Exception {
+ return value % 3;
+ }
+ };
+
+ DataStream<Integer> source = env.fromElements(1, 2, 3);
+
+ IterativeDataStream<Integer> it = source.groupBy(modThree).iterate(3000);
+
+ DataStream<Integer> head = it.flatMap(new RichFlatMapFunction<Integer, Integer>() {
+
+ // Number of values seen by this instance, and the key of the first one (-1 = none yet).
+ int seen = 0;
+ int firstKey = -1;
+
+ @Override
+ public void flatMap(Integer value, Collector<Integer> out) throws Exception {
+ seen++;
+ int valueKey = value % 3;
+ if (firstKey != -1) {
+ assertEquals(firstKey, valueKey);
+ } else {
+ firstKey = valueKey;
+ }
+ // Feed back value - 1 until zero is reached, so the feedback eventually dries up.
+ if (value > 0) {
+ out.collect(value - 1);
+ }
+ }
+
+ @Override
+ public void close() {
+ assertTrue(seen > 1);
+ }
+ });
+
+ it.closeWith(
+ head.groupBy(modThree).union(head.map(NoOpIntMap).setParallelism(2).groupBy(modThree)),
+ true).addSink(new NoOpSink<Integer>());
+
+ env.execute();
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
public void testWithCheckPointing() throws Exception {
StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+ env.enableCheckpointing();
- env = constructIterativeJob(env);
+ DataStream<Boolean> source = env
+ .fromCollection(Collections.nCopies(PARALLELISM * 2, false));
+
+ IterativeDataStream<Boolean> iteration = source.iterate(3000);
+
+ iteration.closeWith(iteration.flatMap(new IterationHead()));
- env.enableCheckpointing();
try {
env.execute();
@@ -252,8 +475,7 @@ public class IterateTest {
} catch (UnsupportedOperationException e) {
// expected behaviour
}
-
-
+
// Test force checkpointing
try {
@@ -265,22 +487,75 @@ public class IterateTest {
} catch (UnsupportedOperationException e) {
// expected behaviour
}
-
+
env.enableCheckpointing(1, true);
env.getStreamGraph().getJobGraph();
+ }
+
+ /**
+ * Test function for simple iterations. A {@code true} input sets
+ * {@code iterated[subtaskIndex]}. A {@code false} input is turned into a {@code true} output.
+ * The static {@code iterated} array must be allocated with at least the job parallelism
+ * before the job runs, because it is indexed by the subtask index.
+ */
+ public static final class IterationHead extends RichFlatMapFunction<Boolean, Boolean> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap(Boolean value, Collector<Boolean> out) throws Exception {
+ int indx = getRuntimeContext().getIndexOfThisSubtask();
+ if (value) {
+ iterated[indx] = true;
+ } else {
+ out.collect(true);
+ }
+ }
+ }
+
+ /**
+ * Sink that discards its input but asserts on close that this parallel instance received at
+ * least one record.
+ * NOTE(review): with fewer records than sink instances this assertion can fail for an
+ * instance that legitimately received nothing.
+ */
+ public static final class NoOpSink<T> extends RichSinkFunction<T> {
+ private static final long serialVersionUID = 1L;
+
+ // Only the number of records matters, so count them instead of buffering every record.
+ private long received;
+
+ @Override
+ public void open(Configuration conf) {
+ received = 0;
+ }
+
+ @Override
+ public void invoke(T tuple) {
+ received++;
+ }
+
+ @Override
+ public void close() {
+ assertTrue(received > 0);
+ }
}
-
- public static class TestSink implements SinkFunction<String>{
+
+ /**
+ * Shared co-map that renders the integer input with toString() and passes the string input
+ * through. It is final so tests cannot swap the shared instance.
+ */
+ public static final CoMapFunction<Integer, String, String> NoOpCoMap = new CoMapFunction<Integer, String, String>() {
+
+ @Override
+ public String map1(Integer value) throws Exception {
+ return value.toString();
+ }
+
+ @Override
+ public String map2(String value) throws Exception {
+ return value;
+ }
+ };
+
+ /** Shared identity map on integers; final so tests cannot swap the shared instance. */
+ public static final MapFunction<Integer, Integer> NoOpIntMap = new MapFunction<Integer, Integer>() {
+
+ @Override
+ public Integer map(Integer value) throws Exception {
+ return value;
+ }
+
+ };
+
+ /** Shared identity map on booleans; final so tests cannot swap the shared instance. */
+ public static final MapFunction<Boolean, Boolean> NoOpBoolMap = new MapFunction<Boolean, Boolean>() {
+
+ @Override
+ public Boolean map(Boolean value) throws Exception {
+ return value;
+ }
+
+ };
+
+ /**
+ * Sink that appends every received string to the static {@code collected} list. The test can
+ * then inspect the results after {@code env.execute()} returns.
+ * NOTE(review): the list is a plain ArrayList that is never cleared. This assumes a single
+ * writer instance (testCoIteration adds the sink with parallelism 1) and that no other job in
+ * the same JVM writes to it.
+ */
+ public static class TestSink implements SinkFunction<String> {
private static final long serialVersionUID = 1L;
- public static Set<String> collected = new HashSet<String>();
-
+ public static List<String> collected = new ArrayList<String>();
+
@Override
public void invoke(String value) throws Exception {
collected.add(value);
}
-
+
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b69b249/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index fbd6502..2b0f60e 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -78,6 +78,10 @@ class DataStream[T](javaStream: JavaStream[T]) {
* Returns the parallelism of this operation.
*/
def getParallelism = javaStream.getParallelism
+
+ /**
+ * Returns the partitioner of the underlying Java DataStream.
+ */
+ def getPartitioner = javaStream.getPartitioner()
+
+ /**
+ * Returns the selected output names of the underlying Java DataStream.
+ */
+ def getSelectedNames = javaStream.getSelectedNames()
/**
* Returns the execution config.