You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:01 UTC

[24/51] [abbrv] git commit: [streaming] Wrapped serializers to make component construction simpler

[streaming] Wrapped serializers to make component construction simpler


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/799424d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/799424d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/799424d1

Branch: refs/heads/master
Commit: 799424d1a3237a8d1b32451f4b3ad23b342cf59e
Parents: 2f704ae
Author: ghermann <re...@gmail.com>
Authored: Tue Jul 29 20:00:11 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:22:09 2014 +0200

----------------------------------------------------------------------
 .../apache/flink/streaming/api/DataStream.java  | 331 +++++++++++++---
 .../streaming/api/IterativeDataStream.java      |  21 +-
 .../flink/streaming/api/StreamConfig.java       |  87 +++--
 .../api/StreamExecutionEnvironment.java         | 380 ++-----------------
 .../api/function/source/FileSourceFunction.java |   6 +-
 .../api/function/source/FileStreamFunction.java |  10 +-
 .../api/function/source/SourceFunction.java     |   7 +-
 .../api/invokable/SourceInvokable.java          |  44 +++
 .../api/invokable/StreamComponentInvokable.java |  10 +-
 .../api/invokable/StreamRecordInvokable.java    |   7 +-
 .../api/invokable/UserSourceInvokable.java      |  34 --
 .../operator/StreamReduceInvokable.java         |   1 -
 .../api/invokable/operator/co/CoInvokable.java  |   2 +-
 .../AbstractStreamComponent.java                |  78 ++--
 .../streamcomponent/BlockingQueueBroker.java    |   5 +-
 .../api/streamcomponent/CoStreamTask.java       |  36 +-
 .../SingleInputAbstractStreamComponent.java     |  50 +--
 .../streamcomponent/StreamIterationSink.java    |   8 +-
 .../streamcomponent/StreamIterationSource.java  |   6 +-
 .../api/streamcomponent/StreamSink.java         |  16 +-
 .../api/streamcomponent/StreamSource.java       |  17 +-
 .../api/streamcomponent/StreamTask.java         |  17 +-
 .../util/serialization/FunctionTypeWrapper.java |  74 ++++
 .../util/serialization/ObjectTypeWrapper.java   |  61 +++
 .../serialization/TypeSerializerWrapper.java    |  57 +++
 .../flink/streaming/api/WriteAsTextTest.java    |  18 +-
 .../api/invokable/operator/FilterTest.java      |   2 +-
 .../serialization/TypeSerializationTest.java    |  91 +++++
 28 files changed, 846 insertions(+), 630 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
index d965bf2..1bde6a6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
@@ -22,6 +22,9 @@ package org.apache.flink.streaming.api;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.functions.AbstractFunction;
 import org.apache.flink.api.java.functions.FilterFunction;
 import org.apache.flink.api.java.functions.FlatMapFunction;
 import org.apache.flink.api.java.functions.GroupReduceFunction;
@@ -29,14 +32,20 @@ import org.apache.flink.api.java.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
 import org.apache.flink.streaming.api.function.sink.WriteFormatAsText;
+import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByBatches;
+import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis;
+import org.apache.flink.streaming.api.invokable.SinkInvokable;
+import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
 import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
 import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
 import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.partitioner.DistributePartitioner;
@@ -44,6 +53,8 @@ import org.apache.flink.streaming.partitioner.FieldsPartitioner;
 import org.apache.flink.streaming.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
+import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
 
 /**
  * A DataStream represents a stream of elements of the same type. A DataStream
@@ -66,12 +77,13 @@ public class DataStream<T extends Tuple> {
 	protected String id;
 	protected int degreeOfParallelism;
 	protected String userDefinedName;
-	protected OutputSelector<T> outputSelector;
 	protected List<String> connectIDs;
 	protected List<StreamPartitioner<T>> partitioners;
 	protected boolean iterationflag;
 	protected Integer iterationID;
 
+	protected JobGraphBuilder jobGraphBuilder;
+
 	/**
 	 * Create a new {@link DataStream} in the given execution environment with
 	 * partitioning set to shuffle by default.
@@ -91,8 +103,8 @@ public class DataStream<T extends Tuple> {
 		this.id = operatorType + "-" + counter.toString();
 		this.environment = environment;
 		this.degreeOfParallelism = environment.getDegreeOfParallelism();
+		this.jobGraphBuilder = environment.getJobGraphBuilder();
 		initConnections();
-
 	}
 
 	/**
@@ -106,11 +118,11 @@ public class DataStream<T extends Tuple> {
 		this.id = dataStream.id;
 		this.degreeOfParallelism = dataStream.degreeOfParallelism;
 		this.userDefinedName = dataStream.userDefinedName;
-		this.outputSelector = dataStream.outputSelector;
 		this.connectIDs = new ArrayList<String>(dataStream.connectIDs);
 		this.partitioners = new ArrayList<StreamPartitioner<T>>(dataStream.partitioners);
 		this.iterationflag = dataStream.iterationflag;
 		this.iterationID = dataStream.iterationID;
+		this.jobGraphBuilder = dataStream.jobGraphBuilder;
 	}
 
 	/**
@@ -144,7 +156,7 @@ public class DataStream<T extends Tuple> {
 	 * @return The DataStream with mutability set.
 	 */
 	public DataStream<T> setMutability(boolean isMutable) {
-		environment.setMutability(this, isMutable);
+		jobGraphBuilder.setMutability(id, isMutable);
 		return this;
 	}
 
@@ -157,7 +169,7 @@ public class DataStream<T extends Tuple> {
 	 * @return The DataStream with buffer timeout set.
 	 */
 	public DataStream<T> setBufferTimeout(long timeoutMillis) {
-		environment.setBufferTimeout(this, timeoutMillis);
+		jobGraphBuilder.setBufferTimeout(id, timeoutMillis);
 		return this;
 	}
 
@@ -175,10 +187,9 @@ public class DataStream<T extends Tuple> {
 		}
 		this.degreeOfParallelism = dop;
 
-		environment.setOperatorParallelism(this);
+		jobGraphBuilder.setParallelism(id, degreeOfParallelism);
 
 		return new DataStream<T>(this);
-
 	}
 
 	/**
@@ -200,13 +211,14 @@ public class DataStream<T extends Tuple> {
 	 * @return The named DataStream.
 	 */
 	public DataStream<T> name(String name) {
-		// copy?
+		// TODO copy DataStream?
 		if (name == "") {
 			throw new IllegalArgumentException("User defined name must not be empty string");
 		}
 
 		userDefinedName = name;
-		environment.setName(this, name);
+		jobGraphBuilder.setUserDefinedName(id, name);
+
 		return this;
 	}
 
