You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:46 UTC

[09/51] [abbrv] git commit: [streaming] Replaced connection types with StreamPartitioner in DataStream

[streaming] Replaced connection types with StreamPartitioner in DataStream


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1fccb10f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1fccb10f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1fccb10f

Branch: refs/heads/master
Commit: 1fccb10ffe79afaaff4a6e810b7572c7a952676a
Parents: 126a1cb
Author: ghermann <re...@gmail.com>
Authored: Thu Jul 24 10:57:44 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:19:17 2014 +0200

----------------------------------------------------------------------
 .../apache/flink/streaming/api/DataStream.java  |  63 +++------
 .../flink/streaming/api/JobGraphBuilder.java    | 139 -------------------
 .../api/StreamExecutionEnvironment.java         |  37 ++---
 3 files changed, 33 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1fccb10f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
index 23f8408..d32aa18 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.java.functions.FlatMapFunction;
 import org.apache.flink.api.java.functions.GroupReduceFunction;
 import org.apache.flink.api.java.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.StreamExecutionEnvironment.ConnectionType;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
@@ -38,6 +37,12 @@ import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
 import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
+import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.partitioner.DistributePartitioner;
+import org.apache.flink.streaming.partitioner.FieldsPartitioner;
+import org.apache.flink.streaming.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.partitioner.ShufflePartitioner;
+import org.apache.flink.streaming.partitioner.StreamPartitioner;
 
 /**
  * A DataStream represents a stream of elements of the same type. A DataStream
@@ -62,8 +67,7 @@ public class DataStream<T extends Tuple> {
 	protected String userDefinedName;
 	protected OutputSelector<T> outputSelector;
 	protected List<String> connectIDs;
-	protected List<ConnectionType> ctypes;
-	protected List<Integer> cparams;
+	protected List<StreamPartitioner<T>> partitioners;
 	protected boolean iterationflag;
 	protected Integer iterationID;
 
@@ -103,8 +107,7 @@ public class DataStream<T extends Tuple> {
 		this.userDefinedName = dataStream.userDefinedName;
 		this.outputSelector = dataStream.outputSelector;
 		this.connectIDs = new ArrayList<String>(dataStream.connectIDs);
-		this.ctypes = new ArrayList<StreamExecutionEnvironment.ConnectionType>(dataStream.ctypes);
-		this.cparams = new ArrayList<Integer>(dataStream.cparams);
+		this.partitioners = new ArrayList<StreamPartitioner<T>>(dataStream.partitioners);
 		this.iterationflag = dataStream.iterationflag;
 		this.iterationID = dataStream.iterationID;
 	}
@@ -116,11 +119,8 @@ public class DataStream<T extends Tuple> {
 	private void initConnections() {
 		connectIDs = new ArrayList<String>();
 		connectIDs.add(getId());
-		ctypes = new ArrayList<StreamExecutionEnvironment.ConnectionType>();
-		ctypes.add(ConnectionType.SHUFFLE);
-		cparams = new ArrayList<Integer>();
-		cparams.add(0);
-
+		partitioners = new ArrayList<StreamPartitioner<T>>();
+		partitioners.add(new ShufflePartitioner<T>());
 	}
 
 	/**
@@ -226,8 +226,7 @@ public class DataStream<T extends Tuple> {
 	 */
 	private DataStream<T> addConnection(DataStream<T> returnStream, DataStream<T> stream) {
 		returnStream.connectIDs.addAll(stream.connectIDs);
-		returnStream.ctypes.addAll(stream.ctypes);
-		returnStream.cparams.addAll(stream.cparams);
+		returnStream.partitioners.addAll(stream.partitioners);
 
 		return returnStream;
 	}
@@ -261,13 +260,7 @@ public class DataStream<T extends Tuple> {
 			throw new IllegalArgumentException("The position of the field must be non-negative");
 		}
 
