Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:07 UTC

[30/51] [abbrv] git commit: [streaming] DataStream type refactor

[streaming] DataStream type refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f932700a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f932700a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f932700a

Branch: refs/heads/master
Commit: f932700ae4e7e6110b729afa8530ac029ed0b700
Parents: 910f74d
Author: gyfora <gy...@gmail.com>
Authored: Sun Aug 3 22:47:23 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:22:50 2014 +0200

----------------------------------------------------------------------
 .../apache/flink/streaming/api/DataStream.java  | 509 +++++++++----------
 .../streaming/api/IterativeDataStream.java      |  19 +-
 .../flink/streaming/api/SplitDataStream.java    |   7 +-
 .../flink/streaming/api/StreamOperator.java     |   8 +-
 .../api/collector/DirectedOutputTest.java       |  59 ++-
 5 files changed, 318 insertions(+), 284 deletions(-)
----------------------------------------------------------------------
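
The heart of the refactor: the parallel bookkeeping lists connectIDs and
partitioners are replaced by state carried on the streams themselves. Each
DataStream now holds exactly one StreamPartitioner plus a list of defensive
copies of the streams it is connected with, seeded with a copy of itself; the
iterationflag/iterationID pair likewise moves off DataStream and onto
IterativeDataStream (second file below). A minimal sketch of the resulting
shape, condensed from the diff rather than quoted verbatim:

    public class DataStream<T> {
        // one partitioner per stream; ForwardPartitioner by default
        protected StreamPartitioner<T> partitioner;
        // defensive copies of every connected stream, seeded with this.copy()
        protected List<DataStream<T>> connectedStreams;
    }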


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f932700a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
index c648ab2..10f8114 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
@@ -77,16 +77,14 @@ public class DataStream<T> {
 	protected String id;
 	protected int degreeOfParallelism;
 	protected String userDefinedName;
-	protected List<String> connectIDs;
-	protected List<StreamPartitioner<T>> partitioners;
-	protected boolean iterationflag;
-	protected Integer iterationID;
+	protected StreamPartitioner<T> partitioner;
+	protected List<DataStream<T>> connectedStreams;
 
 	protected JobGraphBuilder jobGraphBuilder;
 
 	/**
 	 * Create a new {@link DataStream} in the given execution environment with
-	 * partitioning set to shuffle by default.
+	 * partitioning set to forward by default.
 	 * 
 	 * @param environment
 	 *            StreamExecutionEnvironment
@@ -104,7 +102,9 @@ public class DataStream<T> {
 		this.environment = environment;
 		this.degreeOfParallelism = environment.getDegreeOfParallelism();
 		this.jobGraphBuilder = environment.getJobGraphBuilder();
-		initConnections();
+		this.partitioner = new ForwardPartitioner<T>();
+		this.connectedStreams = new ArrayList<DataStream<T>>();
+		this.connectedStreams.add(this.copy());
 	}
 
 	/**
@@ -118,11 +118,13 @@ public class DataStream<T> {
 		this.id = dataStream.id;
 		this.degreeOfParallelism = dataStream.degreeOfParallelism;
 		this.userDefinedName = dataStream.userDefinedName;
-		this.connectIDs = new ArrayList<String>(dataStream.connectIDs);
-		this.partitioners = new ArrayList<StreamPartitioner<T>>(dataStream.partitioners);
-		this.iterationflag = dataStream.iterationflag;
-		this.iterationID = dataStream.iterationID;
+		this.partitioner = dataStream.partitioner;
 		this.jobGraphBuilder = dataStream.jobGraphBuilder;
+		this.connectedStreams = new ArrayList<DataStream<T>>();
+		for (DataStream<T> stream : dataStream.connectedStreams) {
+			this.connectedStreams.add(stream.copy());
+		}
+
 	}
 
 	/**
@@ -142,17 +144,6 @@ public class DataStream<T> {
 	}
 
 	/**
-	 * Initialize the connection and partitioning among the connected
-	 * {@link DataStream}s.
-	 */
-	private void initConnections() {
-		connectIDs = new ArrayList<String>();
-		connectIDs.add(getId());
-		partitioners = new ArrayList<StreamPartitioner<T>>();
-		partitioners.add(new ForwardPartitioner<T>());
-	}
-
-	/**
 	 * Returns the ID of the {@link DataStream}.
 	 * 
 	 * @return ID of the DataStream
@@ -162,31 +153,12 @@ public class DataStream<T> {
 	}
 
 	/**
-	 * Sets the mutability of the operator represented by the DataStream. If the
-	 * operator is set to mutable, the tuples received in the user defined
-	 * functions, will be reused after the function call. Setting an operator to
-	 * mutable greatly reduces garbage collection overhead and thus scalability.
-	 * 
-	 * @param isMutable
-	 *            The mutability of the operator.
-	 * @return The DataStream with mutability set.
-	 */
-	public DataStream<T> setMutability(boolean isMutable) {
-		jobGraphBuilder.setMutability(id, isMutable);
-		return this;
-	}
-
-	/**
-	 * Sets the maximum time frequency (ms) for the flushing of the output
-	 * buffer. By default the output buffers flush only when they are full.
+	 * Gets the degree of parallelism for this operator.
 	 * 
-	 * @param timeoutMillis
-	 *            The maximum time between two output flushes.
-	 * @return The DataStream with buffer timeout set.
+	 * @return The parallelism set for this operator.
 	 */
-	public DataStream<T> setBufferTimeout(long timeoutMillis) {
-		jobGraphBuilder.setBufferTimeout(id, timeoutMillis);
-		return this;
+	public int getParallelism() {
+		return this.degreeOfParallelism;
 	}
 
 	/**
@@ -202,39 +174,37 @@ public class DataStream<T> {
 			throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
 		}
 		this.degreeOfParallelism = dop;
-
+	
 		jobGraphBuilder.setParallelism(id, degreeOfParallelism);
-
+	
 		return this;
 	}
 
 	/**
-	 * Gets the degree of parallelism for this operator.
+	 * Sets the mutability of the operator represented by the DataStream. If the
+	 * operator is set to mutable, the tuples received in the user defined
+	 * functions, will be reused after the function call. Setting an operator to
+	 * mutable greatly reduces garbage collection overhead and thus scalability.
 	 * 
-	 * @return The parallelism set for this operator.
+	 * @param isMutable
+	 *            The mutability of the operator.
+	 * @return The DataStream with mutability set.
 	 */
-	public int getParallelism() {
-		return this.degreeOfParallelism;
+	public DataStream<T> setMutability(boolean isMutable) {
+		jobGraphBuilder.setMutability(id, isMutable);
+		return this;
 	}
 
 	/**
-	 * Gives the data transformation(vertex) a user defined name in order to use
-	 * with directed outputs. The {@link OutputSelector} of the input vertex
-	 * should use this name for directed emits.
+	 * Sets the maximum time frequency (ms) for the flushing of the output
+	 * buffer. By default the output buffers flush only when they are full.
 	 * 
-	 * @param name
-	 *            The name to set
-	 * @return The named DataStream.
+	 * @param timeoutMillis
+	 *            The maximum time between two output flushes.
+	 * @return The DataStream with buffer timeout set.
 	 */
-	protected DataStream<T> name(String name) {
-		// TODO copy DataStream?
-		if (name == "") {
-			throw new IllegalArgumentException("User defined name must not be empty string");
-		}
-
-		userDefinedName = name;
-		jobGraphBuilder.setUserDefinedName(id, name);
-
+	public DataStream<T> setBufferTimeout(long timeoutMillis) {
+		jobGraphBuilder.setBufferTimeout(id, timeoutMillis);
 		return this;
 	}
 
@@ -258,19 +228,6 @@ public class DataStream<T> {
 	}
 
 	/**
-	 * Connects two DataStreams
-	 * 
-	 * @param returnStream
-	 *            The other DataStream will connected to this
-	 * @param stream
-	 *            This DataStream will be connected to returnStream
-	 */
-	private void addConnection(DataStream<T> returnStream, DataStream<T> stream) {
-		returnStream.connectIDs.addAll(stream.connectIDs);
-		returnStream.partitioners.addAll(stream.partitioners);
-	}
-
-	/**
 	 * Operator used for directing tuples to specific named outputs. Sets an
 	 * {@link OutputSelector} for the vertex. The tuples emitted from this
 	 * vertex will be sent to the output names selected by the OutputSelector.
@@ -282,8 +239,9 @@ public class DataStream<T> {
 	 */
 	public SplitDataStream<T> split(OutputSelector<T> outputSelector) {
 		try {
-			for (String id : connectIDs) {
-				jobGraphBuilder.setOutputSelector(id, SerializationUtils.serialize(outputSelector));
+			for (DataStream<T> stream : connectedStreams) {
+				jobGraphBuilder.setOutputSelector(stream.id,
+						SerializationUtils.serialize(outputSelector));
 			}
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize OutputSelector");
@@ -349,16 +307,6 @@ public class DataStream<T> {
 		return setConnectionType(new DistributePartitioner<T>());
 	}
 
-	private DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
-		DataStream<T> returnStream = this.copy();
-
-		for (int i = 0; i < returnStream.partitioners.size(); i++) {
-			returnStream.partitioners.set(i, partitioner);
-		}
-
-		return returnStream;
-	}
-
 	/**
 	 * Applies a Map transformation on a {@link DataStream}. The transformation
 	 * calls a {@link RichMapFunction} for each element of the DataStream. Each
@@ -377,28 +325,6 @@ public class DataStream<T> {
 	}
 
 	/**
-	 * Applies a CoMap transformation on two separate {@link DataStream}s. The
-	 * transformation calls a {@link CoMapFunction#map1(Tuple)} for each element
-	 * of the first DataStream (on which .coMapWith was called) and
-	 * {@link CoMapFunction#map2(Tuple)} for each element of the second
-	 * DataStream. Each CoMapFunction call returns exactly one element.
-	 * 
-	 * @param coMapper
-	 *            The CoMapFunction used to jointly transform the two input
-	 *            DataStreams
-	 * @param otherStream
-	 *            The DataStream that will be transformed with
-	 *            {@link CoMapFunction#map2(Tuple)}
-	 * @return The transformed DataStream
-	 */
-	public <T2, R> DataStream<R> coMapWith(CoMapFunction<T, T2, R> coMapper,
-			DataStream<T2> otherStream) {
-		return addCoFunction("coMap", this.copy(), otherStream.copy(), coMapper,
-				new FunctionTypeWrapper<T, T2, R>(coMapper, CoMapFunction.class, 0, 1, 2),
-				new CoMapInvokable<T, T2, R>(coMapper));
-	}
-
-	/**
 	 * Applies a FlatMap transformation on a {@link DataStream}. The
 	 * transformation calls a {@link RichFlatMapFunction} for each element of
 	 * the DataStream. Each RichFlatMapFunction call can return any number of
@@ -418,19 +344,25 @@ public class DataStream<T> {
 	}
 
 	/**
-	 * Applies a Filter transformation on a {@link DataStream}. The
-	 * transformation calls a {@link RichFilterFunction} for each element of the
-	 * DataStream and retains only those element for which the function returns
-	 * true. Elements for which the function returns false are filtered.
+	 * Applies a CoMap transformation on two separate {@link DataStream}s. The
+	 * transformation calls a {@link CoMapFunction#map1(Tuple)} for each element
+	 * of the first DataStream (on which .coMapWith was called) and
+	 * {@link CoMapFunction#map2(Tuple)} for each element of the second
+	 * DataStream. Each CoMapFunction call returns exactly one element.
 	 * 
-	 * @param filter
-	 *            The RichFilterFunction that is called for each element of the
-	 *            DataSet.
-	 * @return The filtered DataStream.
+	 * @param coMapper
+	 *            The CoMapFunction used to jointly transform the two input
+	 *            DataStreams
+	 * @param otherStream
+	 *            The DataStream that will be transformed with
+	 *            {@link CoMapFunction#map2(Tuple)}
+	 * @return The transformed DataStream
 	 */
-	public StreamOperator<T, T> filter(RichFilterFunction<T> filter) {
-		return addFunction("filter", filter, new FunctionTypeWrapper<T, Tuple, T>(filter,
-				RichFilterFunction.class, 0, -1, 0), new FilterInvokable<T>(filter));
+	public <T2, R> DataStream<R> coMapWith(CoMapFunction<T, T2, R> coMapper,
+			DataStream<T2> otherStream) {
+		return addCoFunction("coMap", this.copy(), otherStream.copy(), coMapper,
+				new FunctionTypeWrapper<T, T2, R>(coMapper, CoMapFunction.class, 0, 1, 2),
+				new CoMapInvokable<T, T2, R>(coMapper));
 	}
 
 	/**
@@ -480,115 +412,19 @@ public class DataStream<T> {
 	}
 
 	/**
-	 * Internal function for passing the user defined functions to the JobGraph
-	 * of the job.
-	 * 
-	 * @param functionName
-	 *            name of the function
-	 * @param function
-	 *            the user defined function
-	 * @param functionInvokable
-	 *            the wrapping JobVertex instance
-	 * @param <T>
-	 *            type of the input stream
-	 * @param <R>
-	 *            type of the return stream
-	 * @return the data stream constructed
-	 */
-	private <R> StreamOperator<T, R> addFunction(String functionName,
-			final AbstractRichFunction function, TypeSerializerWrapper<T, Tuple, R> typeWrapper,
-			UserTaskInvokable<T, R> functionInvokable) {
-
-		DataStream<T> inputStream = this.copy();
-		StreamOperator<T, R> returnStream = new StreamOperator<T, R>(environment, functionName);
-
-		try {
-			jobGraphBuilder.addTask(returnStream.getId(), functionInvokable, typeWrapper,
-					functionName, SerializationUtils.serialize(function), degreeOfParallelism);
-		} catch (SerializationException e) {
-			throw new RuntimeException("Cannot serialize user defined function");
-		}
-
-		connectGraph(inputStream, returnStream.getId(), 0);
-
-		if (inputStream.iterationflag) {
-			returnStream.addIterationSource(inputStream.iterationID.toString());
-			inputStream.iterationflag = false;
-		}
-
-		if (inputStream instanceof NamedDataStream) {
-			returnStream.name(inputStream.userDefinedName);
-		}
-
-		return returnStream;
-	}
-
-	protected <T1, T2, R> DataStream<R> addCoFunction(String functionName,
-			DataStream<T1> inputStream1, DataStream<T2> inputStream2,
-			final AbstractRichFunction function, TypeSerializerWrapper<T1, T2, R> typeWrapper,
-			CoInvokable<T1, T2, R> functionInvokable) {
-
-		DataStream<R> returnStream = new DataStream<R>(environment, functionName);
-
-		try {
-			jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable, typeWrapper,
-					functionName, SerializationUtils.serialize(function), degreeOfParallelism);
-		} catch (SerializationException e) {
-			throw new RuntimeException("Cannot serialize user defined function");
-		}
-
-		connectGraph(inputStream1, returnStream.getId(), 1);
-		connectGraph(inputStream2, returnStream.getId(), 2);
-
-		if ((inputStream1 instanceof NamedDataStream) && (inputStream2 instanceof NamedDataStream)) {
-			throw new RuntimeException("An operator cannot have two names");
-		} else {
-			if (inputStream1 instanceof NamedDataStream) {
-				returnStream.name(inputStream1.userDefinedName);
-			}
-
-			if (inputStream2 instanceof NamedDataStream) {
-				returnStream.name(inputStream2.userDefinedName);
-			}
-		}
-		// TODO consider iteration
-
-		return returnStream;
-	}
-
-	/**
-	 * Internal function for assembling the underlying
-	 * {@link org.apache.flink.nephele.jobgraph.JobGraph} of the job. Connects
-	 * the outputs of the given input stream to the specified output stream
-	 * given by the outputID.
-	 * 
-	 * @param inputStream
-	 *            input data stream
-	 * @param outputID
-	 *            ID of the output
-	 * @param typeNumber
-	 *            Number of the type (used at co-functions)
-	 */
-	private <X> void connectGraph(DataStream<X> inputStream, String outputID, int typeNumber) {
-		for (int i = 0; i < inputStream.connectIDs.size(); i++) {
-			String inputID = inputStream.connectIDs.get(i);
-			StreamPartitioner<X> partitioner = inputStream.partitioners.get(i);
-
-			jobGraphBuilder.setEdge(inputID, outputID, partitioner, typeNumber);
-		}
-	}
-
-	/**
-	 * Adds the given sink to this DataStream. Only streams with sinks added
-	 * will be executed once the {@link StreamExecutionEnvironment#execute()}
-	 * method is called.
+	 * Applies a Filter transformation on a {@link DataStream}. The
+	 * transformation calls a {@link RichFilterFunction} for each element of the
+	 * DataStream and retains only those element for which the function returns
+	 * true. Elements for which the function returns false are filtered.
 	 * 
-	 * @param sinkFunction
-	 *            The object containing the sink's invoke function.
-	 * @return The closed DataStream.
+	 * @param filter
+	 *            The RichFilterFunction that is called for each element of the
+	 *            DataSet.
+	 * @return The filtered DataStream.
 	 */
-	public DataStream<T> addSink(SinkFunction<T> sinkFunction) {
-		return addSink(this.copy(), sinkFunction);
+	public StreamOperator<T, T> filter(RichFilterFunction<T> filter) {
+		return addFunction("filter", filter, new FunctionTypeWrapper<T, Tuple, T>(filter,
+				RichFilterFunction.class, 0, -1, 0), new FilterInvokable<T>(filter));
 	}
 
 	/**
@@ -608,32 +444,6 @@ public class DataStream<T> {
 		return returnStream;
 	}
 
-	private DataStream<T> addSink(DataStream<T> inputStream, SinkFunction<T> sinkFunction) {
-		return addSink(inputStream, sinkFunction, new FunctionTypeWrapper<T, Tuple, T>(
-				sinkFunction, SinkFunction.class, 0, -1, 0));
-	}
-
-	private DataStream<T> addSink(DataStream<T> inputStream, SinkFunction<T> sinkFunction,
-			TypeSerializerWrapper<T, Tuple, T> typeWrapper) {
-		DataStream<T> returnStream = new DataStream<T>(environment, "sink");
-
-		try {
-			jobGraphBuilder.addSink(returnStream.getId(), new SinkInvokable<T>(sinkFunction),
-					typeWrapper, "sink", SerializationUtils.serialize(sinkFunction),
-					degreeOfParallelism);
-		} catch (SerializationException e) {
-			throw new RuntimeException("Cannot serialize SinkFunction");
-		}
-
-		inputStream.connectGraph(inputStream, returnStream.getId(), 0);
-
-		if (this.copy() instanceof NamedDataStream) {
-			returnStream.name(inputStream.userDefinedName);
-		}
-
-		return returnStream;
-	}
-
 	/**
 	 * Writes a DataStream to the file specified by path in text format. For
 	 * every element of the DataStream the result of {@link Object#toString()}
@@ -956,10 +766,193 @@ public class DataStream<T> {
 
 	protected <R> DataStream<T> addIterationSource(String iterationID) {
 		DataStream<R> returnStream = new DataStream<R>(environment, "iterationSource");
-
+	
 		jobGraphBuilder.addIterationSource(returnStream.getId(), this.getId(), iterationID,
 				degreeOfParallelism);
-
+	
 		return this.copy();
 	}
+
+	/**
+	 * Internal function for passing the user defined functions to the JobGraph
+	 * of the job.
+	 * 
+	 * @param functionName
+	 *            name of the function
+	 * @param function
+	 *            the user defined function
+	 * @param functionInvokable
+	 *            the wrapping JobVertex instance
+	 * @param <T>
+	 *            type of the input stream
+	 * @param <R>
+	 *            type of the return stream
+	 * @return the data stream constructed
+	 */
+	private <R> StreamOperator<T, R> addFunction(String functionName,
+			final AbstractRichFunction function, TypeSerializerWrapper<T, Tuple, R> typeWrapper,
+			UserTaskInvokable<T, R> functionInvokable) {
+	
+		DataStream<T> inputStream = this.copy();
+		StreamOperator<T, R> returnStream = new StreamOperator<T, R>(environment, functionName);
+	
+		try {
+			jobGraphBuilder.addTask(returnStream.getId(), functionInvokable, typeWrapper,
+					functionName, SerializationUtils.serialize(function), degreeOfParallelism);
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot serialize user defined function");
+		}
+	
+		connectGraph(inputStream, returnStream.getId(), 0);
+	
+		if (inputStream instanceof IterativeDataStream) {
+			returnStream.addIterationSource(((IterativeDataStream<T>) inputStream).iterationID
+					.toString());
+		}
+	
+		if (inputStream instanceof NamedDataStream) {
+			returnStream.name(inputStream.userDefinedName);
+		}
+	
+		return returnStream;
+	}
+
+	protected <T1, T2, R> DataStream<R> addCoFunction(String functionName,
+			DataStream<T1> inputStream1, DataStream<T2> inputStream2,
+			final AbstractRichFunction function, TypeSerializerWrapper<T1, T2, R> typeWrapper,
+			CoInvokable<T1, T2, R> functionInvokable) {
+	
+		DataStream<R> returnStream = new DataStream<R>(environment, functionName);
+	
+		try {
+			jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable, typeWrapper,
+					functionName, SerializationUtils.serialize(function), degreeOfParallelism);
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot serialize user defined function");
+		}
+	
+		connectGraph(inputStream1, returnStream.getId(), 1);
+		connectGraph(inputStream2, returnStream.getId(), 2);
+	
+		if ((inputStream1 instanceof NamedDataStream) && (inputStream2 instanceof NamedDataStream)) {
+			throw new RuntimeException("An operator cannot have two names");
+		} else {
+			if (inputStream1 instanceof NamedDataStream) {
+				returnStream.name(inputStream1.userDefinedName);
+			}
+	
+			if (inputStream2 instanceof NamedDataStream) {
+				returnStream.name(inputStream2.userDefinedName);
+			}
+		}
+		// TODO consider iteration
+	
+		return returnStream;
+	}
+
+	/**
+	 * Gives the data transformation(vertex) a user defined name in order to use
+	 * with directed outputs. The {@link OutputSelector} of the input vertex
+	 * should use this name for directed emits.
+	 * 
+	 * @param name
+	 *            The name to set
+	 * @return The named DataStream.
+	 */
+	protected DataStream<T> name(String name) {
+		// TODO copy DataStream?
+		if (name == "") {
+			throw new IllegalArgumentException("User defined name must not be empty string");
+		}
+	
+		userDefinedName = name;
+		jobGraphBuilder.setUserDefinedName(id, name);
+	
+		return this;
+	}
+
+	/**
+	 * Connects two DataStreams
+	 * 
+	 * @param returnStream
+	 *            The other DataStream will connected to this
+	 * @param stream
+	 *            This DataStream will be connected to returnStream
+	 */
+	private void addConnection(DataStream<T> returnStream, DataStream<T> stream) {
+		if ((stream instanceof NamedDataStream) || (returnStream instanceof NamedDataStream)) {
+			if (!returnStream.userDefinedName.equals(stream.userDefinedName)) {
+				throw new RuntimeException("Error: Connected NamedDataStreams must have same names");
+			}
+		}
+		returnStream.connectedStreams.add(stream.copy());
+	}
+
+	private DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
+		DataStream<T> returnStream = this.copy();
+	
+		for (DataStream<T> stream : returnStream.connectedStreams) {
+			stream.partitioner = partitioner;
+		}
+	
+		return returnStream;
+	}
+
+	/**
+	 * Internal function for assembling the underlying
+	 * {@link org.apache.flink.nephele.jobgraph.JobGraph} of the job. Connects
+	 * the outputs of the given input stream to the specified output stream
+	 * given by the outputID.
+	 * 
+	 * @param inputStream
+	 *            input data stream
+	 * @param outputID
+	 *            ID of the output
+	 * @param typeNumber
+	 *            Number of the type (used at co-functions)
+	 */
+	private <X> void connectGraph(DataStream<X> inputStream, String outputID, int typeNumber) {
+		for (DataStream<X> stream : inputStream.connectedStreams) {
+			jobGraphBuilder.setEdge(stream.getId(), outputID, stream.partitioner, typeNumber);
+		}
+	}
+
+	/**
+	 * Adds the given sink to this DataStream. Only streams with sinks added
+	 * will be executed once the {@link StreamExecutionEnvironment#execute()}
+	 * method is called.
+	 * 
+	 * @param sinkFunction
+	 *            The object containing the sink's invoke function.
+	 * @return The closed DataStream.
+	 */
+	public DataStream<T> addSink(SinkFunction<T> sinkFunction) {
+		return addSink(this.copy(), sinkFunction);
+	}
+
+	private DataStream<T> addSink(DataStream<T> inputStream, SinkFunction<T> sinkFunction) {
+		return addSink(inputStream, sinkFunction, new FunctionTypeWrapper<T, Tuple, T>(
+				sinkFunction, SinkFunction.class, 0, -1, 0));
+	}
+
+	private DataStream<T> addSink(DataStream<T> inputStream, SinkFunction<T> sinkFunction,
+			TypeSerializerWrapper<T, Tuple, T> typeWrapper) {
+		DataStream<T> returnStream = new DataStream<T>(environment, "sink");
+	
+		try {
+			jobGraphBuilder.addSink(returnStream.getId(), new SinkInvokable<T>(sinkFunction),
+					typeWrapper, "sink", SerializationUtils.serialize(sinkFunction),
+					degreeOfParallelism);
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot serialize SinkFunction");
+		}
+	
+		inputStream.connectGraph(inputStream, returnStream.getId(), 0);
+	
+		if (this.copy() instanceof NamedDataStream) {
+			returnStream.name(inputStream.userDefinedName);
+		}
+	
+		return returnStream;
+	}
 }
\ No newline at end of file
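
With connection state moved onto the streams, the partitioning setters become
copy-on-configure: setConnectionType() copies the receiver and rewrites the
partitioner on every connected copy, leaving the original stream untouched. A
hedged usage sketch (createLocalEnvironment(), generateSequence(), shuffle()
and the PlusTwo/EvenSink classes are taken from the test at the end of this
commit; the sequence itself is illustrative, not repository code):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
    DataStream<Long> src = env.generateSequence(1, 10);  // forward-partitioned by default
    DataStream<Long> shuffled = src.shuffle();           // copy with shuffle partitioning;
                                                         // src keeps its ForwardPartitioner
    shuffled.map(new PlusTwo()).addSink(new EvenSink()); // connectGraph() wires one edge per
                                                         // connected stream, using its partitioner
    env.execute();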

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f932700a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java
index bfce834..d5a5f78 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java
@@ -31,12 +31,17 @@ import org.apache.flink.streaming.partitioner.ForwardPartitioner;
 public class IterativeDataStream<T> extends StreamOperator<T, T> {
 
 	static Integer iterationCount = 0;
+	protected Integer iterationID;
 
 	protected IterativeDataStream(DataStream<T> dataStream) {
 		super(dataStream);
 		iterationID = iterationCount;
 		iterationCount++;
-		iterationflag = true;
+	}
+
+	protected IterativeDataStream(DataStream<T> dataStream, Integer iterationID) {
+		super(dataStream);
+		this.iterationID = iterationID;
 	}
 
 	/**
@@ -68,8 +73,7 @@ public class IterativeDataStream<T> extends StreamOperator<T, T> {
 	 *            when used with directed emits
 	 * 
 	 */
-	public <R> DataStream<T> closeWith(DataStream<T> iterationResult,
-			String iterationName) {
+	public <R> DataStream<T> closeWith(DataStream<T> iterationResult, String iterationName) {
 		DataStream<R> returnStream = new DataStream<R>(environment, "iterationSink");
 
 		jobGraphBuilder.addIterationSink(returnStream.getId(), iterationResult.getId(),
@@ -78,11 +82,16 @@ public class IterativeDataStream<T> extends StreamOperator<T, T> {
 		jobGraphBuilder.setIterationSourceParallelism(iterationID.toString(),
 				iterationResult.getParallelism());
 
-		for (int i = 0; i < iterationResult.connectIDs.size(); i++) {
-			String inputID = iterationResult.connectIDs.get(i);
+		for (DataStream<T> stream : iterationResult.connectedStreams) {
+			String inputID = stream.getId();
 			jobGraphBuilder.setEdge(inputID, returnStream.getId(), new ForwardPartitioner<T>(), 0);
 		}
 
 		return iterationResult;
 	}
+
+	@Override
+	protected DataStream<T> copy() {
+		return new IterativeDataStream<T>(this, iterationID);
+	}
 }
\ No newline at end of file
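
iterationID now lives on IterativeDataStream instead of on every DataStream,
and the new copy() override preserves it, so the instanceof check addFunction
performs after defensive copying still recognizes iterative inputs and wires
up the iteration source. A hedged sketch of the round trip, given some
DataStream<Long> source (the iterate() entry point is assumed here for
illustration only; this diff shows just the IterativeDataStream internals):

    IterativeDataStream<Long> it = source.iterate();  // assumed factory method, not in this diff
    DataStream<Long> body = it.map(new PlusTwo());    // copy() keeps the runtime type, so
                                                      // addFunction adds an iteration source
                                                      // for this iterationID
    it.closeWith(body, "iterate");                    // iteration sink + forward edges back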

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f932700a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/SplitDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/SplitDataStream.java
index b4bbe52..627aa42 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/SplitDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/SplitDataStream.java
@@ -34,10 +34,9 @@ public class SplitDataStream<T> extends DataStream<T> {
 	 * @return Returns the modified DataStream
 	 */
 	public NamedDataStream<T> select(String outputName) {
-
-		userDefinedName = outputName;
-
-		return new NamedDataStream<T>(this);
+		NamedDataStream<T> returnStream = new NamedDataStream<T>(this);
+		returnStream.userDefinedName = outputName;
+		return returnStream;
 	}
 
 	@Override
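
The select() fix makes naming non-destructive: instead of mutating the
SplitDataStream before wrapping it, select() sets the name on a fresh
NamedDataStream copy, so two selects on the same split no longer overwrite
each other. As exercised by directNamingTest below (MySelector routes even
values to "ds1" and odd values to "ds2"):

    SplitDataStream<Long> s = env.generateSequence(1, 10).split(new MySelector());
    DataStream<Long> evens = s.select("ds1"); // named copy; s itself stays unnamed
    DataStream<Long> odds = s.select("ds2");  // no longer clobbers the "ds1" selection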

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f932700a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java
index 7edde1c..5ecd930 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java
@@ -19,8 +19,7 @@
 
 package org.apache.flink.streaming.api;
 
-
-public class StreamOperator<IN, OUT > extends DataStream<OUT> {
+public class StreamOperator<IN, OUT> extends DataStream<OUT> {
 
 	protected StreamOperator(StreamExecutionEnvironment environment, String operatorType) {
 		super(environment, operatorType);
@@ -30,4 +29,9 @@ public class StreamOperator<IN, OUT > extends DataStream<OUT> {
 		super(dataStream);
 	}
 
+	@Override
+	protected DataStream<OUT> copy() {
+		return new StreamOperator<IN, OUT>(this);
+	}
+
 }
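
The copy() override added here is what keeps the defensive-copy scheme
type-safe: DataStream copies its inputs in the constructor, in addSink and in
setConnectionType, and without this override a StreamOperator would decay to
a plain DataStream on the first copy, silently defeating the instanceof-based
dispatch in addFunction and addCoFunction. A small illustration (copy() is
protected, so this only compiles within the api package):

    DataStream<Long> copied = someOperator.copy();
    // with the override: copied is still a StreamOperator, and an
    // IterativeDataStream keeps its iterationID across copies;
    // without it, the subtype checks downstream would never fire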

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f932700a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index 074992b..e2991b4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -20,6 +20,7 @@
 package org.apache.flink.streaming.api.collector;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -38,9 +39,9 @@ public class DirectedOutputTest {
 
 	static HashSet<Long> evenSet = new HashSet<Long>();
 	static HashSet<Long> oddSet = new HashSet<Long>();
-	
+
 	private static class PlusTwo extends RichMapFunction<Long, Long> {
-	
+
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -59,7 +60,7 @@ public class DirectedOutputTest {
 			evenSet.add(tuple);
 		}
 	}
-	
+
 	private static class OddSink extends SinkFunction<Long> {
 
 		private static final long serialVersionUID = 1L;
@@ -69,26 +70,24 @@ public class DirectedOutputTest {
 			oddSet.add(tuple);
 		}
 	}
-	
-	
+
 	private static class MySelector extends OutputSelector<Long> {
-		
+
 		private static final long serialVersionUID = 1L;
 
 		@Override
 		public void select(Long tuple, Collection<String> outputs) {
 			int mod = (int) (tuple % 2);
 			switch (mod) {
-				case 0:
-					outputs.add("ds1");
-					break;
-				case 1:
-					outputs.add("ds2");
-					break;
+			case 0:
+				outputs.add("ds1");
+				break;
+			case 1:
+				outputs.add("ds2");
+				break;
 			}
 		}
 	}
-	
 
 	@SuppressWarnings("unused")
 	@Test
@@ -102,11 +101,41 @@ public class DirectedOutputTest {
 		DataStream<Long> ds3 = s.map(new PlusTwo()).addSink(new OddSink());
 
 		env.execute();
-		
+
 		HashSet<Long> expectedEven = new HashSet<Long>(Arrays.asList(4L, 6L, 8L));
 		HashSet<Long> expectedOdd = new HashSet<Long>(Arrays.asList(3L, 5L, 7L));
-		
+
 		assertEquals(expectedEven, evenSet);
 		assertEquals(expectedOdd, oddSet);
 	}
+
+	@SuppressWarnings({ "unchecked" })
+	@Test
+	public void directNamingTest() {
+		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+		SplitDataStream<Long> s = env.generateSequence(1, 10).split(new MySelector());
+		try {
+			s.select("ds2").connectWith(s.select("ds1"));
+			fail();
+		} catch (Exception e) {
+			// Exception thrown
+		}
+		try {
+			s.shuffle().connectWith(s.select("ds1"));
+			fail();
+		} catch (Exception e) {
+			// Exception thrown
+		}
+		try {
+			s.select("ds2").connectWith(s);
+			fail();
+		} catch (Exception e) {
+			// Exception thrown
+		}
+		s.connectWith(s);
+		s.select("ds2").connectWith(s.select("ds2"));
+
+	}
 }
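
The naming rule the three fail() branches exercise, summarized (paraphrasing
the check this commit adds to DataStream.addConnection()):

    // throws: differently-named selections        s.select("ds2").connectWith(s.select("ds1"));
    // throws: named vs. unnamed copy              s.shuffle().connectWith(s.select("ds1"));
    // throws: named vs. the unnamed split stream  s.select("ds2").connectWith(s);
    // fine:   both unnamed, or both same name     s.connectWith(s);
    //                                             s.select("ds2").connectWith(s.select("ds2"));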