@@ -236,13 +248,10 @@ public class DataStream<T extends Tuple> {
 	 *            The other DataStream will connected to this
 	 * @param stream
 	 *            This DataStream will be connected to returnStream
-	 * @return Connected DataStream
 	 */
-	private DataStream<T> addConnection(DataStream<T> returnStream, DataStream<T> stream) {
+	private void addConnection(DataStream<T> returnStream, DataStream<T> stream) {
 		returnStream.connectIDs.addAll(stream.connectIDs);
 		returnStream.partitioners.addAll(stream.partitioners);
-
-		return returnStream;
 	}
 
 	/**
@@ -256,8 +265,12 @@ public class DataStream<T extends Tuple> {
 	 * @return The directed DataStream.
 	 */
 	public DataStream<T> directTo(OutputSelector<T> outputSelector) {
-		this.outputSelector = outputSelector;
-		environment.addDirectedEmit(id, outputSelector);
+		try {
+			jobGraphBuilder.setOutputSelector(id, SerializationUtils.serialize(outputSelector));
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot serialize OutputSelector");
+		}
+
 		return this;
 	}
 
@@ -323,10 +336,10 @@ public class DataStream<T extends Tuple> {
 		for (int i = 0; i < returnStream.partitioners.size(); i++) {
 			returnStream.partitioners.set(i, partitioner);
 		}
-		
+
 		return returnStream;
 	}
-	
+
 	/**
 	 * Applies a Map transformation on a {@link DataStream}. The transformation
 	 * calls a {@link MapFunction} for each element of the DataStream. Each
@@ -340,9 +353,8 @@ public class DataStream<T extends Tuple> {
 	 * @return The transformed DataStream.
 	 */
 	public <R extends Tuple> StreamOperator<T, R> map(MapFunction<T, R> mapper) {
-		return environment.addFunction("map", new DataStream<T>(this), mapper,
-				new MapInvokable<T, R>(mapper));
-
+		return addFunction("map", mapper, new FunctionTypeWrapper<T, Tuple, R>(mapper,
+				MapFunction.class, 0, -1, 1), new MapInvokable<T, R>(mapper));
 	}
 
 	/**
@@ -362,8 +374,10 @@ public class DataStream<T extends Tuple> {
 	 */
 	public <T2 extends Tuple, R extends Tuple> DataStream<R> coMapWith(
 			CoMapFunction<T, T2, R> coMapper, DataStream<T2> otherStream) {
-		return environment.addCoFunction("coMap", new DataStream<T>(this), new DataStream<T2>(
-				otherStream), coMapper, new CoMapInvokable<T, T2, R>(coMapper));
+		return addCoFunction("coMap", new DataStream<T>(this), new DataStream<T2>(otherStream),
+				coMapper,
+				new FunctionTypeWrapper<T, T2, R>(coMapper, CoMapFunction.class, 0, 1, 2),
+				new CoMapInvokable<T, T2, R>(coMapper));
 	}
 
 	/**
@@ -381,8 +395,8 @@ public class DataStream<T extends Tuple> {
 	 * @return The transformed DataStream.
 	 */
 	public <R extends Tuple> StreamOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) {
-		return environment.addFunction("flatMap", new DataStream<T>(this), flatMapper,
-				new FlatMapInvokable<T, R>(flatMapper));
+		return addFunction("flatMap", flatMapper, new FunctionTypeWrapper<T, Tuple, R>(flatMapper,
+				FlatMapFunction.class, 0, -1, 1), new FlatMapInvokable<T, R>(flatMapper));
 	}
 
 	/**
@@ -397,8 +411,8 @@ public class DataStream<T extends Tuple> {
 	 * @return The filtered DataStream.
 	 */
 	public StreamOperator<T, T> filter(FilterFunction<T> filter) {
-		return environment.addFunction("filter", new DataStream<T>(this), filter,
-				new FilterInvokable<T>(filter));
+		return addFunction("filter", filter, new FunctionTypeWrapper<T, Tuple, T>(filter,
+				FilterFunction.class, 0, -1, 0), new FilterInvokable<T>(filter));
 	}
 
 	/**
@@ -418,8 +432,9 @@ public class DataStream<T extends Tuple> {
 	 */
 	public <R extends Tuple> StreamOperator<T, R> batchReduce(GroupReduceFunction<T, R> reducer,
 			int batchSize) {
-		return environment.addFunction("batchReduce", new DataStream<T>(this), reducer,
-				new BatchReduceInvokable<T, R>(reducer, batchSize));
+		return addFunction("batchReduce", reducer, new FunctionTypeWrapper<T, Tuple, R>(reducer,
+				GroupReduceFunction.class, 0, -1, 1), new BatchReduceInvokable<T, R>(reducer,
+				batchSize));
 	}
 
 	/**
@@ -440,8 +455,93 @@ public class DataStream<T extends Tuple> {
 	 */
 	public <R extends Tuple> StreamOperator<T, R> windowReduce(GroupReduceFunction<T, R> reducer,
 			long windowSize) {
-		return environment.addFunction("batchReduce", new DataStream<T>(this), reducer,
-				new WindowReduceInvokable<T, R>(reducer, windowSize));
+		return addFunction("batchReduce", reducer, new FunctionTypeWrapper<T, Tuple, R>(reducer,
+				GroupReduceFunction.class, 0, -1, 1), new WindowReduceInvokable<T, R>(reducer,
+				windowSize));
+	}
+
+	/**
+	 * Internal function for passing the user defined functions to the JobGraph
+	 * of the job.
+	 * 
+	 * @param functionName
+	 *            name of the function
+	 * @param function
+	 *            the user defined function
+	 * @param functionInvokable
+	 *            the invokable that wraps the user defined function
+	 * @param <T>
+	 *            type of the input stream
+	 * @param <R>
+	 *            type of the return stream
+	 * @return the data stream constructed
+	 */
+	private <R extends Tuple> StreamOperator<T, R> addFunction(String functionName,
+			final AbstractFunction function, TypeSerializerWrapper<T, Tuple, R> typeWrapper,
+			UserTaskInvokable<T, R> functionInvokable) {
+
+		DataStream<T> inputStream = new DataStream<T>(this);
+		StreamOperator<T, R> returnStream = new StreamOperator<T, R>(environment, functionName);
+
+		try {
+			jobGraphBuilder.addTask(returnStream.getId(), functionInvokable, typeWrapper,
+					functionName, SerializationUtils.serialize(function), degreeOfParallelism);
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot serialize user defined function");
+		}
+
+		connectGraph(inputStream, returnStream.getId(), 0);
+
+		if (inputStream.iterationflag) {
+			returnStream.addIterationSource(inputStream.iterationID.toString());
+			inputStream.iterationflag = false;
+		}
+
+		return returnStream;
+	}
+
+	protected <T1 extends Tuple, T2 extends Tuple, R extends Tuple> DataStream<R> addCoFunction(
+			String functionName, DataStream<T1> inputStream1, DataStream<T2> inputStream2,
+			final AbstractFunction function, TypeSerializerWrapper<T1, T2, R> typeWrapper,
+			CoInvokable<T1, T2, R> functionInvokable) {
+
+		DataStream<R> returnStream = new DataStream<R>(environment, functionName);
+
+		try {
+			jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable, typeWrapper,
+					functionName, SerializationUtils.serialize(function), degreeOfParallelism);
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot serialize user defined function");
+		}
+
+		connectGraph(inputStream1, returnStream.getId(), 1);
+		connectGraph(inputStream2, returnStream.getId(), 2);
+
+		// TODO consider iteration
+
+		return returnStream;
+	}
+
+	/**
+	 * Internal function for assembling the underlying
+	 * {@link org.apache.flink.nephele.jobgraph.JobGraph} of the job. Connects
+	 * the outputs of the given input stream to the specified output stream
+	 * given by the outputID.
+	 * 
+	 * @param inputStream
+	 *            input data stream
+	 * @param outputID
+	 *            ID of the output
+	 * @param typeNumber
+	 *            Number of the input type (used by co-functions)
+	 */
+	<X extends Tuple> void connectGraph(DataStream<X> inputStream, String outputID, int typeNumber) {
+		for (int i = 0; i < inputStream.connectIDs.size(); i++) {
+			String inputID = inputStream.connectIDs.get(i);
+			StreamPartitioner<X> partitioner = inputStream.partitioners.get(i);
+
+			jobGraphBuilder.setEdge(inputID, outputID, partitioner, typeNumber);
+		}
 	}
 
 	/**
@@ -454,7 +554,7 @@ public class DataStream<T extends Tuple> {
 	 * @return The modified DataStream.
 	 */
 	public DataStream<T> addSink(SinkFunction<T> sinkFunction) {
-		return environment.addSink(new DataStream<T>(this), sinkFunction);
+		return addSink(new DataStream<T>(this), sinkFunction);
 	}
 
 	/**
@@ -465,7 +565,35 @@ public class DataStream<T extends Tuple> {
 	 * @return The closed DataStream.
 	 */
 	public DataStream<T> print() {
-		return environment.print(new DataStream<T>(this));
+		DataStream<T> inputStream = new DataStream<T>(this);
+		PrintSinkFunction<T> printFunction = new PrintSinkFunction<T>();
+		DataStream<T> returnStream = addSink(inputStream, printFunction, null);
+
+		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+
+		return returnStream;
+	}
+
+	private DataStream<T> addSink(DataStream<T> inputStream, SinkFunction<T> sinkFunction) {
+		return addSink(inputStream, sinkFunction, new FunctionTypeWrapper<T, Tuple, T>(
+				sinkFunction, SinkFunction.class, 0, -1, 0));
+	}
+
+	private DataStream<T> addSink(DataStream<T> inputStream, SinkFunction<T> sinkFunction,
+			TypeSerializerWrapper<T, Tuple, T> typeWrapper) {
+		DataStream<T> returnStream = new DataStream<T>(environment, "sink");
+
+		try {
+			jobGraphBuilder.addSink(returnStream.getId(), new SinkInvokable<T>(sinkFunction),
+					typeWrapper, "sink", SerializationUtils.serialize(sinkFunction),
+					degreeOfParallelism);
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot serialize SinkFunction");
+		}
+
+		inputStream.connectGraph(inputStream, returnStream.getId(), 0);
+
+		return returnStream;
 	}
 
 	/**
@@ -479,7 +607,7 @@ public class DataStream<T extends Tuple> {
 	 * @return The closed DataStream
 	 */
 	public DataStream<T> writeAsText(String path) {
-		environment.writeAsText(this, path, new WriteFormatAsText<T>(), 1, null);
+		writeAsText(this, path, new WriteFormatAsText<T>(), 1, null);
 		return new DataStream<T>(this);
 	}
 