-		DataStream<T> returnStream = new DataStream<T>(this);
-
-		for (int i = 0; i < returnStream.ctypes.size(); i++) {
-			returnStream.ctypes.set(i, ConnectionType.FIELD);
-			returnStream.cparams.set(i, keyposition);
-		}
-		return returnStream;
+		return setConnectionType(new FieldsPartitioner<T>(keyposition));
 	}
 
 	/**
@@ -277,12 +270,7 @@ public class DataStream<T extends Tuple> {
 	 * @return The DataStream with broadcast partitioning set.
 	 */
 	public DataStream<T> broadcast() {
-		DataStream<T> returnStream = new DataStream<T>(this);
-
-		for (int i = 0; i < returnStream.ctypes.size(); i++) {
-			returnStream.ctypes.set(i, ConnectionType.BROADCAST);
-		}
-		return returnStream;
+		return setConnectionType(new BroadcastPartitioner<T>());
 	}
 
 	/**
@@ -292,12 +280,7 @@ public class DataStream<T extends Tuple> {
 	 * @return The DataStream with shuffle partitioning set.
 	 */
 	public DataStream<T> shuffle() {
-		DataStream<T> returnStream = new DataStream<T>(this);
-
-		for (int i = 0; i < returnStream.ctypes.size(); i++) {
-			returnStream.ctypes.set(i, ConnectionType.SHUFFLE);
-		}
-		return returnStream;
+		return setConnectionType(new ShufflePartitioner<T>());
 	}
 
 	/**
@@ -307,12 +290,7 @@ public class DataStream<T extends Tuple> {
 	 * @return The DataStream with forward partitioning set.
 	 */
 	public DataStream<T> forward() {
-		DataStream<T> returnStream = new DataStream<T>(this);
-
-		for (int i = 0; i < returnStream.ctypes.size(); i++) {
-			returnStream.ctypes.set(i, ConnectionType.FORWARD);
-		}
-		return returnStream;
+		return setConnectionType(new ForwardPartitioner<T>());
 	}
 	
 	/**
@@ -322,14 +300,19 @@ public class DataStream<T extends Tuple> {
 	 * @return The DataStream with distribute partitioning set.
 	 */
 	public DataStream<T> distribute() {
+		return setConnectionType(new DistributePartitioner<T>());
+	}
+
+	private DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
 		DataStream<T> returnStream = new DataStream<T>(this);
 
-		for (int i = 0; i < returnStream.ctypes.size(); i++) {
-			returnStream.ctypes.set(i, ConnectionType.DISTRIBUTE);
+		for (int i = 0; i < returnStream.partitioners.size(); i++) {
+			returnStream.partitioners.set(i, partitioner);
 		}
+		
 		return returnStream;
 	}
