Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:16 UTC

[39/51] [abbrv] [streaming] API update with more differentiated DataStream types and javadoc + several fixes

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
deleted file mode 100644
index 76adf62..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-import org.apache.commons.lang3.SerializationException;
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.function.source.FileSourceFunction;
-import org.apache.flink.streaming.api.function.source.FileStreamFunction;
-import org.apache.flink.streaming.api.function.source.FromElementsFunction;
-import org.apache.flink.streaming.api.function.source.GenSequenceFunction;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.api.invokable.SourceInvokable;
-import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
-import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
-
-/**
- * {@link ExecutionEnvironment} for streaming jobs. An instance of it is
- * necessary to construct streaming topologies.
- * 
- */
-public abstract class StreamExecutionEnvironment {
-
-	/**
-	 * The environment of the context (local by default, cluster if invoked
-	 * through command line)
-	 */
-	private static StreamExecutionEnvironment contextEnvironment;
-
-	/** flag to disable local executor when using the ContextEnvironment */
-	private static boolean allowLocalExecution = true;
-
-	private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
-
-	private int degreeOfParallelism = 1;
-
-	private int executionParallelism = -1;
-
-	protected JobGraphBuilder jobGraphBuilder;
-
-	// --------------------------------------------------------------------------------------------
-	// Constructor and Properties
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Constructor for creating StreamExecutionEnvironment
-	 */
-	protected StreamExecutionEnvironment() {
-		jobGraphBuilder = new JobGraphBuilder("jobGraph");
-	}
-
-	public int getExecutionParallelism() {
-		return executionParallelism == -1 ? degreeOfParallelism : executionParallelism;
-	}
-
-	/**
-	 * Gets the degree of parallelism with which operations are executed by
-	 * default. Operations can individually override this value to use a
-	 * specific degree of parallelism via {@link DataStream#setParallelism}.
-	 * 
-	 * @return The degree of parallelism used by operations, unless they
-	 *         override that value.
-	 */
-	public int getDegreeOfParallelism() {
-		return this.degreeOfParallelism;
-	}
-
-	/**
-	 * Sets the degree of parallelism (DOP) for operations executed through this
-	 * environment. Setting a DOP of x here will cause all operators (such as
-	 * map, batchReduce) to run with x parallel instances. This method overrides
-	 * the default parallelism for this environment. The
-	 * {@link LocalStreamEnvironment} uses by default a value equal to the
-	 * number of hardware contexts (CPU cores / threads). When executing the
-	 * program via the command line client from a JAR file, the default degree
-	 * of parallelism is the one configured for that setup.
-	 * 
-	 * @param degreeOfParallelism
-	 *            The degree of parallelism
-	 */
-	protected void setDegreeOfParallelism(int degreeOfParallelism) {
-		if (degreeOfParallelism < 1) {
-			throw new IllegalArgumentException("Degree of parallelism must be at least one.");
-		}
-		this.degreeOfParallelism = degreeOfParallelism;
-	}
-
-	/**
-	 * Sets the number of hardware contexts (CPU cores / threads) used when
-	 * executed in {@link LocalStreamEnvironment}.
-	 * 
-	 * @param degreeOfParallelism
-	 *            The degree of parallelism in local environment
-	 */
-	public void setExecutionParallelism(int degreeOfParallelism) {
-		if (degreeOfParallelism < 1) {
-			throw new IllegalArgumentException("Degree of parallelism must be at least one.");
-		}
-
-		this.executionParallelism = degreeOfParallelism;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Data stream creations
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Creates a DataStream that represents the Strings produced by reading the
-	 * given file line by line. The file will be read with the system's default
-	 * character set.
-	 * 
-	 * @param filePath
-	 *            The path of the file, as a URI (e.g.,
-	 *            "file:///some/local/file" or "hdfs://host:port/file/path").
-	 * @return The DataStream representing the text file.
-	 */
-	public DataStream<String> readTextFile(String filePath) {
-		return addSource(new FileSourceFunction(filePath), 1);
-	}
-
-	public DataStream<String> readTextFile(String filePath, int parallelism) {
-		return addSource(new FileSourceFunction(filePath), parallelism);
-	}
-
-	/**
-	 * Creates a DataStream that represents the Strings produced by reading the
-	 * given file line by line, repeatedly (an infinite stream). The file will be
-	 * read with the system's default character set.
-	 * 
-	 * @param filePath
-	 *            The path of the file, as a URI (e.g.,
-	 *            "file:///some/local/file" or "hdfs://host:port/file/path").
-	 * @return The DataStream representing the text file.
-	 */
-	public DataStream<String> readTextStream(String filePath) {
-		return addSource(new FileStreamFunction(filePath), 1);
-	}
-
-	public DataStream<String> readTextStream(String filePath, int parallelism) {
-		return addSource(new FileStreamFunction(filePath), parallelism);
-	}
-
-	/**
-	 * Creates a new DataStream that contains the given elements. The elements
-	 * must all be of the same type, for example, all Strings or all Integers.
-	 * The sequence of elements must not be empty. Furthermore, the elements
-	 * must be serializable (as defined in java.io.Serializable), because the
-	 * execution environment may ship the elements into the cluster.
-	 * 
-	 * @param data
-	 *            The collection of elements to create the DataStream from.
-	 * @param <OUT>
-	 *            type of the returned stream
-	 * @return The DataStream representing the elements.
-	 */
-	public <OUT extends Serializable> DataStream<OUT> fromElements(OUT... data) {
-		DataStream<OUT> returnStream = new DataStream<OUT>(this, "elements");
-
-		try {
-			SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
-			jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(function),
-					new ObjectTypeWrapper<OUT, Tuple, OUT>(data[0], null, data[0]), "source",
-					SerializationUtils.serialize(function), 1);
-		} catch (SerializationException e) {
-			throw new RuntimeException("Cannot serialize elements");
-		}
-		return returnStream;
-	}
-
-	/**
-	 * Creates a DataStream from the given non-empty collection. The type of the
-	 * DataStream is that of the elements in the collection. The elements need
-	 * to be serializable (as defined by java.io.Serializable), because the
-	 * framework may move the elements into the cluster if needed.
-	 * 
-	 * @param data
-	 *            The collection of elements to create the DataStream from.
-	 * @param <OUT>
-	 *            type of the returned stream
-	 * @return The DataStream representing the elements.
-	 */
-	@SuppressWarnings("unchecked")
-	public <OUT extends Serializable> DataStream<OUT> fromCollection(Collection<OUT> data) {
-		DataStream<OUT> returnStream = new DataStream<OUT>(this, "elements");
-
-		if (data.isEmpty()) {
-			throw new RuntimeException("Collection must not be empty");
-		}
-
-		try {
-			SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
-
-			jobGraphBuilder.addSource(
-					returnStream.getId(),
-					new SourceInvokable<OUT>(new FromElementsFunction<OUT>(data)),
-					new ObjectTypeWrapper<OUT, Tuple, OUT>((OUT) data.toArray()[0], null, (OUT) data
-							.toArray()[0]), "source", SerializationUtils.serialize(function), 1);
-		} catch (SerializationException e) {
-			throw new RuntimeException("Cannot serialize collection");
-		}
-
-		return returnStream;
-	}
-
-	/**
-	 * Creates a new DataStream that contains a sequence of numbers.
-	 * 
-	 * @param from
-	 *            The number to start at (inclusive).
-	 * @param to
-	 *            The number to stop at (inclusive).
-	 * @return A DataStream containing all numbers in the [from, to] interval.
-	 */
-	public DataStream<Long> generateSequence(long from, long to) {
-		return addSource(new GenSequenceFunction(from, to), 1);
-	}
-
-	/**
-	 * Adds a data source, thus opening a {@link DataStream}.
-	 * 
-	 * @param function
-	 *            the user defined function
-	 * @param parallelism
-	 *            number of parallel instances of the function
-	 * @param <OUT>
-	 *            type of the returned stream
-	 * @return the data stream constructed
-	 */
-	public <OUT> DataStream<OUT> addSource(SourceFunction<OUT> function, int parallelism) {
-		DataStream<OUT> returnStream = new DataStream<OUT>(this, "source");
-
-		try {
-			jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(function),
-					new FunctionTypeWrapper<OUT, Tuple, OUT>(function, SourceFunction.class, 0, -1, 0),
-					"source", SerializationUtils.serialize(function), parallelism);
-		} catch (SerializationException e) {
-			throw new RuntimeException("Cannot serialize SourceFunction");
-		}
-
-		return returnStream;
-	}
-
-	public <OUT> DataStream<OUT> addSource(SourceFunction<OUT> sourceFunction) {
-		return addSource(sourceFunction, 1);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Instantiation of Execution Contexts
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Creates an execution environment that represents the context in which the
-	 * program is currently executed. If the program is invoked standalone, this
-	 * method returns a local execution environment, as returned by
-	 * {@link #createLocalEnvironment()}.
-	 * 
-	 * @return The execution environment of the context in which the program is
-	 *         executed.
-	 */
-	public static StreamExecutionEnvironment getExecutionEnvironment() {
-		return contextEnvironment == null ? createLocalEnvironment() : contextEnvironment;
-	}
-
-	/**
-	 * Creates a {@link LocalStreamEnvironment}. The local execution environment
-	 * will run the program in a multi-threaded fashion in the same JVM as the
-	 * environment was created in. The default degree of parallelism of the
-	 * local environment is the number of hardware contexts (CPU cores /
-	 * threads), unless it was specified differently by
-	 * {@link #setDegreeOfParallelism(int)}.
-	 * 
-	 * @return A local execution environment.
-	 */
-	public static LocalStreamEnvironment createLocalEnvironment() {
-		return createLocalEnvironment(defaultLocalDop);
-	}
-
-	/**
-	 * Creates a {@link LocalStreamEnvironment}. The local execution environment
-	 * will run the program in a multi-threaded fashion in the same JVM as the
-	 * environment was created in. It will use the degree of parallelism
-	 * specified in the parameter.
-	 * 
-	 * @param degreeOfParallelism
-	 *            The degree of parallelism for the local environment.
-	 * @return A local execution environment with the specified degree of
-	 *         parallelism.
-	 */
-	public static LocalStreamEnvironment createLocalEnvironment(int degreeOfParallelism) {
-		LocalStreamEnvironment lee = new LocalStreamEnvironment();
-		lee.setDegreeOfParallelism(degreeOfParallelism);
-		return lee;
-	}
-
-	// TODO:fix cluster default parallelism
-	/**
-	 * Creates a {@link RemoteStreamEnvironment}. The remote environment sends
-	 * (parts of) the program to a cluster for execution. Note that all file
-	 * paths used in the program must be accessible from the cluster. The
-	 * execution will use a degree of parallelism of one, unless it is set
-	 * explicitly via {@link #setDegreeOfParallelism}.
-	 * 
-	 * @param host
-	 *            The host name or address of the master (JobManager), where the
-	 *            program should be executed.
-	 * @param port
-	 *            The port of the master (JobManager), where the program should
-	 *            be executed.
-	 * @param jarFiles
-	 *            The JAR files with code that needs to be shipped to the
-	 *            cluster. If the program uses user-defined functions,
-	 *            user-defined input formats, or any libraries, those must be
-	 *            provided in the JAR files.
-	 * @return A remote environment that executes the program on a cluster.
-	 */
-	public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port,
-			String... jarFiles) {
-		return new RemoteStreamEnvironment(host, port, jarFiles);
-	}
-
-	/**
-	 * Creates a {@link RemoteStreamEnvironment}. The remote environment sends
-	 * (parts of) the program to a cluster for execution. Note that all file
-	 * paths used in the program must be accessible from the cluster. The
-	 * execution will use the specified degree of parallelism.
-	 * 
-	 * @param host
-	 *            The host name or address of the master (JobManager), where the
-	 *            program should be executed.
-	 * @param port
-	 *            The port of the master (JobManager), where the program should
-	 *            be executed.
-	 * @param degreeOfParallelism
-	 *            The degree of parallelism to use during the execution.
-	 * @param jarFiles
-	 *            The JAR files with code that needs to be shipped to the
-	 *            cluster. If the program uses user-defined functions,
-	 *            user-defined input formats, or any libraries, those must be
-	 *            provided in the JAR files.
-	 * @return A remote environment that executes the program on a cluster.
-	 */
-	public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port,
-			int degreeOfParallelism, String... jarFiles) {
-		RemoteStreamEnvironment rec = new RemoteStreamEnvironment(host, port, jarFiles);
-		rec.setDegreeOfParallelism(degreeOfParallelism);
-		return rec;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Methods to control the context and local environments for execution from
-	// packaged programs
-	// --------------------------------------------------------------------------------------------
-
-	protected static void initializeContextEnvironment(StreamExecutionEnvironment ctx) {
-		contextEnvironment = ctx;
-	}
-
-	protected static boolean isContextEnvironmentSet() {
-		return contextEnvironment != null;
-	}
-
-	protected static void disableLocalExecution() {
-		allowLocalExecution = false;
-	}
-
-	public static boolean localExecutionIsAllowed() {
-		return allowLocalExecution;
-	}
-
-	/**
-	 * Triggers the program execution. The environment will execute all parts of
-	 * the program that have resulted in a "sink" operation. Sink operations are
-	 * for example printing results or forwarding them to a message queue.
-	 * <p>
-	 * The program execution will be logged and displayed with a generated
-	 * default name.
-	 **/
-	public abstract void execute();
-
-	/**
-	 * Getter of the {@link JobGraphBuilder} of the streaming job.
-	 * 
-	 * @return jobgraph
-	 */
-	public JobGraphBuilder getJobGraphBuilder() {
-		return jobGraphBuilder;
-	}
-
-}

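The file deleted above reappears under org.apache.flink.streaming.api.environment (the new CoDataStream and DataStream files below import it from there). A minimal sketch of how this environment API is driven, assuming the relocated packages and assuming the SingleOutputStreamOperator returned by map can be used as a DataStream, as the new class hierarchy suggests; illustrative only, not the exact API of any release:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SequenceExample {
	public static void main(String[] args) {
		// Local environment by default; a context environment when submitted
		// through the command line client.
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// generateSequence emits the inclusive interval [1, 10].
		DataStream<Long> doubled = env.generateSequence(1, 10)
				.map(new MapFunction<Long, Long>() {
					private static final long serialVersionUID = 1L;

					@Override
					public Long map(Long value) {
						return 2 * value;
					}
				});

		// print() adds a sink; only sink operations make execute() run anything.
		doubled.print();
		env.execute();
	}
}
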
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java
deleted file mode 100755
index a39823c..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api;
-
-/**
- * The StreamOperator represents a {@link DataStream} transformed by a
- * user-defined operator.
- *
- * @param <OUT>
- *            Output Type of the operator.
- */
-public class StreamOperator<OUT> extends DataStream<OUT> {
-
-	protected StreamOperator(StreamExecutionEnvironment environment, String operatorType) {
-		super(environment, operatorType);
-	}
-
-	protected StreamOperator(DataStream<OUT> dataStream) {
-		super(dataStream);
-	}
-
-	@Override
-	protected DataStream<OUT> copy() {
-		return new StreamOperator<OUT>(this);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TwoInputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TwoInputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TwoInputStreamOperator.java
deleted file mode 100755
index a459dbf..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TwoInputStreamOperator.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api;
-
-/**
- * The TwoInputStreamOperator represents a {@link StreamOperator} with two
- * inputs.
- *
- * @param <IN1>
- *            Type of the first input.
- * 
- * @param <IN2>
- *            Type of the second input.
- * @param <OUT>
- *            Output Type of the operator.
- */
-public class TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {
-
-	protected TwoInputStreamOperator(StreamExecutionEnvironment environment, String operatorType) {
-		super(environment, operatorType);
-	}
-
-	protected TwoInputStreamOperator(DataStream<OUT> dataStream) {
-		super(dataStream);
-	}
-
-	@Override
-	protected DataStream<OUT> copy() {
-		return new TwoInputStreamOperator<IN1, IN2, OUT>(this);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
index 73a5749..ced3de7 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
@@ -19,7 +19,9 @@
 
 package org.apache.flink.streaming.api.collector;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,6 +41,7 @@ public class DirectedStreamCollector<T> extends StreamCollector<T> {
 
 	OutputSelector<T> outputSelector;
 	private static final Log log = LogFactory.getLog(DirectedStreamCollector.class);
+	private List<RecordWriter<SerializationDelegate<StreamRecord<T>>>> emitted;
 
 	/**
 	 * Creates a new DirectedStreamCollector
@@ -55,6 +58,7 @@ public class DirectedStreamCollector<T> extends StreamCollector<T> {
 			OutputSelector<T> outputSelector) {
 		super(channelID, serializationDelegate);
 		this.outputSelector = outputSelector;
+		this.emitted = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<T>>>>();
 
 	}
 
@@ -82,11 +86,14 @@ public class DirectedStreamCollector<T> extends StreamCollector<T> {
 		Collection<String> outputNames = outputSelector.getOutputs(streamRecord.getObject());
 		streamRecord.setId(channelID);
 		serializationDelegate.setInstance(streamRecord);
+		emitted.clear();
 		for (String outputName : outputNames) {
 			try {
-				for (RecordWriter<SerializationDelegate<StreamRecord<T>>> output : outputMap
-						.get(outputName)) {
+				RecordWriter<SerializationDelegate<StreamRecord<T>>> output = outputMap
+						.get(outputName);
+				if (!emitted.contains(output)) {
 					output.emit(serializationDelegate);
+					emitted.add(output);
 				}
 			} catch (Exception e) {
 				if (log.isErrorEnabled()) {

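The change above replaces the per-name list of writers with a single writer per name and tracks what has already been emitted for the current record, so a record selected under several names that resolve to the same physical output is written only once. A self-contained sketch of that dedup pattern, with plain Strings standing in for Flink's RecordWriter and the serialization delegate:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DedupEmitSketch {
	public static void main(String[] args) {
		// Two user-defined output names backed by the same physical writer.
		Map<String, String> outputMap = new HashMap<String, String>();
		outputMap.put("even", "writerA");
		outputMap.put("all", "writerA");

		// Mirrors the 'emitted' field above: cleared before each record.
		List<String> emitted = new ArrayList<String>();
		for (String outputName : Arrays.asList("even", "all")) {
			String output = outputMap.get(outputName);
			if (!emitted.contains(output)) {
				System.out.println("emit to " + output); // prints once, not twice
				emitted.add(output);
			}
		}
	}
}
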
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
index 798d8fa..17d7e7b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
@@ -23,13 +23,16 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitDataStream;
+
 /**
- * Class for defining an OutputSelector for the directTo operator. Every output
- * object of a directed DataStream will run through this operator to select
- * outputs.
+ * Class for defining an OutputSelector for a {@link SplitDataStream} using the
+ * {@link SingleOutputStreamOperator#split} call. Every output object of a
+ * {@link SplitDataStream} will run through this operator to select outputs.
  * 
  * @param <T>
- *            Type parameter of the directed tuples/objects.
+ *            Type parameter of the split values.
  */
 public abstract class OutputSelector<T> implements Serializable {
 	private static final long serialVersionUID = 1L;
@@ -48,8 +51,9 @@ public abstract class OutputSelector<T> implements Serializable {
 
 	/**
 	 * Method for selecting output names for the emitted objects when using the
-	 * directTo operator. The tuple will be emitted only to output names which
-	 * are added to the outputs collection.
+	 * {@link SingleOutputStreamOperator#split} method. The values will be
+	 * emitted only to output names which are added to the outputs collection.
+	 * The outputs collection is cleared automatically after each select call.
 	 * 
 	 * @param value
 	 *            Output object for which the output selection should be made.

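To make the reworked Javadoc concrete: an OutputSelector routes each value by adding names to the outputs collection, which the runtime clears between calls. A sketch, assuming the select(value, outputs) hook that the Javadoc describes:

import java.util.Collection;

import org.apache.flink.streaming.api.collector.OutputSelector;

public class EvenOddSelector extends OutputSelector<Long> {
	private static final long serialVersionUID = 1L;

	@Override
	public void select(Long value, Collection<String> outputs) {
		// Route by parity; a value may be sent to several named outputs.
		outputs.add(value % 2 == 0 ? "even" : "odd");
		outputs.add("all");
	}
}

Per the references to SingleOutputStreamOperator#split above, this would be wired in with something like stream.split(new EvenOddSelector()), yielding a SplitDataStream.
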
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
index 4317f75..20c3b78 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
@@ -34,7 +34,7 @@ import org.apache.flink.util.StringUtils;
 
 /**
  * Collector for tuples in Apache Flink stream processing. The collected
- * tuples/obecjts will be wrapped with ID in a {@link StreamRecord} and then
+ * values will be wrapped with an ID in a {@link StreamRecord} and then
  * emitted to the outputs.
  * 
  * @param <T>
@@ -47,7 +47,7 @@ public class StreamCollector<T> implements Collector<T> {
 	protected StreamRecord<T> streamRecord;
 	protected int channelID;
 	private List<RecordWriter<SerializationDelegate<StreamRecord<T>>>> outputs;
-	protected Map<String, List<RecordWriter<SerializationDelegate<StreamRecord<T>>>>> outputMap;
+	protected Map<String, RecordWriter<SerializationDelegate<StreamRecord<T>>>> outputMap;
 	protected SerializationDelegate<StreamRecord<T>> serializationDelegate;
 
 	/**
@@ -65,7 +65,7 @@ public class StreamCollector<T> implements Collector<T> {
 		this.streamRecord = new StreamRecord<T>();
 		this.channelID = channelID;
 		this.outputs = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<T>>>>();
-		this.outputMap = new HashMap<String, List<RecordWriter<SerializationDelegate<StreamRecord<T>>>>>();
+		this.outputMap = new HashMap<String, RecordWriter<SerializationDelegate<StreamRecord<T>>>>();
 	}
 
 	/**
@@ -73,21 +73,19 @@ public class StreamCollector<T> implements Collector<T> {
 	 * 
 	 * @param output
 	 *            The RecordWriter object representing the output.
-	 * @param outputName
-	 *            User defined name of the output.
+	 * @param outputNames
+	 *            User defined names of the output.
 	 */
 	public void addOutput(RecordWriter<SerializationDelegate<StreamRecord<T>>> output,
-			String outputName) {
+			List<String> outputNames) {
 		outputs.add(output);
-		if (outputName != null) {
-			if (outputMap.containsKey(outputName)) {
-				outputMap.get(outputName).add(output);
-			} else {
-				outputMap.put(outputName,
-						new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<T>>>>());
-				outputMap.get(outputName).add(output);
-			}
+		for (String outputName : outputNames) {
+			if (outputName != null) {
+				if (!outputMap.containsKey(outputName)) {
+					outputMap.put(outputName, output);
+				}
 
+			}
 		}
 	}
 

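The collector now registers one writer under each of its user-defined names, and the first registration for a name wins. A standalone sketch of that registration rule, Strings again standing in for the writers:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RegisterOutputsSketch {
	private static final Map<String, String> outputMap = new HashMap<String, String>();

	static void addOutput(String output, List<String> outputNames) {
		for (String outputName : outputNames) {
			// The first writer registered for a name is kept; later ones are ignored.
			if (outputName != null && !outputMap.containsKey(outputName)) {
				outputMap.put(outputName, output);
			}
		}
	}

	public static void main(String[] args) {
		addOutput("writerA", Arrays.asList("even", "all"));
		addOutput("writerB", Arrays.asList("all", "odd"));
		// Prints {even=writerA, all=writerA, odd=writerB} (map order may vary).
		System.out.println(outputMap);
	}
}
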
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoDataStream.java
new file mode 100755
index 0000000..c6cb8af
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoDataStream.java
@@ -0,0 +1,132 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import java.io.Serializable;
+
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.JobGraphBuilder;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
+import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
+import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
+
+/**
+ * The CoDataStream represents a stream for two different data types. It can be
+ * used to apply transformations like {@link CoMapFunction} on two
+ * {@link DataStream}s.
+ *
+ * @param <IN1>
+ *            Type of the first DataStream.
+ * @param <IN2>
+ *            Type of the second DataStream.
+ */
+public class CoDataStream<IN1, IN2> {
+
+	StreamExecutionEnvironment environment;
+	JobGraphBuilder jobGraphBuilder;
+	DataStream<IN1> input1;
+	DataStream<IN2> input2;
+
+	protected CoDataStream(StreamExecutionEnvironment environment, JobGraphBuilder jobGraphBuilder,
+			DataStream<IN1> input1, DataStream<IN2> input2) {
+		this.jobGraphBuilder = jobGraphBuilder;
+		this.environment = environment;
+		this.input1 = input1.copy();
+		this.input2 = input2.copy();
+	}
+
+	/**
+	 * Returns the first {@link DataStream}.
+	 * 
+	 * @return The first DataStream.
+	 */
+	public DataStream<IN1> getFirst() {
+		return input1.copy();
+	}
+
+	/**
+	 * Returns the second {@link DataStream}.
+	 * 
+	 * @return The second DataStream.
+	 */
+	public DataStream<IN2> getSecond() {
+		return input2.copy();
+	}
+
+	/**
+	 * Applies a CoMap transformation on two separate {@link DataStream}s. The
+	 * transformation calls a {@link CoMapFunction#map1} for each element
+	 * of the first input and {@link CoMapFunction#map2} for each element
+	 * of the second input. Each CoMapFunction call returns exactly one element.
+	 * The user can also extend {@link RichCoMapFunction} to gain access to
+	 * other features provided by the {@link RichFunction} interface.
+	 * 
+	 * @param coMapper
+	 *            The CoMapFunction used to jointly transform the two input
+	 *            DataStreams
+	 * @return The transformed DataStream
+	 */
+	public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapper) {
+		return addCoFunction("coMap", coMapper, new FunctionTypeWrapper<IN1, IN2, OUT>(coMapper,
+				CoMapFunction.class, 0, 1, 2), new CoMapInvokable<IN1, IN2, OUT>(coMapper));
+	}
+
+	protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
+			final Function function, TypeSerializerWrapper<IN1, IN2, OUT> typeWrapper,
+			CoInvokable<IN1, IN2, OUT> functionInvokable) {
+
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
+				environment, functionName);
+
+		try {
+			input1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable, typeWrapper,
+					functionName, SerializationUtils.serialize((Serializable) function),
+					environment.getDegreeOfParallelism());
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot serialize user defined function");
+		}
+
+		input1.connectGraph(input1, returnStream.getId(), 1);
+		input1.connectGraph(input2, returnStream.getId(), 2);
+
+		if ((input1.userDefinedName != null) && (input2.userDefinedName != null)) {
+			throw new RuntimeException("An operator cannot have two names");
+		} else {
+			if (input1.userDefinedName != null) {
+				returnStream.name(input1.getUserDefinedNames());
+			}
+
+			if (input2.userDefinedName != null) {
+				returnStream.name(input2.getUserDefinedNames());
+			}
+		}
+		// TODO consider iteration
+
+		return returnStream;
+	}
+
+}

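Usage of the new CoDataStream, as described in the map() Javadoc above: co() pairs two differently typed streams, and a CoMapFunction supplies map1 for the first input and map2 for the second. A sketch extending RichCoMapFunction (which that Javadoc sanctions); the stream contents are made up, and the supertype signatures are assumed from this commit's imports:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.RichCoMapFunction;

public class CoMapExample {
	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<String> words = env.fromElements("a", "b");
		DataStream<Integer> counts = env.fromElements(1, 2);

		// One operator consumes both inputs; each call returns exactly one element.
		words.co(counts).map(new RichCoMapFunction<String, Integer, String>() {
			private static final long serialVersionUID = 1L;

			@Override
			public String map1(String value) {
				return "word: " + value; // elements of the first input
			}

			@Override
			public String map2(Integer value) {
				return "count: " + value; // elements of the second input
			}
		}).print();

		env.execute();
	}
}
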
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
new file mode 100755
index 0000000..d17990c
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -0,0 +1,98 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.partitioner.StreamPartitioner;
+
+/**
+ * The ConnectedDataStream represents a DataStream which consists of connected
+ * outputs of DataStreams of the same type. Operators applied on this will
+ * transform all the connected outputs jointly.
+ *
+ * @param <OUT>
+ *            Type of the output.
+ */
+public class ConnectedDataStream<OUT> extends DataStream<OUT> {
+
+	protected List<DataStream<OUT>> connectedStreams;
+
+	protected ConnectedDataStream(StreamExecutionEnvironment environment, String operatorType) {
+		super(environment, operatorType);
+		this.connectedStreams = new ArrayList<DataStream<OUT>>();
+		this.connectedStreams.add(this);
+	}
+
+	protected ConnectedDataStream(DataStream<OUT> dataStream) {
+		super(dataStream);
+		connectedStreams = new ArrayList<DataStream<OUT>>();
+		if (dataStream instanceof ConnectedDataStream) {
+			for (DataStream<OUT> stream : ((ConnectedDataStream<OUT>) dataStream).connectedStreams) {
+				connectedStreams.add(stream);
+			}
+		} else {
+			this.connectedStreams.add(this);
+		}
+
+	}
+
+	// @Override
+	// public IterativeDataStream<OUT> iterate() {
+	// throw new RuntimeException("Cannot iterate connected DataStreams");
+	// }
+
+	protected void addConnection(DataStream<OUT> stream) {
+		if ((stream.userDefinedName != null) || (this.userDefinedName != null)) {
+			if (!this.userDefinedName.equals(stream.userDefinedName)) {
+			if (this.userDefinedName == null || !this.userDefinedName.equals(stream.userDefinedName)) {
+			}
+		}
+		connectedStreams.add(stream.copy());
+	}
+
+	@Override
+	protected List<String> getUserDefinedNames() {
+		List<String> nameList = new ArrayList<String>();
+		for (DataStream<OUT> stream : connectedStreams) {
+			nameList.add(stream.userDefinedName);
+		}
+		return nameList;
+	}
+
+	@Override
+	protected DataStream<OUT> setConnectionType(StreamPartitioner<OUT> partitioner) {
+		ConnectedDataStream<OUT> returnStream = (ConnectedDataStream<OUT>) this.copy();
+
+		for (DataStream<OUT> stream : returnStream.connectedStreams) {
+			stream.partitioner = partitioner;
+		}
+
+		return returnStream;
+	}
+
+	@Override
+	protected ConnectedDataStream<OUT> copy() {
+		return new ConnectedDataStream<OUT>(this);
+	}
+
+}

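ConnectedDataStream, by contrast, merges outputs of the same type, so a single operator transforms all connected outputs jointly. A sketch of connectWith() as declared in the new DataStream class below, under the same assumptions as the earlier sketches:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConnectWithExample {
	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<String> first = env.fromElements("a", "b");
		DataStream<String> second = env.fromElements("c", "d");

		// Both sources feed the single map operator jointly.
		first.connectWith(second).map(new MapFunction<String, String>() {
			private static final long serialVersionUID = 1L;

			@Override
			public String map(String value) {
				return value.toUpperCase();
			}
		}).print();

		env.execute();
	}
}
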
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
new file mode 100644
index 0000000..b692984
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -0,0 +1,852 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichFilterFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.JobGraphBuilder;
+import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
+import org.apache.flink.streaming.api.function.sink.WriteFormatAsText;
+import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByBatches;
+import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis;
+import org.apache.flink.streaming.api.invokable.SinkInvokable;
+import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
+import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
+import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
+import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
+import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.partitioner.DistributePartitioner;
+import org.apache.flink.streaming.partitioner.FieldsPartitioner;
+import org.apache.flink.streaming.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.partitioner.ShufflePartitioner;
+import org.apache.flink.streaming.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
+import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
+
+/**
+ * A DataStream represents a stream of elements of the same type. A DataStream
+ * can be transformed into another DataStream by applying a transformation, for
+ * example
+ * <ul>
+ * <li>{@link DataStream#map},</li>
+ * <li>{@link DataStream#filter}, or</li>
+ * <li>{@link DataStream#batchReduce}.</li>
+ * </ul>
+ * 
+ * @param <OUT>
+ *            The type of the DataStream, i.e., the type of the elements of the
+ *            DataStream.
+ */
+public abstract class DataStream<OUT> {
+
+	protected static Integer counter = 0;
+	protected final StreamExecutionEnvironment environment;
+	protected final String id;
+	protected int degreeOfParallelism;
+	protected String userDefinedName;
+	protected StreamPartitioner<OUT> partitioner;
+
+	protected final JobGraphBuilder jobGraphBuilder;
+
+	/**
+	 * Create a new {@link DataStream} in the given execution environment with
+	 * partitioning set to forward by default.
+	 * 
+	 * @param environment
+	 *            StreamExecutionEnvironment
+	 * @param operatorType
+	 *            The type of the operator in the component
+	 */
+	public DataStream(StreamExecutionEnvironment environment, String operatorType) {
+		if (environment == null) {
+			throw new NullPointerException("context is null");
+		}
+
+		// TODO add name based on component number and preferably a sequential id
+		counter++;
+		this.id = operatorType + "-" + counter.toString();
+		this.environment = environment;
+		this.degreeOfParallelism = environment.getDegreeOfParallelism();
+		this.jobGraphBuilder = environment.getJobGraphBuilder();
+		this.partitioner = new ForwardPartitioner<OUT>();
+
+	}
+
+	/**
+	 * Create a new DataStream by creating a copy of another DataStream
+	 * 
+	 * @param dataStream
+	 *            The DataStream that will be copied.
+	 */
+	public DataStream(DataStream<OUT> dataStream) {
+		this.environment = dataStream.environment;
+		this.id = dataStream.id;
+		this.degreeOfParallelism = dataStream.degreeOfParallelism;
+		this.userDefinedName = dataStream.userDefinedName;
+		this.partitioner = dataStream.partitioner;
+		this.jobGraphBuilder = dataStream.jobGraphBuilder;
+
+	}
+
+	/**
+	 * Partitioning strategy on the stream.
+	 */
+	public static enum ConnectionType {
+		SHUFFLE, BROADCAST, FIELD, FORWARD, DISTRIBUTE
+	}
+
+	/**
+	 * Returns the ID of the {@link DataStream}.
+	 * 
+	 * @return ID of the DataStream
+	 */
+	public String getId() {
+		return id;
+	}
+
+	/**
+	 * Gets the degree of parallelism for this operator.
+	 * 
+	 * @return The parallelism set for this operator.
+	 */
+	public int getParallelism() {
+		return this.degreeOfParallelism;
+	}
+
+	/**
+	 * Creates a new {@link ConnectedDataStream} by connecting outputs of the same
+	 * type with each other. The DataStreams connected using this operator will be
+	 * transformed simultaneously.
+	 * 
+	 * @param streams
+	 *            The DataStreams to connect output with.
+	 * @return The {@link ConnectedDataStream}.
+	 */
+	public ConnectedDataStream<OUT> connectWith(DataStream<OUT>... streams) {
+		ConnectedDataStream<OUT> returnStream = new ConnectedDataStream<OUT>(this);
+
+		for (DataStream<OUT> stream : streams) {
+			returnStream.addConnection(stream);
+		}
+		return returnStream;
+	}
+
+	/**
+	 * Creates a new {@link CoDataStream} by connecting {@link DataStream}
+	 * outputs of different types with each other. The DataStreams connected
+	 * using this operator can be used with CoFunctions.
+	 * 
+	 * @param dataStream
+	 *            The DataStream with which this stream will be joined.
+	 * @return The {@link CoDataStream}.
+	 */
+	public <R> CoDataStream<OUT, R> co(DataStream<R> dataStream) {
+		return new CoDataStream<OUT, R>(environment, jobGraphBuilder, this, dataStream);
+	}
+
+	/**
+	 * Sets the partitioning of the {@link DataStream} so that the output tuples
+	 * are partitioned by the hashcode of the selected field, so that each tuple
+	 * is sent to exactly one instance of the next component.
+	 * 
+	 * @param keyposition
+	 *            The field used to compute the hashcode.
+	 * @return The DataStream with field partitioning set.
+	 */
+	public DataStream<OUT> partitionBy(int keyposition) {
+		if (keyposition < 0) {
+			throw new IllegalArgumentException("The position of the field must be non-negative");
+		}
+
+		return setConnectionType(new FieldsPartitioner<OUT>(keyposition));
+	}
+
+	/**
+	 * Sets the partitioning of the {@link DataStream} so that the output tuples
+	 * are broadcast to every parallel instance of the next component.
+	 * 
+	 * @return The DataStream with broadcast partitioning set.
+	 */
+	public DataStream<OUT> broadcast() {
+		return setConnectionType(new BroadcastPartitioner<OUT>());
+	}
+
+	/**
+	 * Sets the partitioning of the {@link DataStream} so that the output tuples
+	 * are shuffled to the next component.
+	 * 
+	 * @return The DataStream with shuffle partitioning set.
+	 */
+	public DataStream<OUT> shuffle() {
+		return setConnectionType(new ShufflePartitioner<OUT>());
+	}
+
+	/**
+	 * Sets the partitioning of the {@link DataStream} so that the output tuples
+	 * are forwarded to the local subtask of the next component. This is the
+	 * default partitioner setting.
+	 * 
+	 * @return The DataStream with forward partitioning set.
+	 */
+	public DataStream<OUT> forward() {
+		return setConnectionType(new ForwardPartitioner<OUT>());
+	}
+
+	/**
+	 * Sets the partitioning of the {@link DataStream} so that the output tuples
+	 * are distributed evenly to the next component.
+	 * 
+	 * @return The DataStream with distribute partitioning set.
+	 */
+	public DataStream<OUT> distribute() {
+		return setConnectionType(new DistributePartitioner<OUT>());
+	}
+
+	/**
+	 * Applies a Map transformation on a {@link DataStream}. The transformation
+	 * calls a {@link MapFunction} for each element of the DataStream. Each
+	 * MapFunction call returns exactly one element. The user can also extend
+	 * {@link RichMapFunction} to gain access to other features provided by the
+	 * {@link RichFunction} interface.
+	 * 
+	 * @param mapper
+	 *            The MapFunction that is called for each element of the
+	 *            DataStream.
+	 * @param <R>
+	 *            output type
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> map(MapFunction<OUT, R> mapper) {
+		return addFunction("map", mapper, new FunctionTypeWrapper<OUT, Tuple, R>(mapper,
+				MapFunction.class, 0, -1, 1), new MapInvokable<OUT, R>(mapper));
+	}
+
+	/**
+	 * Applies a FlatMap transformation on a {@link DataStream}. The
+	 * transformation calls a {@link FlatMapFunction} for each element of the
+	 * DataStream. Each FlatMapFunction call can return any number of elements
+	 * including none. The user can also extend {@link RichFlatMapFunction} to
+	 * gain access to other features provided by the {@link RichFunction}
+	 * interface.
+	 * 
+	 * @param flatMapper
+	 *            The FlatMapFunction that is called for each element of the
+	 *            DataStream
+	 * 
+	 * @param <R>
+	 *            output type
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> flatMap(FlatMapFunction<OUT, R> flatMapper) {
+		return addFunction("flatMap", flatMapper, new FunctionTypeWrapper<OUT, Tuple, R>(
+				flatMapper, FlatMapFunction.class, 0, -1, 1), new FlatMapInvokable<OUT, R>(
+				flatMapper));
+	}
+
+	/**
+	 * Applies a reduce transformation on preset chunks of the DataStream. The
+	 * transformation calls a {@link GroupReduceFunction} for each tuple batch
+	 * of the predefined size. Each GroupReduceFunction call can return any
+	 * number of elements including none. The user can also extend
+	 * {@link RichGroupReduceFunction} to gain access to other features provided
+	 * by the {@link RichFunction} interface.
+	 * 
+	 * 
+	 * @param reducer
+	 *            The GroupReduceFunction that is called for each tuple batch.
+	 * @param batchSize
+	 *            The number of tuples grouped together in the batch.
+	 * @param <R>
+	 *            output type
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> batchReduce(GroupReduceFunction<OUT, R> reducer,
+			int batchSize) {
+		return addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT, Tuple, R>(reducer,
+				GroupReduceFunction.class, 0, -1, 1), new BatchReduceInvokable<OUT, R>(reducer,
+				batchSize));
+	}
+
+	/**
+	 * Applies a reduce transformation on preset "time" chunks of the
+	 * DataStream. The transformation calls a {@link GroupReduceFunction} on
+	 * records received during the predefined time window. The window is shifted
+	 * after each reduce call. Each GroupReduceFunction call can return any
+	 * number of elements including none. The user can also extend
+	 * {@link RichGroupReduceFunction} to gain access to other features provided
+	 * by the {@link RichFunction} interface.
+	 * 
+	 * 
+	 * @param reducer
+	 *            The GroupReduceFunction that is called for each time window.
+	 * @param windowSize
+	 *            The time window to run the reducer on, in milliseconds.
+	 * @param <R>
+	 *            output type
+	 * @return The transformed DataStream.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
+			long windowSize) {
+		return addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT, Tuple, R>(reducer,
+				GroupReduceFunction.class, 0, -1, 1), new WindowReduceInvokable<OUT, R>(reducer,
+				windowSize));
+	}
+
+	/**
+	 * Applies a Filter transformation on a {@link DataStream}. The
+	 * transformation calls a {@link FilterFunction} for each element of the
+	 * DataStream and retains only those elements for which the function returns
+	 * true. Elements for which the function returns false are filtered. The
+	 * user can also extend {@link RichFilterFunction} to gain access to other
+	 * features provided by the {@link RichFunction} interface.
+	 * 
+	 * @param filter
+	 *            The FilterFunction that is called for each element of the
+	 *            DataStream.
+	 * @return The filtered DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> filter(FilterFunction<OUT> filter) {
+		return addFunction("filter", filter, new FunctionTypeWrapper<OUT, Tuple, OUT>(filter,
+				FilterFunction.class, 0, -1, 0), new FilterInvokable<OUT>(filter));
+	}
+
+	/**
+	 * Writes a DataStream to the standard output stream (stdout). For each
+	 * element of the DataStream the result of {@link Object#toString()} is
+	 * written.
+	 * 
+	 * @return The closed DataStream.
+	 */
+	public DataStream<OUT> print() {
+		DataStream<OUT> inputStream = this.copy();
+		PrintSinkFunction<OUT> printFunction = new PrintSinkFunction<OUT>();
+		DataStream<OUT> returnStream = addSink(inputStream, printFunction, null);
+
+		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+
+		return returnStream;
+	}
+
+	/**
+	 * Writes a DataStream to the file specified by path in text format. For
+	 * every element of the DataStream the result of {@link Object#toString()}
+	 * is written.
+	 * 
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * 
+	 * @return The closed DataStream
+	 */
+	public DataStream<OUT> writeAsText(String path) {
+		return writeAsText(this, path, new WriteFormatAsText<OUT>(), 1, null);
+	}
+
+	/**
+	 * Writes a DataStream to the file specified by path in text format. The
+	 * writing is performed periodically, every millis milliseconds. For
+	 * every element of the DataStream the result of {@link Object#toString()}
+	 * is written.
+	 * 
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * @param millis
+	 *            is the file update frequency
+	 * 
+	 * @return The closed DataStream
+	 */
+	public DataStream<OUT> writeAsText(String path, long millis) {
+		return writeAsText(this, path, new WriteFormatAsText<OUT>(), millis, null);
+	}
+
+	/**
+	 * Writes a DataStream to the file specified by path in text format. The
+	 * writing is performed periodically in equally sized batches. For every
+	 * element of the DataStream the result of {@link Object#toString()} is
+	 * written.
+	 * 
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * @param batchSize
+	 *            is the size of the batches, i.e. the number of tuples written
+	 *            to the file at a time
+	 * 
+	 * @return The closed DataStream
+	 */
+	public DataStream<OUT> writeAsText(String path, int batchSize) {
+		return writeAsText(this, path, new WriteFormatAsText<OUT>(), batchSize, null);
+	}
+
+	/**
+	 * Writes a DataStream to the file specified by path in text format. The
+	 * writing is performed periodically, every millis milliseconds. For
+	 * every element of the DataStream the result of {@link Object#toString()}
+	 * is written.
+	 * 
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * @param millis
+	 *            is the file update frequency
+	 * @param endTuple
+	 *            is a special tuple indicating the end of the stream. If an
+	 *            endTuple is caught, the last pending batch of tuples will be
+	 *            immediately appended to the target file regardless of the
+	 *            system time.
+	 * 
+	 * @return The closed DataStream
+	 */
+	public DataStream<OUT> writeAsText(String path, long millis, OUT endTuple) {
+		return writeAsText(this, path, new WriteFormatAsText<OUT>(), millis, endTuple);
+	}
+
+	/**
+	 * Writes a DataStream to the file specified by path in text format. The
+	 * writing is performed periodically in equally sized batches. For every
+	 * element of the DataStream the result of {@link Object#toString()} is
+	 * written.
+	 * 
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * @param batchSize
+	 *            is the size of the batches, i.e. the number of tuples written
+	 *            to the file at a time
+	 * @param endTuple
+	 *            is a special tuple indicating the end of the stream. If an
+	 *            endTuple is caught, the last pending batch of tuples will be
+	 *            immediately appended to the target file regardless of the
+	 *            batchSize.
+	 * 
+	 * @return The closed DataStream
+	 */
+	public DataStream<OUT> writeAsText(String path, int batchSize, OUT endTuple) {
+		return writeAsText(this, path, new WriteFormatAsText<OUT>(), batchSize, endTuple);
+	}
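
Note that the numeric argument's static type selects among the overloads
above: a long selects the millis-based writer, an int the batch-based one. A
brief sketch (path and values are hypothetical):

    stream.writeAsText("/tmp/out.txt");          // batch size 1: each element is written immediately
    stream.writeAsText("/tmp/out.txt", 1000L);   // long argument: flush every 1000 ms
    stream.writeAsText("/tmp/out.txt", 100);     // int argument: flush after every 100 elements
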
+
+	/**
+	 * Writes a DataStream to the file specified by path in text format. The
+	 * writing is performed periodically, every millis milliseconds. For
+	 * every element of the DataStream the result of {@link Object#toString()}
+	 * is written.
+	 * 
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * @param millis
+	 *            is the file update frequency
+	 * @param endTuple
+	 *            is a special tuple indicating the end of the stream. If an
+	 *            endTuple is caught, the last pending batch of tuples will be
+	 *            immediately appended to the target file regardless of the
+	 *            system time.
+	 * 
+	 * @return the data stream constructed
+	 */
+	private DataStream<OUT> writeAsText(DataStream<OUT> inputStream, String path,
+			WriteFormatAsText<OUT> format, long millis, OUT endTuple) {
+		DataStream<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
+				path, format, millis, endTuple), null);
+		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+		jobGraphBuilder.setMutability(returnStream.getId(), false);
+		return returnStream;
+	}
+
+	/**
+	 * Writes a DataStream to the file specified by path in text format. The
+	 * writing is performed periodically in equally sized batches. For every
+	 * element of the DataStream the result of {@link Object#toString()} is
+	 * written.
+	 * 
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * @param batchSize
+	 *            is the size of the batches, i.e. the number of tuples written
+	 *            to the file at a time
+	 * @param endTuple
+	 *            is a special tuple indicating the end of the stream. If an
+	 *            endTuple is caught, the last pending batch of tuples will be
+	 *            immediately appended to the target file regardless of the
+	 *            batchSize.
+	 * 
+	 * @return the data stream constructed
+	 */
+	private DataStream<OUT> writeAsText(DataStream<OUT> inputStream, String path,
+			WriteFormatAsText<OUT> format, int batchSize, OUT endTuple) {
+		DataStream<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByBatches<OUT>(
+				path, format, batchSize, endTuple), null);
+		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+		jobGraphBuilder.setMutability(returnStream.getId(), false);
+		return returnStream;
+	}
+
+	/**
+	 * Writes a DataStream to the file specified by path in csv format. For
+	 * every element of the DataStream the result of {@link Object#toString()}
+	 * is written.
+	 * 
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * 
+	 * @return The closed DataStream
+	 */
+	public DataStream<OUT> writeAsCsv(String path) {
+		return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), 1, null);
+	}
+
+	/**
+	 * Writes a DataStream to the file specified by path in csv format. The
+	 * writing is performed periodically, every millis milliseconds. For
+	 * every element of the DataStream the result of {@link Object#toString()}
+	 * is written.
+	 * 
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * @param millis
+	 *            is the file update frequency
+	 * 
+	 * @return The closed DataStream
+	 */
+	public DataStream<OUT> writeAsCsv(String path, long millis) {
+		return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), millis, null);
+	}
+
+	/**
+	 * Writes a DataStream to the file specified by path in csv format. The
+	 * writing is performed periodically in equally sized batches. For every
+	 * element of the DataStream the result of {@link Object#toString()} is
+	 * written.
+	 * 
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * @param batchSize
+	 *            is the size of the batches, i.e. the number of tuples written
+	 *            to the file at a time
+	 * 
+	 * @return The closed DataStream
+	 */
+	public DataStream<OUT> writeAsCsv(String path, int batchSize) {
+		return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), batchSize, null);
+	}
+
+	/**
+	 * Writes a DataStream to the file specified by path in csv format. The
+	 * writing is performed periodically, every millis milliseconds. For
+	 * every element of the DataStream the result of {@link Object#toString()}
+	 * is written.
+	 * 
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * @param millis
+	 *            is the file update frequency
+	 * @param endTuple
+	 *            is a special tuple indicating the end of the stream. If an
+	 *            endTuple is caught, the last pending batch of tuples will be
+	 *            immediately appended to the target file regardless of the
+	 *            system time.
+	 * 
+	 * @return The closed DataStream
+	 */
+	public DataStream<OUT> writeAsCsv(String path, long millis, OUT endTuple) {
+		return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), millis, endTuple);
+	}
+
+	/**
+	 * Writes a DataStream to the file specified by path in csv format. The
+	 * writing is performed periodically in equally sized batches. For every
+	 * element of the DataStream the result of {@link Object#toString()} is
+	 * written.
+	 * 
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * @param batchSize
+	 *            is the size of the batches, i.e. the number of tuples written
+	 *            to the file at a time
+	 * @param endTuple
+	 *            is a special tuple indicating the end of the stream. If an
+	 *            endTuple is caught, the last pending batch of tuples will be
+	 *            immediately appended to the target file regardless of the
+	 *            batchSize.
+	 * 
+	 * @return The closed DataStream
+	 */
+	public DataStream<OUT> writeAsCsv(String path, int batchSize, OUT endTuple) {
+		if (this instanceof SingleOutputStreamOperator) {
+			((SingleOutputStreamOperator<?, ?>) this).setMutability(false);
+		}
+		return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), batchSize, endTuple);
+	}
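
A sketch of the end-marker variant (assuming a DataStream<Tuple2<String, Integer>>
named counts; the marker value is arbitrary and only needs to be recognizable):

    counts.writeAsCsv("/tmp/counts.csv", 100,
            new Tuple2<String, Integer>("__END__", -1));
    // tuples are written in batches of 100; seeing the ("__END__", -1)
    // marker flushes the last, possibly partial, batch immediately
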
+
+	/**
+	 * Writes a DataStream to the file specified by path in csv format. The
+	 * writing is performed periodically, every millis milliseconds. For
+	 * every element of the DataStream the result of {@link Object#toString()}
+	 * is written.
+	 * 
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * @param millis
+	 *            is the file update frequency
+	 * @param endTuple
+	 *            is a special tuple indicating the end of the stream. If an
+	 *            endTuple is caught, the last pending batch of tuples will be
+	 *            immediately appended to the target file regardless of the
+	 *            system time.
+	 * 
+	 * @return the data stream constructed
+	 */
+	private DataStream<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
+			WriteFormatAsCsv<OUT> format, long millis, OUT endTuple) {
+		DataStream<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
+				path, format, millis, endTuple));
+		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+		jobGraphBuilder.setMutability(returnStream.getId(), false);
+		return returnStream;
+	}
+
+	/**
+	 * Writes a DataStream to the file specified by path in csv format. The
+	 * writing is performed periodically in equally sized batches. For every
+	 * element of the DataStream the result of {@link Object#toString()} is
+	 * written.
+	 * 
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * @param batchSize
+	 *            is the size of the batches, i.e. the number of tuples written
+	 *            to the file at a time
+	 * @param endTuple
+	 *            is a special tuple indicating the end of the stream. If an
+	 *            endTuple is caught, the last pending batch of tuples will be
+	 *            immediately appended to the target file regardless of the
+	 *            batchSize.
+	 * 
+	 * @return the data stream constructed
+	 */
+	private DataStream<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
+			WriteFormatAsCsv<OUT> format, int batchSize, OUT endTuple) {
+		DataStream<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByBatches<OUT>(
+				path, format, batchSize, endTuple), null);
+		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+		jobGraphBuilder.setMutability(returnStream.getId(), false);
+		return returnStream;
+	}
+
+	/**
+	 * Initiates an iterative part of the program that executes multiple times
+	 * and feeds back data streams. The iterative part needs to be closed by
+	 * calling {@link IterativeDataStream#closeWith(DataStream)}. The
+	 * transformation of this IterativeDataStream will be the iteration head.
+	 * The data stream given to the {@code closeWith(DataStream)} method is the
+	 * data stream that will be fed back and used as the input for the iteration
+	 * head. Unlike in batch processing, by default the output of the iteration
+	 * stream is directed both to the iteration head and to the next component.
+	 * To direct tuples specifically to the iteration head or to the output, one
+	 * can use {@code split(OutputSelector)} on the iteration tail while
+	 * referencing the iteration head as 'iterate'.
+	 * 
+	 * The iteration edge will be partitioned the same way as the first input of
+	 * the iteration head.
+	 * 
+	 * @return The iterative data stream created.
+	 */
+	public IterativeDataStream<OUT> iterate() {
+		return new IterativeDataStream<OUT>(this);
+	}
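
A minimal iteration sketch (assuming a DataStream<Long> named input; the loop
body simply decrements each value before feeding it back):

    IterativeDataStream<Long> iteration = input.iterate();

    DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
        @Override
        public Long map(Long value) throws Exception {
            return value - 1; // the loop body
        }
    });

    // feed the mapped stream back to the iteration head; without a
    // split(OutputSelector) it also flows on to downstream operators
    iteration.closeWith(minusOne);
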
+
+	protected <R> DataStream<OUT> addIterationSource(String iterationID) {
+
+		DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource");
+
+		jobGraphBuilder.addIterationSource(returnStream.getId(), this.getId(), iterationID,
+				degreeOfParallelism);
+
+		return this.copy();
+	}
+
+	/**
+	 * Internal function for passing the user defined functions to the JobGraph
+	 * of the job.
+	 * 
+	 * @param functionName
+	 *            name of the function
+	 * @param function
+	 *            the user defined function
+	 * @param typeWrapper
+	 *            wrapper carrying the input and output type information of the
+	 *            function
+	 * @param functionInvokable
+	 *            the invokable that executes the user defined function
+	 * @param <R>
+	 *            type of the return stream
+	 * @return the data stream constructed
+	 */
+	private <R> SingleOutputStreamOperator<R, ?> addFunction(String functionName,
+			final Function function, TypeSerializerWrapper<OUT, Tuple, R> typeWrapper,
+			UserTaskInvokable<OUT, R> functionInvokable) {
+
+		DataStream<OUT> inputStream = this.copy();
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment,
+				functionName);
+
+		try {
+			jobGraphBuilder.addTask(returnStream.getId(), functionInvokable, typeWrapper,
+					functionName, SerializationUtils.serialize((Serializable) function),
+					degreeOfParallelism);
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot serialize user defined function");
+		}
+
+		connectGraph(inputStream, returnStream.getId(), 0);
+
+		if (inputStream instanceof IterativeDataStream) {
+			returnStream.addIterationSource(((IterativeDataStream<OUT>) inputStream).iterationID
+					.toString());
+		}
+
+		if (userDefinedName != null) {
+			returnStream.name(getUserDefinedNames());
+		}
+
+		return returnStream;
+	}
+
+	protected List<String> getUserDefinedNames() {
+		List<String> nameList = new ArrayList<String>();
+		nameList.add(userDefinedName);
+		return nameList;
+	}
+
+	/**
+	 * Gives the data transformation (vertex) a user defined name in order to
+	 * use it with directed outputs. The {@link OutputSelector} of the input
+	 * vertex should use this name for directed emits.
+	 * 
+	 * @param name
+	 *            The name to set
+	 * @return The named DataStream.
+	 */
+	protected DataStream<OUT> name(List<String> name) {
+
+		userDefinedName = name.get(0);
+		jobGraphBuilder.setUserDefinedName(id, name);
+
+		return this;
+	}
+
+	/**
+	 * Internal function for setting the partitioner of the DataStream.
+	 * 
+	 * @param partitioner
+	 *            Partitioner to set.
+	 * @return The modified DataStream.
+	 */
+	protected DataStream<OUT> setConnectionType(StreamPartitioner<OUT> partitioner) {
+		DataStream<OUT> returnStream = this.copy();
+
+		returnStream.partitioner = partitioner;
+
+		return returnStream;
+	}
+
+	/**
+	 * Internal function for assembling the underlying
+	 * {@link org.apache.flink.runtime.jobgraph.JobGraph} of the job. Connects
+	 * the outputs of the given input stream to the specified output stream
+	 * given by the outputID.
+	 * 
+	 * @param inputStream
+	 *            input data stream
+	 * @param outputID
+	 *            ID of the output
+	 * @param typeNumber
+	 *            Number of the type (used at co-functions)
+	 */
+	protected <X> void connectGraph(DataStream<X> inputStream, String outputID, int typeNumber) {
+		if (inputStream instanceof ConnectedDataStream) {
+			for (DataStream<X> stream : ((ConnectedDataStream<X>) inputStream).connectedStreams) {
+				jobGraphBuilder.setEdge(stream.getId(), outputID, stream.partitioner, typeNumber);
+			}
+		} else {
+			jobGraphBuilder.setEdge(inputStream.getId(), outputID, inputStream.partitioner,
+					typeNumber);
+		}
+
+	}
+
+	/**
+	 * Adds the given sink to this DataStream. Only streams with sinks added
+	 * will be executed once the {@link StreamExecutionEnvironment#execute()}
+	 * method is called.
+	 * 
+	 * @param sinkFunction
+	 *            The object containing the sink's invoke function.
+	 * @return The closed DataStream.
+	 */
+	public DataStream<OUT> addSink(SinkFunction<OUT> sinkFunction) {
+		return addSink(this.copy(), sinkFunction);
+	}
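
A custom sink along the lines of the PrintSinkFunction used by print() above
(assuming SinkFunction exposes a single invoke(IN) callback and that the
stream carries Strings):

    stream.addSink(new SinkFunction<String>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void invoke(String value) {
            // one call per element; here the side effect is a stderr line
            System.err.println(value);
        }
    });
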
+
+	private DataStream<OUT> addSink(DataStream<OUT> inputStream, SinkFunction<OUT> sinkFunction) {
+		return addSink(inputStream, sinkFunction, new FunctionTypeWrapper<OUT, Tuple, OUT>(
+				sinkFunction, SinkFunction.class, 0, -1, 0));
+	}
+
+	private DataStream<OUT> addSink(DataStream<OUT> inputStream, SinkFunction<OUT> sinkFunction,
+			TypeSerializerWrapper<OUT, Tuple, OUT> typeWrapper) {
+		DataStream<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink");
+
+		try {
+			jobGraphBuilder.addSink(returnStream.getId(), new SinkInvokable<OUT>(sinkFunction),
+					typeWrapper, "sink", SerializationUtils.serialize(sinkFunction),
+					degreeOfParallelism);
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot serialize SinkFunction");
+		}
+
+		inputStream.connectGraph(inputStream, returnStream.getId(), 0);
+
+		if (inputStream.userDefinedName != null) {
+			returnStream.name(getUserDefinedNames());
+		}
+
+		return returnStream;
+	}
+
+	/**
+	 * Creates a copy of the {@link DataStream}
+	 * 
+	 * @return The copy
+	 */
+	protected abstract DataStream<OUT> copy();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
new file mode 100755
index 0000000..ee6502f
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Represents the end of a DataStream.
+ *
+ * @param <IN>
+ *            The type of the DataStream closed by the sink.
+ */
+public class DataStreamSink<IN> extends DataStream<IN> {
+
+	protected DataStreamSink(StreamExecutionEnvironment environment, String operatorType) {
+		super(environment, operatorType);
+	}
+
+	protected DataStreamSink(DataStream<IN> dataStream) {
+		super(dataStream);
+	}
+
+	@Override
+	protected DataStream<IN> copy() {
+		throw new RuntimeException("Data stream sinks cannot be copied");
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
new file mode 100755
index 0000000..f939851
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -0,0 +1,39 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * The DataStreamSource represents the starting point of a DataStream.
+ *
+ * @param <OUT>
+ *            Type of the DataStream created.
+ */
+public class DataStreamSource<OUT> extends SingleOutputStreamOperator<OUT, DataStreamSource<OUT>> {
+
+	public DataStreamSource(StreamExecutionEnvironment environment, String operatorType) {
+		super(environment, operatorType);
+	}
+
+	public DataStreamSource(DataStream<OUT> dataStream) {
+		super(dataStream);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
new file mode 100644
index 0000000..b9aadcd
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -0,0 +1,103 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.streaming.partitioner.ForwardPartitioner;
+
+/**
+ * The iterative data stream represents the start of an iteration in a
+ * {@link DataStream}.
+ * 
+ * @param <T>
+ *            Type of the DataStream
+ */
+public class IterativeDataStream<T> extends SingleOutputStreamOperator<T, IterativeDataStream<T>> {
+
+	static Integer iterationCount = 0;
+	protected Integer iterationID;
+
+	protected IterativeDataStream(DataStream<T> dataStream) {
+		super(dataStream);
+		iterationID = iterationCount;
+		iterationCount++;
+	}
+
+	protected IterativeDataStream(DataStream<T> dataStream, Integer iterationID) {
+		super(dataStream);
+		this.iterationID = iterationID;
+	}
+
+	/**
+	 * Closes the iteration. This method defines the end of the iterative
+	 * program part. By default the DataStream represented by the parameter will
+	 * be fed back to the iteration head; however, the user can explicitly select
+	 * which tuples should be iterated by using {@code split(OutputSelector)}.
+	 * Tuples directed to 'iterate' will be fed back to the iteration head.
+	 * 
+	 * @param iterationResult
+	 *            The data stream that can be fed back to the next iteration.
+	 * 
+	 */
+	public DataStream<T> closeWith(DataStream<T> iterationResult) {
+		return closeWith(iterationResult, null);
+	}
+
+	/**
+	 * Closes the iteration. This method defines the end of the iterative
+	 * program part. By default the DataStream represented by the parameter will
+	 * be fed back to the iteration head; however, the user can explicitly select
+	 * which tuples should be iterated by using {@code split(OutputSelector)}.
+	 * Tuples directed to 'iterate' will be fed back to the iteration head.
+	 * 
+	 * @param iterationTail
+	 *            The data stream that can be fed back to the next iteration.
+	 * @param iterationName
+	 *            Name of the iteration edge (backward edge to iteration head)
+	 *            when used with directed emits
+	 * 
+	 */
+	public <R> DataStream<T> closeWith(DataStream<T> iterationTail, String iterationName) {
+		DataStream<R> returnStream = new DataStreamSink<R>(environment, "iterationSink");
+
+		jobGraphBuilder.addIterationSink(returnStream.getId(), iterationTail.getId(),
+				iterationID.toString(), iterationTail.getParallelism(), iterationName);
+
+		jobGraphBuilder.setIterationSourceParallelism(iterationID.toString(),
+				iterationTail.getParallelism());
+
+		if (iterationTail instanceof ConnectedDataStream) {
+			for (DataStream<T> stream : ((ConnectedDataStream<T>) iterationTail).connectedStreams) {
+				String inputID = stream.getId();
+				jobGraphBuilder.setEdge(inputID, returnStream.getId(), new ForwardPartitioner<T>(),
+						0);
+			}
+		} else {
+			jobGraphBuilder.setEdge(iterationTail.getId(), returnStream.getId(),
+					new ForwardPartitioner<T>(), 0);
+		}
+
+		return iterationTail;
+	}
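
Continuing the iteration sketch from DataStream#iterate, the two-argument
variant names the backward edge so that directed emits on the iteration tail
can target it explicitly (the edge name "feedback" is hypothetical):

    iteration.closeWith(tailStream, "feedback");
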
+
+	@Override
+	protected IterativeDataStream<T> copy() {
+		return new IterativeDataStream<T>(this, iterationID);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
new file mode 100755
index 0000000..9af4dc8
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -0,0 +1,148 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * The SingleOutputStreamOperator represents a user defined transformation
+ * applied on a {@link DataStream} with one predefined output type.
+ *
+ * @param <OUT>
+ *            Output type of the operator.
+ * @param <O>
+ *            Type of the operator.
+ */
+public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperator<OUT, O>> extends
+		DataStream<OUT> {
+
+	protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, String operatorType) {
+		super(environment, operatorType);
+	}
+
+	protected SingleOutputStreamOperator(DataStream<OUT> dataStream) {
+		super(dataStream);
+	}
+
+	/**
+	 * Sets the degree of parallelism for this operator. The degree must be 1 or
+	 * more.
+	 * 
+	 * @param dop
+	 *            The degree of parallelism for this operator.
+	 * @return The operator with set degree of parallelism.
+	 */
+	public SingleOutputStreamOperator<OUT, O> setParallelism(int dop) {
+		if (dop < 1) {
+			throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
+		}
+		this.degreeOfParallelism = dop;
+
+		jobGraphBuilder.setParallelism(id, degreeOfParallelism);
+
+		return this;
+	}
+
+	/**
+	 * Sets the mutability of the operator. If the operator is set to mutable,
+	 * the tuples received in the user defined functions will be reused after
+	 * the function call. Setting an operator to mutable reduces garbage
+	 * collection overhead and thus increases scalability. Please note that if a
+	 * {@link DataStream#batchReduce} or {@link DataStream#windowReduce} is used
+	 * as mutable, the user can only iterate through the iterator once in every
+	 * invoke.
+	 * 
+	 * @param isMutable
+	 *            The mutability of the operator.
+	 * @return The operator with mutability set.
+	 */
+	public DataStream<OUT> setMutability(boolean isMutable) {
+		jobGraphBuilder.setMutability(id, isMutable);
+		return this;
+	}
+
+	/**
+	 * Sets the maximum time (in milliseconds) between flushes of the output
+	 * buffers. By default the output buffers are flushed only when they are
+	 * full.
+	 * 
+	 * @param timeoutMillis
+	 *            The maximum time between two output flushes.
+	 * @return The operator with buffer timeout set.
+	 */
+	public DataStream<OUT> setBufferTimeout(long timeoutMillis) {
+		jobGraphBuilder.setBufferTimeout(id, timeoutMillis);
+		return this;
+	}
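
Taken together, a tuning sketch for these setters (assuming a
DataStream<String> named source; the chosen values are illustrative only):

    SingleOutputStreamOperator<String, ?> upper = source.map(new MapFunction<String, String>() {
        @Override
        public String map(String value) throws Exception {
            return value.toUpperCase();
        }
    });

    upper.setParallelism(4);      // four parallel instances of the mapper
    upper.setMutability(true);    // allow tuple reuse to cut GC overhead
    upper.setBufferTimeout(100L); // flush output buffers at least every 100 ms
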
+
+	/**
+	 * Operator used for directing tuples to specific named outputs using an
+	 * {@link OutputSelector}. Calling this method on an operator creates a new
+	 * {@link SplitDataStream}.
+	 * 
+	 * @param outputSelector
+	 *            The user defined {@link OutputSelector} for directing the
+	 *            tuples.
+	 * @return The {@link SplitDataStream}
+	 */
+	public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
+		try {
+			jobGraphBuilder.setOutputSelector(id, SerializationUtils.serialize(outputSelector));
+
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot serialize OutputSelector");
+		}
+
+		return new SplitDataStream<OUT>(this);
+	}
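
A directed-output sketch (assuming OutputSelector is implemented by adding the
chosen output names to the provided java.util.Collection, as the directed-emit
javadoc above suggests; the names "even" and "odd" are hypothetical):

    SplitDataStream<Integer> parts = numbers.split(new OutputSelector<Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void select(Integer value, Collection<String> outputs) {
            // route each value to exactly one named output
            outputs.add(value % 2 == 0 ? "even" : "odd");
        }
    });
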
+
+	@SuppressWarnings("unchecked")
+	public SingleOutputStreamOperator<OUT, O> partitionBy(int keyposition) {
+		return (SingleOutputStreamOperator<OUT, O>) super.partitionBy(keyposition);
+	}
+
+	@SuppressWarnings("unchecked")
+	public SingleOutputStreamOperator<OUT, O> broadcast() {
+		return (SingleOutputStreamOperator<OUT, O>) super.broadcast();
+	}
+
+	@SuppressWarnings("unchecked")
+	public SingleOutputStreamOperator<OUT, O> shuffle() {
+		return (SingleOutputStreamOperator<OUT, O>) super.shuffle();
+	}
+
+	@SuppressWarnings("unchecked")
+	public SingleOutputStreamOperator<OUT, O> forward() {
+		return (SingleOutputStreamOperator<OUT, O>) super.forward();
+	}
+
+	@SuppressWarnings("unchecked")
+	public SingleOutputStreamOperator<OUT, O> distribute() {
+		return (SingleOutputStreamOperator<OUT, O>) super.distribute();
+	}
+
+	@Override
+	protected SingleOutputStreamOperator<OUT, O> copy() {
+		return new SingleOutputStreamOperator<OUT, O>(this);
+	}
+
+}