@@ -497,7 +625,7 @@ public class DataStream<T extends Tuple> {
 	 * @return The closed DataStream
 	 */
 	public DataStream<T> writeAsText(String path, long millis) {
-		environment.writeAsText(this, path, new WriteFormatAsText<T>(), millis, null);
+		writeAsText(this, path, new WriteFormatAsText<T>(), millis, null);
 		return new DataStream<T>(this);
 	}
 
@@ -516,7 +644,7 @@ public class DataStream<T extends Tuple> {
 	 * @return The closed DataStream
 	 */
 	public DataStream<T> writeAsText(String path, int batchSize) {
-		environment.writeAsText(this, path, new WriteFormatAsText<T>(), batchSize, null);
+		writeAsText(this, path, new WriteFormatAsText<T>(), batchSize, null);
 		return new DataStream<T>(this);
 	}
 
@@ -539,7 +667,7 @@ public class DataStream<T extends Tuple> {
 	 * @return The closed DataStream
 	 */
 	public DataStream<T> writeAsText(String path, long millis, T endTuple) {
-		environment.writeAsText(this, path, new WriteFormatAsText<T>(), millis, endTuple);
+		writeAsText(this, path, new WriteFormatAsText<T>(), millis, endTuple);
 		return new DataStream<T>(this);
 	}
 
@@ -563,11 +691,66 @@ public class DataStream<T extends Tuple> {
 	 * @return The closed DataStream
 	 */
 	public DataStream<T> writeAsText(String path, int batchSize, T endTuple) {
-		environment.writeAsText(this, path, new WriteFormatAsText<T>(), batchSize, endTuple);
+		writeAsText(this, path, new WriteFormatAsText<T>(), batchSize, endTuple);
 		return new DataStream<T>(this);
 	}
 
 	/**
+	 * Writes a DataStream to the file specified by path in text format. The
+	 * writing is performed periodically, every millis milliseconds. For
+	 * every element of the DataStream the result of {@link Object#toString()}
+	 * is written.
+	 * 
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * @param millis
+	 *            is the file update frequency
+	 * @param endTuple
+	 *            is a special tuple indicating the end of the stream. If an
+	 *            endTuple is caught, the last pending batch of tuples will be
+	 *            immediately appended to the target file regardless of the
+	 *            system time.
+	 * 
+	 * @return the data stream constructed
+	 */
+	private DataStream<T> writeAsText(DataStream<T> inputStream, String path,
+			WriteFormatAsText<T> format, long millis, T endTuple) {
+		DataStream<T> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<T>(path,
+				format, millis, endTuple), null);
+		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+		jobGraphBuilder.setMutability(returnStream.getId(), false);
+		return returnStream;
+	}
+
+	/**
+	 * Writes a DataStream to the file specified by path in text format. The
+	 * writing is performed periodically in equally sized batches. For every
+	 * element of the DataStream the result of {@link Object#toString()} is
+	 * written.
+	 * 
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * @param batchSize
+	 *            is the size of the batches, i.e. the number of tuples written
+	 *            to the file at a time
+	 * @param endTuple
+	 *            is a special tuple indicating the end of the stream. If an
+	 *            endTuple is caught, the last pending batch of tuples will be
+	 *            immediately appended to the target file regardless of the
+	 *            batchSize.
+	 * 
+	 * @return the data stream constructed
+	 */
+	private DataStream<T> writeAsText(DataStream<T> inputStream, String path,
+			WriteFormatAsText<T> format, int batchSize, T endTuple) {
+		DataStream<T> returnStream = addSink(inputStream, new WriteSinkFunctionByBatches<T>(path,
+				format, batchSize, endTuple), null);
+		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+		jobGraphBuilder.setMutability(returnStream.getId(), false);
+		return returnStream;
+	}
+
+	/**
 	 * Writes a DataStream to the file specified by path in text format. For
 	 * every element of the DataStream the result of {@link Object#toString()}
 	 * is written.
@@ -578,7 +761,7 @@ public class DataStream<T extends Tuple> {
 	 * @return The closed DataStream
 	 */
 	public DataStream<T> writeAsCsv(String path) {
-		environment.writeAsCsv(this, path, new WriteFormatAsCsv<T>(), 1, null);
+		writeAsCsv(this, path, new WriteFormatAsCsv<T>(), 1, null);
 		return new DataStream<T>(this);
 	}
 
@@ -596,7 +779,7 @@ public class DataStream<T extends Tuple> {
 	 * @return The closed DataStream
 	 */
 	public DataStream<T> writeAsCsv(String path, long millis) {
-		environment.writeAsCsv(this, path, new WriteFormatAsCsv<T>(), millis, null);
+		writeAsCsv(this, path, new WriteFormatAsCsv<T>(), millis, null);
 		return new DataStream<T>(this);
 	}
 
@@ -615,7 +798,7 @@ public class DataStream<T extends Tuple> {
 	 * @return The closed DataStream
 	 */
 	public DataStream<T> writeAsCsv(String path, int batchSize) {
-		environment.writeAsCsv(this, path, new WriteFormatAsCsv<T>(), batchSize, null);
+		writeAsCsv(this, path, new WriteFormatAsCsv<T>(), batchSize, null);
 		return new DataStream<T>(this);
 	}
 
@@ -638,7 +821,7 @@ public class DataStream<T extends Tuple> {
 	 * @return The closed DataStream
 	 */
 	public DataStream<T> writeAsCsv(String path, long millis, T endTuple) {
-		environment.writeAsCsv(this, path, new WriteFormatAsCsv<T>(), millis, endTuple);
+		writeAsCsv(this, path, new WriteFormatAsCsv<T>(), millis, endTuple);
 		return new DataStream<T>(this);
 	}
 
@@ -663,11 +846,66 @@ public class DataStream<T extends Tuple> {
 	 */
 	public DataStream<T> writeAsCsv(String path, int batchSize, T endTuple) {
 		setMutability(false);
-		environment.writeAsCsv(this, path, new WriteFormatAsCsv<T>(), batchSize, endTuple);
+		writeAsCsv(this, path, new WriteFormatAsCsv<T>(), batchSize, endTuple);
 		return new DataStream<T>(this);
 	}
 
 	/**
+	 * Writes a DataStream to the file specified by path in csv format. The
+	 * writing is performed periodically, every millis milliseconds. For
+	 * every element of the DataStream the result of {@link Object#toString()}
+	 * is written.
+	 * 
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * @param millis
+	 *            is the file update frequency
+	 * @param endTuple
+	 *            is a special tuple indicating the end of the stream. If an
+	 *            endTuple is caught, the last pending batch of tuples will be
+	 *            immediately appended to the target file regardless of the
+	 *            system time.
+	 * 
+	 * @return the data stream constructed
+	 */
+	private DataStream<T> writeAsCsv(DataStream<T> inputStream, String path,
+			WriteFormatAsCsv<T> format, long millis, T endTuple) {
+		DataStream<T> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<T>(path,
+				format, millis, endTuple));
+		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+		jobGraphBuilder.setMutability(returnStream.getId(), false);
+		return returnStream;
+	}
+
+	/**
+	 * Writes a DataStream to the file specified by path in csv format. The
+	 * writing is performed periodically in equally sized batches. For every
+	 * element of the DataStream the result of {@link Object#toString()} is
+	 * written.
+	 * 
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * @param batchSize
+	 *            is the size of the batches, i.e. the number of tuples written
+	 *            to the file at a time
+	 * @param endTuple
+	 *            is a special tuple indicating the end of the stream. If an
+	 *            endTuple is caught, the last pending batch of tuples will be
+	 *            immediately appended to the target file regardless of the
+	 *            batchSize.
+	 * 
+	 * @return the data stream constructed
+	 */
+	private DataStream<T> writeAsCsv(DataStream<T> inputStream, String path,
+			WriteFormatAsCsv<T> format, int batchSize, T endTuple) {
+		DataStream<T> returnStream = addSink(inputStream, new WriteSinkFunctionByBatches<T>(path,
+				format, batchSize, endTuple), null);
+		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+		jobGraphBuilder.setMutability(returnStream.getId(), false);
+		return returnStream;
+	}
+
+	/**
 	 * Initiates an iterative part of the program that executes multiple times
 	 * and feeds back data streams. The iterative part needs to be closed by
 	 * calling {@link IterativeDataStream#closeWith(DataStream)}. The data
@@ -688,9 +926,12 @@ public class DataStream<T extends Tuple> {
 		return new IterativeDataStream<T>(this);
 	}
 
-	protected DataStream<T> addIterationSource(String iterationID) {
-		environment.addIterationSource(this, iterationID);
+	protected <R extends Tuple> DataStream<T> addIterationSource(String iterationID) {
+		DataStream<R> returnStream = new DataStream<R>(environment, "iterationSource");
+
+		jobGraphBuilder.addIterationSource(returnStream.getId(), this.getId(), iterationID,
+				degreeOfParallelism);
+
 		return new DataStream<T>(this);
 	}
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java
index c179f83..1cfb625 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java
@@ -20,6 +20,7 @@
 package org.apache.flink.streaming.api;
 
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.partitioner.ForwardPartitioner;
 
 /**
  * The iterative data stream represents the start of an iteration in a
@@ -63,14 +64,26 @@ public class IterativeDataStream<T extends Tuple> extends StreamOperator<T, T> {
 	 * 
 	 * @param iterationResult
 	 *            The data stream that can be fed back to the next iteration.
-	 * @param directName
+	 * @param iterationName
 	 *            Name of the iteration edge (backward edge to iteration head)
 	 *            when used with directed emits
 	 * 
 	 */
-	public DataStream<T> closeWith(DataStream<T> iterationResult, String directName) {
-		environment.addIterationSink(iterationResult, iterationID.toString(), directName);
+	public <R extends Tuple> DataStream<T> closeWith(DataStream<T> iterationResult,
+			String iterationName) {
+		DataStream<R> returnStream = new DataStream<R>(environment, "iterationSink");
+
+		jobGraphBuilder.addIterationSink(returnStream.getId(), iterationResult.getId(),
+				iterationID.toString(), iterationResult.getParallelism(), iterationName);
+
+		jobGraphBuilder.setIterationSourceParallelism(iterationID.toString(),
+				iterationResult.getParallelism());
+
+		for (int i = 0; i < iterationResult.connectIDs.size(); i++) {
+			String inputID = iterationResult.connectIDs.get(i);
+			jobGraphBuilder.setEdge(inputID, returnStream.getId(), new ForwardPartitioner<T>(), 0);
+		}
+
 		return iterationResult;
 	}
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index fc4a1dd..3d49928 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.functions.AbstractFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.collector.OutputSelector;
@@ -30,6 +31,7 @@ import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
 import org.apache.flink.streaming.api.streamcomponent.StreamComponentException;
 import org.apache.flink.streaming.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
 
 public class StreamConfig {
 	private static final String INPUT_TYPE = "inputType_";
@@ -71,6 +73,23 @@ public class StreamConfig {
 
 	// CONFIGS
 
+	public void setTypeWrapper(
+			TypeSerializerWrapper<? extends Tuple, ? extends Tuple, ? extends Tuple> typeWrapper) {
+		config.setBytes("typeWrapper", SerializationUtils.serialize(typeWrapper));
+	}
+
+	@SuppressWarnings("unchecked")
+	public <IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> TypeSerializerWrapper<IN1, IN2, OUT> getTypeWrapper() {
+		byte[] serializedWrapper = config.getBytes("typeWrapper", null);
+
+		if (serializedWrapper == null) {
+			throw new RuntimeException("TypeSerializationWrapper must be set");
+		}
+
+		return (TypeSerializerWrapper<IN1, IN2, OUT>) SerializationUtils
+				.deserialize(serializedWrapper);
+	}
+
 	public void setMutability(boolean isMutable) {
 		config.setBoolean(MUTABILITY, isMutable);
 	}
@@ -87,25 +106,26 @@ public class StreamConfig {
 		return config.getLong(BUFFER_TIMEOUT, DEFAULT_TIMEOUT);
 	}
 
-	public void setUserInvokableClass(Class<? extends StreamComponentInvokable> clazz) {
-		config.setClass(USER_FUNCTION, clazz);
-	}
+	public void setUserInvokable(StreamComponentInvokable<? extends Tuple> invokableObject) {
+		if (invokableObject != null) {
+			config.setClass(USER_FUNCTION, invokableObject.getClass());
 
-	@SuppressWarnings("unchecked")
-	public <T extends StreamComponentInvokable> Class<? extends T> getUserInvokableClass() {
-		return (Class<? extends T>) config.getClass(USER_FUNCTION, null);
-	}
-
-	public void setUserInvokableObject(StreamComponentInvokable invokableObject) {
-		try {
-			config.setBytes(SERIALIZEDUDF, SerializationUtils.serialize(invokableObject));
-		} catch (SerializationException e) {
-			throw new RuntimeException("Cannot serialize invokable object "
-					+ invokableObject.getClass(), e);
+			try {
+				config.setBytes(SERIALIZEDUDF, SerializationUtils.serialize(invokableObject));
+			} catch (SerializationException e) {
+				throw new RuntimeException("Cannot serialize invokable object "
+						+ invokableObject.getClass(), e);
+			}
 		}
 	}
 
-	public <T extends StreamComponentInvokable> T getUserInvokableObject() {
+	// @SuppressWarnings("unchecked")
+	// public <T extends StreamComponentInvokable> Class<? extends T>
+	// getUserInvokableClass() {
+	// return (Class<? extends T>) config.getClass(USER_FUNCTION, null);
+	// }
+
+	public <T extends Tuple> StreamComponentInvokable<T> getUserInvokableObject() {
 		try {
 			return deserializeObject(config.getBytes(SERIALIZEDUDF, null));
 		} catch (Exception e) {
@@ -122,27 +142,29 @@ public class StreamConfig {
 		return config.getString(COMPONENT_NAME, null);
 	}
 
-	public void setFunction(byte[] serializedFunction) {
-		config.setBytes(FUNCTION, serializedFunction);
+	public void setFunction(byte[] serializedFunction, String functionName) {
+		if (serializedFunction != null) {
+			config.setBytes(FUNCTION, serializedFunction);
+			config.setString(FUNCTION_NAME, functionName);
+		}
 	}
 
 	public Object getFunction() {
 		try {
-			return SerializationUtils.deserialize(config
-					.getBytes(FUNCTION, null));
+			return SerializationUtils.deserialize(config.getBytes(FUNCTION, null));
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot deserialize invokable object", e);
 		}
 	}
 
-	public void setFunctionName(String functionName) {
-		config.setString(FUNCTION_NAME, functionName);
-	}
+	// public void setFunctionName(String functionName) {
+	// config.setString(FUNCTION_NAME, functionName);
+	// }
 
 	public String getFunctionName() {
 		return config.getString(FUNCTION_NAME, "");
 	}
-	
+
 	public void setUserDefinedName(String userDefinedName) {
 		if (userDefinedName != null) {
 			config.setString(USER_DEFINED_NAME, userDefinedName);
@@ -158,8 +180,10 @@ public class StreamConfig {
 	}
 
 	public void setOutputSelector(byte[] outputSelector) {
-		config.setBytes(OUTPUT_SELECTOR, outputSelector);
-
+		if (outputSelector != null) {
+			setDirectedEmit(true);
+			config.setBytes(OUTPUT_SELECTOR, outputSelector);
+		}
 	}
 
 	public <T extends Tuple> OutputSelector<T> getOutputSelector() {
@@ -174,7 +198,7 @@ public class StreamConfig {
 	public void setIterationId(String iterationId) {
 		config.setString(ITERATION_ID, iterationId);
 	}
-	
+
 	public String getIterationId() {
 		return config.getString(ITERATION_ID, "iteration-0");
 	}
@@ -233,7 +257,16 @@ public class StreamConfig {
 	public int getInputType(int inputNumber) {
 		return config.getInteger(INPUT_TYPE + inputNumber, 0);
 	}
-	
+
+	public void setFunctionClass(Class<? extends AbstractFunction> functionClass) {
+		config.setClass("functionClass", functionClass);
+	}
+
+	@SuppressWarnings("unchecked")
+	public Class<? extends AbstractFunction> getFunctionClass() {
+		return (Class<? extends AbstractFunction>) config.getClass("functionClass", null);
+	}
+
 	@SuppressWarnings("unchecked")
 	protected static <T> T deserializeObject(byte[] serializedObject) throws IOException,
 			ClassNotFoundException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
index f56614d..4539126 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
@@ -24,27 +24,17 @@ import java.util.Collection;
 
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.common.functions.AbstractFunction;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
-import org.apache.flink.streaming.api.function.sink.WriteFormatAsText;
-import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByBatches;
-import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis;
 import org.apache.flink.streaming.api.function.source.FileSourceFunction;
 import org.apache.flink.streaming.api.function.source.FileStreamFunction;
 import org.apache.flink.streaming.api.function.source.FromElementsFunction;
 import org.apache.flink.streaming.api.function.source.GenSequenceFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.api.invokable.SinkInvokable;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import org.apache.flink.streaming.partitioner.ForwardPartitioner;
-import org.apache.flink.streaming.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.api.invokable.SourceInvokable;
+import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
+import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
 
 /**
  * {@link ExecutionEnvironment} for streaming jobs. An instance of it is
@@ -124,13 +114,14 @@ public abstract class StreamExecutionEnvironment {
 		this.degreeOfParallelism = degreeOfParallelism;
 	}
 
-	protected void setMutability(DataStream<?> stream, boolean isMutable) {
-		jobGraphBuilder.setMutability(stream.getId(), isMutable);
-	}
-	
-	protected void setBufferTimeout(DataStream<?> stream, long bufferTimeout) {
-		jobGraphBuilder.setBufferTimeout(stream.getId(), bufferTimeout);
-	}
+	// protected void setMutability(DataStream<?> stream, boolean isMutable) {
+	// jobGraphBuilder.setMutability(stream.getId(), isMutable);
+	// }
+	//
+	// protected void setBufferTimeout(DataStream<?> stream, long bufferTimeout)
+	// {
+	// jobGraphBuilder.setBufferTimeout(stream.getId(), bufferTimeout);
+	// }
 
 	/**
 	 * Sets the number of hardware contexts (CPU cores / threads) used when
@@ -204,8 +195,11 @@ public abstract class StreamExecutionEnvironment {
 		DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements");
 
 		try {
-			jobGraphBuilder.addSource(returnStream.getId(), new FromElementsFunction<X>(data),
-					"elements", SerializationUtils.serialize(data[0]), 1);
+			SourceFunction<Tuple1<X>> function = new FromElementsFunction<X>(data);
+			jobGraphBuilder.addSource(returnStream.getId(),
+					new SourceInvokable<Tuple1<X>>(function),
+					new ObjectTypeWrapper<Tuple1<X>, Tuple, Tuple1<X>>(data[0], null, data[0]),
+					"source", SerializationUtils.serialize(function), 1);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize elements");
 		}
@@ -232,8 +226,14 @@ public abstract class StreamExecutionEnvironment {
 		}
 
 		try {
-			jobGraphBuilder.addSource(returnStream.getId(), new FromElementsFunction<X>(data),
-					"elements", SerializationUtils.serialize((Serializable) data.toArray()[0]), 1);
+			SourceFunction<Tuple1<X>> function = new FromElementsFunction<X>(data);
+
+			jobGraphBuilder
+					.addSource(returnStream.getId(), new SourceInvokable<Tuple1<X>>(
+							new FromElementsFunction<X>(data)),
+							new ObjectTypeWrapper<Tuple1<X>, Tuple, Tuple1<X>>(data.toArray()[0],
+									null, data.toArray()[0]), "source", SerializationUtils
+									.serialize(function), 1);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize collection");
 		}
@@ -257,7 +257,7 @@ public abstract class StreamExecutionEnvironment {
 	/**
 	 * Ads a data source thus opening a {@link DataStream}.
 	 * 
-	 * @param sourceFunction
+	 * @param function
 	 *            the user defined function
 	 * @param parallelism
 	 *            number of parallel instances of the function
@@ -265,13 +265,13 @@ public abstract class StreamExecutionEnvironment {
 	 *            type of the returned stream
 	 * @return the data stream constructed
 	 */
-	public <T extends Tuple> DataStream<T> addSource(SourceFunction<T> sourceFunction,
-			int parallelism) {
+	public <T extends Tuple> DataStream<T> addSource(SourceFunction<T> function, int parallelism) {
 		DataStream<T> returnStream = new DataStream<T>(this, "source");
 
 		try {
-			jobGraphBuilder.addSource(returnStream.getId(), sourceFunction, "source",
-					SerializationUtils.serialize(sourceFunction), parallelism);
+			jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<T>(function),
+					new FunctionTypeWrapper<T, Tuple, T>(function, SourceFunction.class, 0, -1, 0),
+					"source", SerializationUtils.serialize(function), parallelism);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize SourceFunction");
 		}
@@ -284,326 +284,6 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	// --------------------------------------------------------------------------------------------
-	// Data stream operators and sinks
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Internal function for passing the user defined functions to the JobGraph
-	 * of the job.
-	 * 
-	 * @param functionName
-	 *            name of the function
-	 * @param inputStream
-	 *            input data stream
-	 * @param function
-	 *            the user defined function
-	 * @param functionInvokable
-	 *            the wrapping JobVertex instance
-	 * @param <T>
-	 *            type of the input stream
-	 * @param <R>
-	 *            type of the return stream
-	 * @return the data stream constructed
-	 */
-	protected <T extends Tuple, R extends Tuple> StreamOperator<T, R> addFunction(
-			String functionName, DataStream<T> inputStream, final AbstractFunction function,
-			UserTaskInvokable<T, R> functionInvokable) {
-		StreamOperator<T, R> returnStream = new StreamOperator<T, R>(this, functionName);
-
-		try {
-			jobGraphBuilder.addTask(returnStream.getId(), functionInvokable, functionName,
-					SerializationUtils.serialize(function), degreeOfParallelism);
-		} catch (SerializationException e) {
-			throw new RuntimeException("Cannot serialize user defined function");
-		}
-
-		connectGraph(inputStream, returnStream.getId(), 0);
-
-		if (inputStream.iterationflag) {
-			returnStream.addIterationSource(inputStream.iterationID.toString());
-			inputStream.iterationflag = false;
-		}
-
-		return returnStream;
-	}
-	
-	protected <T1 extends Tuple, T2 extends Tuple, R extends Tuple> DataStream<R> addCoFunction(String functionName, DataStream<T1> inputStream1, DataStream<T2> inputStream2, final AbstractFunction function,
-			CoInvokable<T1, T2, R> functionInvokable) {
-		
-		DataStream<R> returnStream = new DataStream<R>(this, functionName);
-		
-		try {
-			jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable, functionName,
-					SerializationUtils.serialize(function), degreeOfParallelism);
-		} catch (SerializationException e) {
-			throw new RuntimeException("Cannot serialize user defined function");
-		}
-
-		connectGraph(inputStream1, returnStream.getId(), 1);
-		connectGraph(inputStream2, returnStream.getId(), 2);
-
-		// TODO consider iteration
-//		if (inputStream.iterationflag) {
-//			returnStream.addIterationSource(inputStream.iterationID.toString());
-//			inputStream.iterationflag = false;
-//		}
-
-		return returnStream; 
-	}
-
-	protected <T extends Tuple, R extends Tuple> void addIterationSource(DataStream<T> inputStream,
-			String iterationID) {
-		DataStream<R> returnStream = new DataStream<R>(this, "iterationSource");
-
-		jobGraphBuilder.addIterationSource(returnStream.getId(), inputStream.getId(), iterationID,
-				degreeOfParallelism);
-
-	}
-
-	protected <T extends Tuple, R extends Tuple> void addIterationSink(DataStream<T> inputStream,
-			String iterationID, String iterationName) {
-		DataStream<R> returnStream = new DataStream<R>(this, "iterationSink");
-
-		jobGraphBuilder.addIterationSink(returnStream.getId(), inputStream.getId(), iterationID,
-				inputStream.getParallelism(), iterationName);
-
-		jobGraphBuilder.setIterationSourceParallelism(iterationID, inputStream.getParallelism());
-
-		for (int i = 0; i < inputStream.connectIDs.size(); i++) {
-			String inputID = inputStream.connectIDs.get(i);
-			jobGraphBuilder.setEdge(inputID, returnStream.getId(), new ForwardPartitioner<T>(), 0);
-		}
-	}
-
-	/**
-	 * Adds the given sink to this environment. Only streams with sinks added
-	 * will be executed once the {@link #execute()} method is called.
-	 * 
-	 * @param inputStream
-	 *            input data stream
-	 * @param sinkFunction
-	 *            the user defined function
-	 * @param <T>
-	 *            type of the returned stream
-	 * @return the data stream constructed
-	 */
-	protected <T extends Tuple> DataStream<T> addSink(DataStream<T> inputStream,
-			SinkFunction<T> sinkFunction) {
-		DataStream<T> returnStream = new DataStream<T>(this, "sink");
-
-		try {
-			jobGraphBuilder.addSink(returnStream.getId(), new SinkInvokable<T>(sinkFunction),
-					"sink", SerializationUtils.serialize(sinkFunction), degreeOfParallelism);
-		} catch (SerializationException e) {
-			throw new RuntimeException("Cannot serialize SinkFunction");
-		}
-
-		connectGraph(inputStream, returnStream.getId(), 0);
-
-		return returnStream;
-	}
-
-	<T extends Tuple> void addDirectedEmit(String id, OutputSelector<T> outputSelector) {
-		try {
-			jobGraphBuilder.setOutputSelector(id, SerializationUtils.serialize(outputSelector));
-		} catch (SerializationException e) {
-			throw new RuntimeException("Cannot serialize OutputSelector");
-		}
-	}
-
-	/**
-	 * Writes a DataStream to the standard output stream (stdout). For each
-	 * element of the DataStream the result of {@link Object#toString()} is
-	 * written.
-	 * 
-	 * @param inputStream
-	 *            the input data stream
-	 * 
-	 * @param <T>
-	 *            type of the returned stream
-	 * @return the data stream constructed
-	 */
-	protected <T extends Tuple> DataStream<T> print(DataStream<T> inputStream) {
-		DataStream<T> returnStream = addSink(inputStream, new PrintSinkFunction<T>());
-
-		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
-
-		return returnStream;
-	}
-
-	/**
-	 * Writes a DataStream to the file specified by path in text format. The
-	 * writing is performed periodically, in every millis milliseconds. For
-	 * every element of the DataStream the result of {@link Object#toString()}
-	 * is written.
-	 * 
-	 * @param path
-	 *            is the path to the location where the tuples are written
-	 * @param millis
-	 *            is the file update frequency
-	 * @param endTuple
-	 *            is a special tuple indicating the end of the stream. If an
-	 *            endTuple is caught, the last pending batch of tuples will be
-	 *            immediately appended to the target file regardless of the
-	 *            system time.
-	 * 
-	 * @return the data stream constructed
-	 */
-	protected <T extends Tuple> DataStream<T> writeAsText(DataStream<T> inputStream, String path,
-			WriteFormatAsText<T> format, long millis, T endTuple) {
-		DataStream<T> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<T>(path,
-				format, millis, endTuple));
-		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
-		jobGraphBuilder.setMutability(returnStream.getId(), false);
-		return returnStream;
-	}
-
-	/**
-	 * Writes a DataStream to the file specified by path in text format. The
-	 * writing is performed periodically in equally sized batches. For every
-	 * element of the DataStream the result of {@link Object#toString()} is
-	 * written.
-	 * 
-	 * @param path
-	 *            is the path to the location where the tuples are written
-	 * @param batchSize
-	 *            is the size of the batches, i.e. the number of tuples written
-	 *            to the file at a time
-	 * @param endTuple
-	 *            is a special tuple indicating the end of the stream. If an
-	 *            endTuple is caught, the last pending batch of tuples will be
-	 *            immediately appended to the target file regardless of the
-	 *            batchSize.
-	 * 
-	 * @return the data stream constructed
-	 */
-	protected <T extends Tuple> DataStream<T> writeAsText(DataStream<T> inputStream, String path,
-			WriteFormatAsText<T> format, int batchSize, T endTuple) {
-		DataStream<T> returnStream = addSink(inputStream, new WriteSinkFunctionByBatches<T>(path,
-				format, batchSize, endTuple));
-		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
-		jobGraphBuilder.setMutability(returnStream.getId(), false);
-		return returnStream;
-	}
-
-	/**
-	 * Writes a DataStream to the file specified by path in csv format. The
-	 * writing is performed periodically, in every millis milliseconds. For
-	 * every element of the DataStream the result of {@link Object#toString()}
-	 * is written.
-	 * 
-	 * @param path
-	 *            is the path to the location where the tuples are written
-	 * @param millis
-	 *            is the file update frequency
-	 * @param endTuple
-	 *            is a special tuple indicating the end of the stream. If an
-	 *            endTuple is caught, the last pending batch of tuples will be
-	 *            immediately appended to the target file regardless of the
-	 *            system time.
-	 * 
-	 * @return the data stream constructed
-	 */
-	protected <T extends Tuple> DataStream<T> writeAsCsv(DataStream<T> inputStream, String path,
-			WriteFormatAsCsv<T> format, long millis, T endTuple) {
-		DataStream<T> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<T>(path,
-				format, millis, endTuple));
-		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
-		jobGraphBuilder.setMutability(returnStream.getId(), false);
-		return returnStream;
-	}
-
-	/**
-	 * Writes a DataStream to the file specified by path in csv format. The
-	 * writing is performed periodically in equally sized batches. For every
-	 * element of the DataStream the result of {@link Object#toString()} is
-	 * written.
-	 * 
-	 * @param path
-	 *            is the path to the location where the tuples are written
-	 * @param batchSize
-	 *            is the size of the batches, i.e. the number of tuples written
-	 *            to the file at a time
-	 * @param endTuple
-	 *            is a special tuple indicating the end of the stream. If an
-	 *            endTuple is caught, the last pending batch of tuples will be
-	 *            immediately appended to the target file regardless of the
-	 *            batchSize.
-	 * 
-	 * @return the data stream constructed
-	 */
-	protected <T extends Tuple> DataStream<T> writeAsCsv(DataStream<T> inputStream, String path,
-			WriteFormatAsCsv<T> format, int batchSize, T endTuple) {
-		DataStream<T> returnStream = addSink(inputStream, new WriteSinkFunctionByBatches<T>(path,
-				format, batchSize, endTuple));
-		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
-		jobGraphBuilder.setMutability(returnStream.getId(), false);
-		return returnStream;
-	}
-
-	/**
-	 * Internal function for assembling the underlying
-	 * {@link org.apache.flink.nephele.jobgraph.JobGraph} of the job. Connects
-	 * the outputs of the given input stream to the specified output stream
-	 * given by the outputID.
-	 * 
-	 * @param inputStream
-	 *            input data stream
-	 * @param outputID
-	 *            ID of the output
-	 * @param <T>
-	 *            type of the input stream
-	 * @param typeNumber
-	 *            Number of the type (used at co-functions)
-	 */
-	private <T extends Tuple> void connectGraph(DataStream<T> inputStream, String outputID, int typeNumber) {
-		
-		for (int i = 0; i < inputStream.connectIDs.size(); i++) {
-			String inputID = inputStream.connectIDs.get(i);
-			StreamPartitioner<T> partitioner = inputStream.partitioners.get(i);
-			
-			jobGraphBuilder.setEdge(inputID, outputID, partitioner,
-					typeNumber);
-		}
-	}
-
-	protected <T extends Tuple> void setName(DataStream<T> stream, String name) {
-		jobGraphBuilder.setUserDefinedName(stream.getId(), name);
-	}
-
-	/**
-	 * Sets the proper parallelism for the given operator in the JobGraph
-	 * 
-	 * @param inputStream
-	 *            DataStream corresponding to the operator
-	 * @param <T>
-	 *            type of the operator
-	 */
-	protected <T extends Tuple> void setOperatorParallelism(DataStream<T> inputStream) {
-		jobGraphBuilder.setParallelism(inputStream.getId(), inputStream.degreeOfParallelism);
-	}
-
-	// /**
-	// * Converts object to byte array using default java serialization
-	// *
-	// * @param object
-	// * Object to be serialized
-	// * @return Serialized object
-	// */
-	// static byte[] serializeToByteArray(Serializable object) {
-	// SerializationUtils.serialize(object);
-	// ByteArrayOutputStream baos = new ByteArrayOutputStream();
-	// ObjectOutputStream oos;
-	// try {
-	// oos = new ObjectOutputStream(baos);
-	// oos.writeObject(object);
-	// } catch (IOException e) {
-	// throw new RuntimeException("Cannot serialize object: " + object);
-	// }
-	// return baos.toByteArray();
-	// }
-
-	// --------------------------------------------------------------------------------------------
 	// Instantiation of Execution Contexts
 	// --------------------------------------------------------------------------------------------
 
@@ -741,7 +421,7 @@ public abstract class StreamExecutionEnvironment {
 	 * 
 	 * @return jobgraph
 	 */
-	public JobGraphBuilder jobGB() {
+	public JobGraphBuilder getJobGraphBuilder() {
 		return jobGraphBuilder;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
index f016cbc..f6c2c72 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
@@ -28,14 +28,14 @@ import org.apache.flink.util.Collector;
 
 public class FileSourceFunction extends SourceFunction<Tuple1<String>> {
 	private static final long serialVersionUID = 1L;
-	
+
 	private final String path;
 	private Tuple1<String> outTuple = new Tuple1<String>();
-	
+
 	public FileSourceFunction(String path) {
 		this.path = path;
 	}
-	
+
 	@Override
 	public void invoke(Collector<Tuple1<String>> collector) throws IOException {
 		BufferedReader br = new BufferedReader(new FileReader(path));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
index c3ccedf..edadfc3 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
@@ -26,19 +26,19 @@ import java.io.IOException;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.util.Collector;
 
-public class FileStreamFunction extends SourceFunction<Tuple1<String>>{
+public class FileStreamFunction extends SourceFunction<Tuple1<String>> {
 	private static final long serialVersionUID = 1L;
-	
+
 	private final String path;
 	private Tuple1<String> outTuple = new Tuple1<String>();
-	
+
 	public FileStreamFunction(String path) {
 		this.path = path;
 	}
-	
+
 	@Override
 	public void invoke(Collector<Tuple1<String>> collector) throws IOException {
-		while(true){
+		while (true) {
 			BufferedReader br = new BufferedReader(new FileReader(path));
 			String line = br.readLine();
 			while (line != null) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
index 70553bf..971533f 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
@@ -19,12 +19,13 @@
 
 package org.apache.flink.streaming.api.function.source;
 
-import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
-
+import org.apache.flink.api.common.functions.AbstractFunction;
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.util.Collector;
 
-public abstract class SourceFunction<OUT extends Tuple> extends UserSourceInvokable<OUT> {
+public abstract class SourceFunction<OUT extends Tuple> extends AbstractFunction {
 
 	private static final long serialVersionUID = 1L;
 
+	public abstract void invoke(Collector<OUT> collector) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
new file mode 100644
index 0000000..992a25e
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
@@ -0,0 +1,44 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.invokable;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+
+public class SourceInvokable<OUT extends Tuple> extends StreamComponentInvokable<OUT> implements
+		Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private SourceFunction<OUT> sourceFunction;
+
+	public SourceInvokable() {
+	}
+
+	public SourceInvokable(SourceFunction<OUT> sourceFunction) {
+		this.sourceFunction = sourceFunction;
+	}
+
+	public void invoke() throws Exception {
+		sourceFunction.invoke(collector);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
index 0e8ea98..daa7378 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
@@ -21,7 +21,10 @@ package org.apache.flink.streaming.api.invokable;
 
 import java.io.Serializable;
 
-public abstract class StreamComponentInvokable implements Serializable {
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.util.Collector;
+
+public abstract class StreamComponentInvokable<OUT extends Tuple> implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 
@@ -29,6 +32,11 @@ public abstract class StreamComponentInvokable implements Serializable {
 	private String componentName;
 	@SuppressWarnings("unused")
 	private int channelID;
+	protected Collector<OUT> collector;
+
+	public void setCollector(Collector<OUT> collector) {
+		this.collector = collector;
+	}
 
 	public void setAttributes(String componentName, int channelID) {
 		this.componentName = componentName;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
index 6beec27..5be3c30 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
@@ -28,11 +28,10 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
 public abstract class StreamRecordInvokable<IN extends Tuple, OUT extends Tuple> extends
-		StreamComponentInvokable {
+		StreamComponentInvokable<OUT> {
 
 	private static final long serialVersionUID = 1L;
 
-	protected Collector<OUT> collector;
 	protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
 	StreamRecordSerializer<IN> serializer;
 	protected StreamRecord<IN> reuse;
@@ -41,7 +40,7 @@ public abstract class StreamRecordInvokable<IN extends Tuple, OUT extends Tuple>
 	public void initialize(Collector<OUT> collector,
 			MutableObjectIterator<StreamRecord<IN>> recordIterator,
 			StreamRecordSerializer<IN> serializer, boolean isMutable) {
-		this.collector = collector;
+		setCollector(collector);
 		this.recordIterator = recordIterator;
 		this.serializer = serializer;
 		this.reuse = serializer.createInstance();
@@ -51,7 +50,7 @@ public abstract class StreamRecordInvokable<IN extends Tuple, OUT extends Tuple>
 	protected void resetReuse() {
 		this.reuse = serializer.createInstance();
 	}
-	
+
 	protected StreamRecord<IN> loadNextRecord() {
 		try {
 			reuse = recordIterator.next(reuse);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserSourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserSourceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserSourceInvokable.java
deleted file mode 100644
index e85b563..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserSourceInvokable.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.util.Collector;
-
-public abstract class UserSourceInvokable<OUT extends Tuple> extends StreamComponentInvokable implements
-		Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	public abstract void invoke(Collector<OUT> collector) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index e881d57..f7ea566 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -28,5 +28,4 @@ public abstract class StreamReduceInvokable<IN extends Tuple, OUT extends Tuple>
 	private static final long serialVersionUID = 1L;
 	protected GroupReduceFunction<IN, OUT> reducer;
 
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 85086f9..884e361 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -27,7 +27,7 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
 public abstract class CoInvokable<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends
-		StreamComponentInvokable {
+		StreamComponentInvokable<OUT> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
index 22c079c..fcf87e2 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
@@ -26,13 +26,8 @@ import org.apache.commons.lang.SerializationUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.FilterFunction;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.runtime.io.network.api.MutableReader;
 import org.apache.flink.runtime.io.network.api.RecordWriter;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -43,17 +38,18 @@ import org.apache.flink.streaming.api.StreamConfig;
 import org.apache.flink.streaming.api.collector.DirectedStreamCollector;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.collector.StreamCollector;
-import org.apache.flink.streaming.api.function.co.CoMapFunction;
 import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
-import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
 public abstract class AbstractStreamComponent<OUT extends Tuple> extends AbstractInvokable {
 
+	protected static final String SOURCE = "source";
+
 	private static final Log LOG = LogFactory.getLog(AbstractStreamComponent.class);
 
 	protected TupleTypeInfo<OUT> outTupleTypeInfo = null;
@@ -61,6 +57,7 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 	protected SerializationDelegate<StreamRecord<OUT>> outSerializationDelegate = null;
 
 	protected StreamConfig configuration;
+	protected TypeSerializerWrapper<? extends Tuple, ? extends Tuple, OUT> typeWrapper;
 	protected StreamCollector<OUT> collector;
 	protected int instanceID;
 	protected String name;
@@ -68,19 +65,27 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 	protected boolean isMutable;
 	protected Object function;
 	protected String functionName;
-	
+
 	protected static int newComponent() {
 		numComponents++;
 		return numComponents;
 	}
 
+	@Override
+	public void registerInputOutput() {
+		initialize();
+		setInputsOutputs();
+		setInvokable();
+		setCollector();
+	}
+
 	protected void initialize() {
 		this.configuration = new StreamConfig(getTaskConfiguration());
 		this.name = configuration.getComponentName();
 		this.isMutable = configuration.getMutability();
 		this.functionName = configuration.getFunctionName();
 		this.function = configuration.getFunction();
-
+		this.typeWrapper = configuration.getTypeWrapper();
 	}
 
 	protected Collector<OUT> setCollector() {
@@ -96,40 +101,12 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 	}
 
 	protected void setSerializers() {
-		try {
-			if (functionName.equals("flatMap")) {
-				setSerializer(function, FlatMapFunction.class, 1);
-			} else if (functionName.equals("map")) {
-				setSerializer(function, MapFunction.class, 1);
-			} else if (functionName.equals("batchReduce")) {
-				setSerializer(function, GroupReduceFunction.class, 1);
-			} else if (functionName.equals("filter")) {
-				setSerializer(function, FilterFunction.class, 0);
-			} else if (functionName.equals("source")) {
-				setSerializer(function, UserSourceInvokable.class, 0);
-			} else if (functionName.equals("coMap")) {
-				setSerializer(function, CoMapFunction.class, 2);
-			} else if (functionName.equals("elements")) {
-				outTupleTypeInfo = new TupleTypeInfo<OUT>(TypeExtractor.getForObject(function));
-
-				outTupleSerializer = new StreamRecordSerializer<OUT>(outTupleTypeInfo.createSerializer());
-				outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(
-						outTupleSerializer);
-			} else {
-				throw new Exception("Wrong operator name: " + functionName);
-			}
-		} catch (Exception e) {
-			throw new StreamComponentException(e);
-		}
-	
+		setSerializer();
 	}
-	
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	protected void setSerializer(Object function, Class<?> clazz, int typeParameter) {
-		outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
-				typeParameter, null, null);
 
-		outTupleSerializer = new StreamRecordSerializer(outTupleTypeInfo.createSerializer());
+	protected void setSerializer() {
+		outTupleTypeInfo = typeWrapper.getOutputTupleTypeInfo();
+		outTupleSerializer = new StreamRecordSerializer<OUT>(outTupleTypeInfo.createSerializer());
 		outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outTupleSerializer);
 	}
 
@@ -137,7 +114,7 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 			List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
 		setSerializers();
 		setCollector();
-		
+
 		int numberOfOutputs = configuration.getNumberOfOutputs();
 
 		for (int i = 0; i < numberOfOutputs; i++) {
@@ -148,7 +125,7 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 	private void setPartitioner(int outputNumber,
 			List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
 		StreamPartitioner<OUT> outputPartitioner = null;
-		
+
 		try {
 			outputPartitioner = configuration.getPartitioner(outputNumber);
 
@@ -189,8 +166,9 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 	 *            Class of the invokable function
 	 * @return The StreamComponent object
 	 */
-	protected <T extends StreamComponentInvokable> T getInvokable() {
-		return configuration.getUserInvokableObject();
+	@SuppressWarnings("unchecked")
+	protected <T extends StreamComponentInvokable<OUT>> T getInvokable() {
+		return (T) configuration.getUserInvokableObject();
 	}
 
 	protected <IN extends Tuple> MutableObjectIterator<StreamRecord<IN>> createInputIterator(
@@ -210,16 +188,8 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 		return (T) SerializationUtils.deserialize(serializedObject);
 	}
 
-
-	@Override
-	public void registerInputOutput() {
-		initialize();
-		setInputsOutputs();		
-		setInvokable();
-	}
-	
 	protected abstract void setInputsOutputs();
-	
+
 	protected abstract void setInvokable();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/BlockingQueueBroker.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/BlockingQueueBroker.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/BlockingQueueBroker.java
index c8eca70..52de3d1 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/BlockingQueueBroker.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/BlockingQueueBroker.java
@@ -25,13 +25,14 @@ import org.apache.flink.runtime.iterative.concurrent.Broker;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
 @SuppressWarnings("rawtypes")
-public class BlockingQueueBroker extends Broker<BlockingQueue<StreamRecord>>{
+public class BlockingQueueBroker extends Broker<BlockingQueue<StreamRecord>> {
 	/**
 	 * Singleton instance
 	 */
 	private static final BlockingQueueBroker INSTANCE = new BlockingQueueBroker();
 
-	private BlockingQueueBroker() {}
+	private BlockingQueueBroker() {
+	}
 
 	/**
 	 * retrieve singleton instance

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
index 4d531f7..0c02c16 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
@@ -28,7 +28,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.functions.AbstractFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.api.MutableReader;
 import org.apache.flink.runtime.io.network.api.MutableRecordReader;
@@ -45,8 +44,8 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 		AbstractStreamComponent<OUT> {
 	private static final Log LOG = LogFactory.getLog(CoStreamTask.class);
 
-	protected StreamRecordSerializer<IN1> inTupleSerializer1 = null;
-	protected StreamRecordSerializer<IN2> inTupleSerializer2 = null;
+	protected StreamRecordSerializer<IN1> inTupleDeserializer1 = null;
+	protected StreamRecordSerializer<IN2> inTupleDeserializer2 = null;
 
 	private MutableReader<IOReadableWritable> inputs1;
 	private MutableReader<IOReadableWritable> inputs2;
@@ -54,24 +53,25 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 	MutableObjectIterator<StreamRecord<IN2>> inputIter2;
 
 	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
-	private CoInvokable<IN1, IN2, OUT> userFunction;
+	private CoInvokable<IN1, IN2, OUT> userInvokable;
 	private static int numTasks;
 
 	public CoStreamTask() {
 
 		outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
-		userFunction = null;
+		userInvokable = null;
 		numTasks = newComponent();
 		instanceID = numTasks;
 	}
 
+	@Override
 	protected void setSerializers() {
 		String operatorName = configuration.getFunctionName();
 
 		Object function = configuration.getFunction();
 		try {
 			if (operatorName.equals("coMap")) {
-				setSerializer(function, CoMapFunction.class, 2);
+				setSerializer();
 				setDeserializers(function, CoMapFunction.class);
 			} else {
 				throw new Exception("Wrong operator name!");
@@ -83,13 +83,11 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	private void setDeserializers(Object function, Class<? extends AbstractFunction> clazz) {
-		TupleTypeInfo<IN1> inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz,
-				function.getClass(), 0, null, null);
-		inTupleSerializer1 = new StreamRecordSerializer(inTupleTypeInfo.createSerializer());
+		TupleTypeInfo<IN1> inTupleTypeInfo = (TupleTypeInfo<IN1>) typeWrapper.getInputTupleTypeInfo1();
+		inTupleDeserializer1 = new StreamRecordSerializer<IN1>(inTupleTypeInfo.createSerializer());
 
-		inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
-				1, null, null);
-		inTupleSerializer2 = new StreamRecordSerializer(inTupleTypeInfo.createSerializer());
+		TupleTypeInfo<IN2> inTupleTypeInfo2 = (TupleTypeInfo<IN2>) typeWrapper.getInputTupleTypeInfo2(); // second input is typed IN2, not IN1
+		inTupleDeserializer2 = new StreamRecordSerializer<IN2>(inTupleTypeInfo2.createSerializer());
 	}
 
 	@Override
@@ -97,15 +95,15 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 		setConfigOutputs(outputs);
 		setConfigInputs();
 
-		inputIter1 = createInputIterator(inputs1, inTupleSerializer1);
-		inputIter2 = createInputIterator(inputs2, inTupleSerializer2);
+		inputIter1 = createInputIterator(inputs1, inTupleDeserializer1);
+		inputIter2 = createInputIterator(inputs2, inTupleDeserializer2);
 	}
-	
+
 	@Override
 	protected void setInvokable() {
-		userFunction = getInvokable();
-		userFunction.initialize(collector, inputIter1, inTupleSerializer1, inputIter2,
-				inTupleSerializer2, isMutable);
+		userInvokable = getInvokable();
+		userInvokable.initialize(collector, inputIter1, inTupleDeserializer1, inputIter2,
+				inTupleDeserializer2, isMutable);
 	}
 
 	protected void setConfigInputs() throws StreamComponentException {
@@ -156,7 +154,7 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 			output.initializeSerializers();
 		}
 
-		userFunction.invoke();
+		userInvokable.invoke();
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("TASK " + name + " invoke finished with instance id " + instanceID);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
index 0b5b377..8355b78 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
@@ -19,62 +19,38 @@
 
 package org.apache.flink.streaming.api.streamcomponent;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.java.functions.FilterFunction;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.api.MutableReader;
 import org.apache.flink.runtime.io.network.api.MutableRecordReader;
 import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.util.MutableObjectIterator;
 
-public abstract class SingleInputAbstractStreamComponent<IN extends Tuple, OUT extends Tuple> extends
-		AbstractStreamComponent<OUT> {
+public abstract class SingleInputAbstractStreamComponent<IN extends Tuple, OUT extends Tuple>
+		extends AbstractStreamComponent<OUT> {
 
 	protected StreamRecordSerializer<IN> inTupleSerializer = null;
 	protected MutableObjectIterator<StreamRecord<IN>> inputIter;
 	protected MutableReader<IOReadableWritable> inputs;
 
 	protected void setDeserializers() {
-		try {
-			if (functionName.equals("flatMap")) {
-				setDeserializer(function, FlatMapFunction.class);
-			} else if (functionName.equals("map")) {
-				setDeserializer(function, MapFunction.class);
-			} else if (functionName.equals("batchReduce")) {
-				setDeserializer(function, GroupReduceFunction.class);
-			} else if (functionName.equals("filter")) {
-				setDeserializer(function, FilterFunction.class);
-			} else if (functionName.equals("source")) {
-				setSerializer(function, UserSourceInvokable.class, 0);
-			} else if (functionName.equals("sink")) {
-				setDeserializer(function, SinkFunction.class);
-			} else {
-				throw new Exception("Wrong operator name: " + functionName);
-			}
-
-		} catch (Exception e) {
-			throw new StreamComponentException(e);
+		if (functionName.equals(SOURCE)) {
+			setSerializer();
+		} else {
+			setDeserializer();
 		}
 	}
 
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	private void setDeserializer(Object function, Class<? extends AbstractFunction> clazz) {
-		TupleTypeInfo<IN> inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
-				0, null, null);
-
-		inTupleSerializer = new StreamRecordSerializer(inTupleTypeInfo.createSerializer());
+	@SuppressWarnings("unchecked")
+	private void setDeserializer() {
+		TupleTypeInfo<IN> inTupleTypeInfo = (TupleTypeInfo<IN>) typeWrapper
+				.getInputTupleTypeInfo1();
+		inTupleSerializer = new StreamRecordSerializer<IN>(inTupleTypeInfo.createSerializer());
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	protected void setSinkSerializer() {
 		if (outSerializationDelegate != null) {
@@ -87,7 +63,7 @@ public abstract class SingleInputAbstractStreamComponent<IN extends Tuple, OUT e
 	@SuppressWarnings("unchecked")
 	protected void setConfigInputs() throws StreamComponentException {
 		setDeserializers();
-		
+
 		int numberOfInputs = configuration.getNumberOfInputs();
 
 		if (numberOfInputs < 2) {