-
+	
 	/**
 	 * Applies a Map transformation on a {@link DataStream}. The transformation
 	 * calls a {@link MapFunction} for each element of the DataStream. Each

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1fccb10f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 986172b..64fdc03 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -53,12 +53,7 @@ import org.apache.flink.streaming.api.streamcomponent.StreamIterationSource;
 import org.apache.flink.streaming.api.streamcomponent.StreamSink;
 import org.apache.flink.streaming.api.streamcomponent.StreamSource;
 import org.apache.flink.streaming.api.streamcomponent.StreamTask;
-import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
-import org.apache.flink.streaming.partitioner.DistributePartitioner;
-import org.apache.flink.streaming.partitioner.FieldsPartitioner;
 import org.apache.flink.streaming.partitioner.ForwardPartitioner;
-import org.apache.flink.streaming.partitioner.GlobalPartitioner;
-import org.apache.flink.streaming.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 
 /**
@@ -463,140 +458,6 @@ public class JobGraphBuilder {
 	}
 
 	/**
-	 * Connects two components with the given names by broadcast partitioning.
-	 * <p>
-	 * Broadcast partitioning: All the emitted tuples are replicated to all of
-	 * the output instances
-	 * 
-	 * @param inputStream
-	 *            The DataStream object of the input
-	 * @param upStreamComponentName
-	 *            Name of the upstream component, that will emit the records
-	 * @param downStreamComponentName
-	 *            Name of the downstream component, that will receive the
-	 *            records
-	 * @param typeNumber
-	 *            Number of the type (used at co-functions)
-	 */
-	public <T extends Tuple> void broadcastConnect(DataStream<T> inputStream,
-			String upStreamComponentName, String downStreamComponentName, int typeNumber) {
-		setEdge(upStreamComponentName, downStreamComponentName, new BroadcastPartitioner<T>(),
-				typeNumber);
-	}
-
-	/**
-	 * Connects two components with the given names by fields partitioning on
-	 * the given field.
-	 * <p>
-	 * Fields partitioning: Tuples are hashed by the given key, and grouped to
-	 * outputs accordingly
-	 * 
-	 * @param inputStream
-	 *            The DataStream object of the input
-	 * @param upStreamComponentName
-	 *            Name of the upstream component, that will emit the records
-	 * @param downStreamComponentName
-	 *            Name of the downstream component, that will receive the
-	 *            records
-	 * @param keyPosition
-	 *            Position of key in the tuple
-	 * @param typeNumber
-	 *            Number of the type (used at co-functions)
-	 */
-	public <T extends Tuple> void fieldsConnect(DataStream<T> inputStream,
-			String upStreamComponentName, String downStreamComponentName, int keyPosition,
-			int typeNumber) {
-
-		setEdge(upStreamComponentName, downStreamComponentName, new FieldsPartitioner<T>(
-				keyPosition), typeNumber);
-	}
-
-	/**
-	 * Connects two components with the given names by global partitioning.
-	 * <p>
-	 * Global partitioning: sends all emitted tuples to one output instance
-	 * (i.e. the first one)
-	 * 
-	 * @param inputStream
-	 *            The DataStream object of the input
-	 * @param upStreamComponentName
-	 *            Name of the upstream component, that will emit the tuples
-	 * @param downStreamComponentName
-	 *            Name of the downstream component, that will receive the tuples
-	 * @param typeNumber
-	 *            Number of the type (used at co-functions)
-	 */
-	public <T extends Tuple> void globalConnect(DataStream<T> inputStream,
-			String upStreamComponentName, String downStreamComponentName, int typeNumber) {
-		setEdge(upStreamComponentName, downStreamComponentName, new GlobalPartitioner<T>(),
-				typeNumber);
-	}
-
-	/**
-	 * Connects two components with the given names by shuffle partitioning.
-	 * <p>
-	 * Shuffle partitioning: sends the output tuples to a randomly selected
-	 * channel
-	 * 
-	 * @param inputStream
-	 *            The DataStream object of the input
-	 * @param upStreamComponentName
-	 *            Name of the upstream component, that will emit the tuples
-	 * @param downStreamComponentName
-	 *            Name of the downstream component, that will receive the tuples
-	 * @param typeNumber
-	 *            Number of the type (used at co-functions)
-	 */
-	public <T extends Tuple> void shuffleConnect(DataStream<T> inputStream,
-			String upStreamComponentName, String downStreamComponentName, int typeNumber) {
-		setEdge(upStreamComponentName, downStreamComponentName, new ShufflePartitioner<T>(),
-				typeNumber);
-	}
-
-	/**
-	 * Connects two components with the given names by connecting the local
-	 * subtasks in memory.
-	 * <p>
-	 * Forward partitioning: sends the output tuples to the local subtask of the
-	 * output vertex
-	 * 
-	 * @param inputStream
-	 *            The DataStream object of the input
-	 * @param upStreamComponentName
-	 *            Name of the upstream component, that will emit the tuples
-	 * @param downStreamComponentName
-	 *            Name of the downstream component, that will receive the tuples
-	 * @param typeNumber
-	 *            Number of the type (used at co-functions)
-	 */
-	public <T extends Tuple> void forwardConnect(DataStream<T> inputStream,
-			String upStreamComponentName, String downStreamComponentName, int typeNumber) {
-		setEdge(upStreamComponentName, downStreamComponentName, new ForwardPartitioner<T>(),
-				typeNumber);
-	}
-
-	/**
-	 * Connects two components with the given names by distribute partitioning.
-	 * <p>
-	 * Distribute partitioning: sends the output tuples evenly distributed along
-	 * the selected channels
-	 * 
-	 * @param inputStream
-	 *            The DataStream object of the input
-	 * @param upStreamComponentName
-	 *            Name of the upstream component, that will emit the tuples
-	 * @param downStreamComponentName
-	 *            Name of the downstream component, that will receive the tuples
-	 * @param typeNumber
-	 *            Number of the type (used at co-functions)
-	 */
-	public <T extends Tuple> void distributeConnect(DataStream<T> inputStream,
-			String upStreamComponentName, String downStreamComponentName, int typeNumber) {
-		setEdge(upStreamComponentName, downStreamComponentName, new DistributePartitioner<T>(),
-				typeNumber);
-	}
-
-	/**
 	 * Connects two JobGraph components with the given names, partitioning and
 	 * channel type
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1fccb10f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
index 4b5bc98..35cfc24 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
@@ -43,6 +43,8 @@ import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.api.invokable.SinkInvokable;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
+import org.apache.flink.streaming.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.partitioner.StreamPartitioner;
 
 /**
  * {@link ExecutionEnvironment} for streaming jobs. An instance of it is
@@ -364,9 +366,8 @@ public abstract class StreamExecutionEnvironment {
 		jobGraphBuilder.setIterationSourceParallelism(iterationID, inputStream.getParallelism());
 
 		for (int i = 0; i < inputStream.connectIDs.size(); i++) {
-			String input = inputStream.connectIDs.get(i);
-			jobGraphBuilder.forwardConnect(inputStream, input, returnStream.getId(), 0);
-
+			String inputID = inputStream.connectIDs.get(i);
+			jobGraphBuilder.setEdge(inputID, returnStream.getId(), new ForwardPartitioner<T>(), 0);
 		}
 	}
 
@@ -552,32 +553,14 @@ public abstract class StreamExecutionEnvironment {
 	 *            Number of the type (used at co-functions)
 	 */
 	private <T extends Tuple> void connectGraph(DataStream<T> inputStream, String outputID, int typeNumber) {
-
+		
 		for (int i = 0; i < inputStream.connectIDs.size(); i++) {
-			ConnectionType type = inputStream.ctypes.get(i);
-			String input = inputStream.connectIDs.get(i);
-			int param = inputStream.cparams.get(i);
-
-			switch (type) {
-			case SHUFFLE:
-				jobGraphBuilder.shuffleConnect(inputStream, input, outputID, typeNumber);
-				break;
-			case BROADCAST:
-				jobGraphBuilder.broadcastConnect(inputStream, input, outputID, typeNumber);
-				break;
-			case FIELD:
-				jobGraphBuilder.fieldsConnect(inputStream, input, outputID, param, typeNumber);
-				break;
-			case FORWARD:
-				jobGraphBuilder.forwardConnect(inputStream, input, outputID, typeNumber);
-				break;
-			case DISTRIBUTE:
-				jobGraphBuilder.distributeConnect(inputStream, input, outputID, typeNumber);
-				break;
-			}
-
+			String inputID = inputStream.connectIDs.get(i);
+			StreamPartitioner<T> partitioner = inputStream.partitioners.get(i);
+			
+			jobGraphBuilder.setEdge(inputID, outputID, partitioner,
+					typeNumber);
 		}
-
 	}
 
 	protected <T extends Tuple> void setName(DataStream<T> stream, String name) {