You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:46 UTC
[09/51] [abbrv] git commit: [streaming] Replaced connection types with StreamPartitioner in DataStream
[streaming] Replaced connection types with StreamPartitioner in DataStream
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1fccb10f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1fccb10f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1fccb10f
Branch: refs/heads/master
Commit: 1fccb10ffe79afaaff4a6e810b7572c7a952676a
Parents: 126a1cb
Author: ghermann <re...@gmail.com>
Authored: Thu Jul 24 10:57:44 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:19:17 2014 +0200
----------------------------------------------------------------------
.../apache/flink/streaming/api/DataStream.java | 63 +++------
.../flink/streaming/api/JobGraphBuilder.java | 139 -------------------
.../api/StreamExecutionEnvironment.java | 37 ++---
3 files changed, 33 insertions(+), 206 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1fccb10f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
index 23f8408..d32aa18 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.java.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.GroupReduceFunction;
import org.apache.flink.api.java.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.StreamExecutionEnvironment.ConnectionType;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
@@ -38,6 +37,12 @@ import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
+import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.partitioner.DistributePartitioner;
+import org.apache.flink.streaming.partitioner.FieldsPartitioner;
+import org.apache.flink.streaming.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.partitioner.ShufflePartitioner;
+import org.apache.flink.streaming.partitioner.StreamPartitioner;
/**
* A DataStream represents a stream of elements of the same type. A DataStream
@@ -62,8 +67,7 @@ public class DataStream<T extends Tuple> {
protected String userDefinedName;
protected OutputSelector<T> outputSelector;
protected List<String> connectIDs;
- protected List<ConnectionType> ctypes;
- protected List<Integer> cparams;
+ protected List<StreamPartitioner<T>> partitioners;
protected boolean iterationflag;
protected Integer iterationID;
@@ -103,8 +107,7 @@ public class DataStream<T extends Tuple> {
this.userDefinedName = dataStream.userDefinedName;
this.outputSelector = dataStream.outputSelector;
this.connectIDs = new ArrayList<String>(dataStream.connectIDs);
- this.ctypes = new ArrayList<StreamExecutionEnvironment.ConnectionType>(dataStream.ctypes);
- this.cparams = new ArrayList<Integer>(dataStream.cparams);
+ this.partitioners = new ArrayList<StreamPartitioner<T>>(dataStream.partitioners);
this.iterationflag = dataStream.iterationflag;
this.iterationID = dataStream.iterationID;
}
@@ -116,11 +119,8 @@ public class DataStream<T extends Tuple> {
private void initConnections() {
connectIDs = new ArrayList<String>();
connectIDs.add(getId());
- ctypes = new ArrayList<StreamExecutionEnvironment.ConnectionType>();
- ctypes.add(ConnectionType.SHUFFLE);
- cparams = new ArrayList<Integer>();
- cparams.add(0);
-
+ partitioners = new ArrayList<StreamPartitioner<T>>();
+ partitioners.add(new ShufflePartitioner<T>());
}
/**
@@ -226,8 +226,7 @@ public class DataStream<T extends Tuple> {
*/
private DataStream<T> addConnection(DataStream<T> returnStream, DataStream<T> stream) {
returnStream.connectIDs.addAll(stream.connectIDs);
- returnStream.ctypes.addAll(stream.ctypes);
- returnStream.cparams.addAll(stream.cparams);
+ returnStream.partitioners.addAll(stream.partitioners);
return returnStream;
}
@@ -261,13 +260,7 @@ public class DataStream<T extends Tuple> {
throw new IllegalArgumentException("The position of the field must be non-negative");
}
- DataStream<T> returnStream = new DataStream<T>(this);
-
- for (int i = 0; i < returnStream.ctypes.size(); i++) {
- returnStream.ctypes.set(i, ConnectionType.FIELD);
- returnStream.cparams.set(i, keyposition);
- }
- return returnStream;
+ return setConnectionType(new FieldsPartitioner<T>(keyposition));
}
/**
@@ -277,12 +270,7 @@ public class DataStream<T extends Tuple> {
* @return The DataStream with broadcast partitioning set.
*/
public DataStream<T> broadcast() {
- DataStream<T> returnStream = new DataStream<T>(this);
-
- for (int i = 0; i < returnStream.ctypes.size(); i++) {
- returnStream.ctypes.set(i, ConnectionType.BROADCAST);
- }
- return returnStream;
+ return setConnectionType(new BroadcastPartitioner<T>());
}
/**
@@ -292,12 +280,7 @@ public class DataStream<T extends Tuple> {
* @return The DataStream with shuffle partitioning set.
*/
public DataStream<T> shuffle() {
- DataStream<T> returnStream = new DataStream<T>(this);
-
- for (int i = 0; i < returnStream.ctypes.size(); i++) {
- returnStream.ctypes.set(i, ConnectionType.SHUFFLE);
- }
- return returnStream;
+ return setConnectionType(new ShufflePartitioner<T>());
}
/**
@@ -307,12 +290,7 @@ public class DataStream<T extends Tuple> {
* @return The DataStream with shuffle partitioning set.
*/
public DataStream<T> forward() {
- DataStream<T> returnStream = new DataStream<T>(this);
-
- for (int i = 0; i < returnStream.ctypes.size(); i++) {
- returnStream.ctypes.set(i, ConnectionType.FORWARD);
- }
- return returnStream;
+ return setConnectionType(new ForwardPartitioner<T>());
}
/**
@@ -322,14 +300,19 @@ public class DataStream<T extends Tuple> {
* @return The DataStream with shuffle partitioning set.
*/
public DataStream<T> distribute() {
+ return setConnectionType(new DistributePartitioner<T>());
+ }
+
+ private DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
DataStream<T> returnStream = new DataStream<T>(this);
- for (int i = 0; i < returnStream.ctypes.size(); i++) {
- returnStream.ctypes.set(i, ConnectionType.DISTRIBUTE);
+ for (int i = 0; i < returnStream.partitioners.size(); i++) {
+ returnStream.partitioners.set(i, partitioner);
}
+
return returnStream;
}
-
+
/**
* Applies a Map transformation on a {@link DataStream}. The transformation
* calls a {@link MapFunction} for each element of the DataStream. Each
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1fccb10f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 986172b..64fdc03 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -53,12 +53,7 @@ import org.apache.flink.streaming.api.streamcomponent.StreamIterationSource;
import org.apache.flink.streaming.api.streamcomponent.StreamSink;
import org.apache.flink.streaming.api.streamcomponent.StreamSource;
import org.apache.flink.streaming.api.streamcomponent.StreamTask;
-import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
-import org.apache.flink.streaming.partitioner.DistributePartitioner;
-import org.apache.flink.streaming.partitioner.FieldsPartitioner;
import org.apache.flink.streaming.partitioner.ForwardPartitioner;
-import org.apache.flink.streaming.partitioner.GlobalPartitioner;
-import org.apache.flink.streaming.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
/**
@@ -463,140 +458,6 @@ public class JobGraphBuilder {
}
/**
- * Connects two components with the given names by broadcast partitioning.
- * <p>
- * Broadcast partitioning: All the emitted tuples are replicated to all of
- * the output instances
- *
- * @param inputStream
- * The DataStream object of the input
- * @param upStreamComponentName
- * Name of the upstream component, that will emit the records
- * @param downStreamComponentName
- * Name of the downstream component, that will receive the
- * records
- * @param typeNumber
- * Number of the type (used at co-functions)
- */
- public <T extends Tuple> void broadcastConnect(DataStream<T> inputStream,
- String upStreamComponentName, String downStreamComponentName, int typeNumber) {
- setEdge(upStreamComponentName, downStreamComponentName, new BroadcastPartitioner<T>(),
- typeNumber);
- }
-
- /**
- * Connects two components with the given names by fields partitioning on
- * the given field.
- * <p>
- * Fields partitioning: Tuples are hashed by the given key, and grouped to
- * outputs accordingly
- *
- * @param inputStream
- * The DataStream object of the input
- * @param upStreamComponentName
- * Name of the upstream component, that will emit the records
- * @param downStreamComponentName
- * Name of the downstream component, that will receive the
- * records
- * @param keyPosition
- * Position of key in the tuple
- * @param typeNumber
- * Number of the type (used at co-functions)
- */
- public <T extends Tuple> void fieldsConnect(DataStream<T> inputStream,
- String upStreamComponentName, String downStreamComponentName, int keyPosition,
- int typeNumber) {
-
- setEdge(upStreamComponentName, downStreamComponentName, new FieldsPartitioner<T>(
- keyPosition), typeNumber);
- }
-
- /**
- * Connects two components with the given names by global partitioning.
- * <p>
- * Global partitioning: sends all emitted tuples to one output instance
- * (i.e. the first one)
- *
- * @param inputStream
- * The DataStream object of the input
- * @param upStreamComponentName
- * Name of the upstream component, that will emit the tuples
- * @param downStreamComponentName
- * Name of the downstream component, that will receive the tuples
- * @param typeNumber
- * Number of the type (used at co-functions)
- */
- public <T extends Tuple> void globalConnect(DataStream<T> inputStream,
- String upStreamComponentName, String downStreamComponentName, int typeNumber) {
- setEdge(upStreamComponentName, downStreamComponentName, new GlobalPartitioner<T>(),
- typeNumber);
- }
-
- /**
- * Connects two components with the given names by shuffle partitioning.
- * <p>
- * Shuffle partitioning: sends the output tuples to a randomly selected
- * channel
- *
- * @param inputStream
- * The DataStream object of the input
- * @param upStreamComponentName
- * Name of the upstream component, that will emit the tuples
- * @param downStreamComponentName
- * Name of the downstream component, that will receive the tuples
- * @param typeNumber
- * Number of the type (used at co-functions)
- */
- public <T extends Tuple> void shuffleConnect(DataStream<T> inputStream,
- String upStreamComponentName, String downStreamComponentName, int typeNumber) {
- setEdge(upStreamComponentName, downStreamComponentName, new ShufflePartitioner<T>(),
- typeNumber);
- }
-
- /**
- * Connects two components with the given names by connecting the local
- * subtasks in memory.
- * <p>
- * Forward partitioning: sends the output tuples to the local subtask of the
- * output vertex
- *
- * @param inputStream
- * The DataStream object of the input
- * @param upStreamComponentName
- * Name of the upstream component, that will emit the tuples
- * @param downStreamComponentName
- * Name of the downstream component, that will receive the tuples
- * @param typeNumber
- * Number of the type (used at co-functions)
- */
- public <T extends Tuple> void forwardConnect(DataStream<T> inputStream,
- String upStreamComponentName, String downStreamComponentName, int typeNumber) {
- setEdge(upStreamComponentName, downStreamComponentName, new ForwardPartitioner<T>(),
- typeNumber);
- }
-
- /**
- * Connects two components with the given names by distribute partitioning.
- * <p>
- * Distribute partitioning: sends the output tuples evenly distributed along
- * the selected channels
- *
- * @param inputStream
- * The DataStream object of the input
- * @param upStreamComponentName
- * Name of the upstream component, that will emit the tuples
- * @param downStreamComponentName
- * Name of the downstream component, that will receive the tuples
- * @param typeNumber
- * Number of the type (used at co-functions)
- */
- public <T extends Tuple> void distributeConnect(DataStream<T> inputStream,
- String upStreamComponentName, String downStreamComponentName, int typeNumber) {
- setEdge(upStreamComponentName, downStreamComponentName, new DistributePartitioner<T>(),
- typeNumber);
- }
-
- /**
* Connects to JobGraph components with the given names, partitioning and
* channel type
*
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1fccb10f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
index 4b5bc98..35cfc24 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
@@ -43,6 +43,8 @@ import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.invokable.SinkInvokable;
import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
+import org.apache.flink.streaming.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.partitioner.StreamPartitioner;
/**
* {@link ExecutionEnvironment} for streaming jobs. An instance of it is
@@ -364,9 +366,8 @@ public abstract class StreamExecutionEnvironment {
jobGraphBuilder.setIterationSourceParallelism(iterationID, inputStream.getParallelism());
for (int i = 0; i < inputStream.connectIDs.size(); i++) {
- String input = inputStream.connectIDs.get(i);
- jobGraphBuilder.forwardConnect(inputStream, input, returnStream.getId(), 0);
-
+ String inputID = inputStream.connectIDs.get(i);
+ jobGraphBuilder.setEdge(inputID, returnStream.getId(), new ForwardPartitioner<T>(), 0);
}
}
@@ -552,32 +553,14 @@ public abstract class StreamExecutionEnvironment {
* Number of the type (used at co-functions)
*/
private <T extends Tuple> void connectGraph(DataStream<T> inputStream, String outputID, int typeNumber) {
-
+
for (int i = 0; i < inputStream.connectIDs.size(); i++) {
- ConnectionType type = inputStream.ctypes.get(i);
- String input = inputStream.connectIDs.get(i);
- int param = inputStream.cparams.get(i);
-
- switch (type) {
- case SHUFFLE:
- jobGraphBuilder.shuffleConnect(inputStream, input, outputID, typeNumber);
- break;
- case BROADCAST:
- jobGraphBuilder.broadcastConnect(inputStream, input, outputID, typeNumber);
- break;
- case FIELD:
- jobGraphBuilder.fieldsConnect(inputStream, input, outputID, param, typeNumber);
- break;
- case FORWARD:
- jobGraphBuilder.forwardConnect(inputStream, input, outputID, typeNumber);
- break;
- case DISTRIBUTE:
- jobGraphBuilder.distributeConnect(inputStream, input, outputID, typeNumber);
- break;
- }
-
+ String inputID = inputStream.connectIDs.get(i);
+ StreamPartitioner<T> partitioner = inputStream.partitioners.get(i);
+
+ jobGraphBuilder.setEdge(inputID, outputID, partitioner,
+ typeNumber);
}
-
}
protected <T extends Tuple> void setName(DataStream<T> stream, String name) {