Posted to commits@flink.apache.org by se...@apache.org on 2014/08/29 21:03:48 UTC

[15/28] git commit: [streaming] Added exceptions for wrong usage of StreamExecutionEnvironment

[streaming] Added exceptions for wrong usage of StreamExecutionEnvironment
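
For quick reference, a minimal sketch (hypothetical setup, based on the checks added in the diff below) of calls that now fail eagerly with descriptive exceptions instead of misbehaving later:

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment(1);

    env.setBufferTimeout(-10);         // IllegalArgumentException: timeout must be non-negative
    env.fromElements();                // IllegalArgumentException: needs at least one element
    env.fromCollection(null);          // NullPointerException: collection must not be null
    env.generateSequence(10, 5);       // IllegalArgumentException: start must not exceed end
    env.readTextFile("/no/such/file"); // IllegalArgumentException: file not found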


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/18502111
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/18502111
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/18502111

Branch: refs/heads/master
Commit: 185021112f180419aa7e1704ec60b2c515770876
Parents: f149197
Author: ghermann <re...@gmail.com>
Authored: Wed Aug 27 16:38:10 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 29 21:01:57 2014 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    | 245 +++++++++----------
 .../environment/StreamExecutionEnvironment.java |  39 ++-
 .../api/invokable/operator/co/CoInvokable.java  |   2 +-
 .../flink/streaming/util/ClusterUtil.java       |  23 +-
 .../streamcomponent/StreamComponentTest.java    | 150 ++++++++++--
 5 files changed, 291 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/18502111/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index ead9c35..3da939c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -93,13 +93,11 @@ public abstract class DataStream<OUT> {
 	 * @param operatorType
 	 *            The type of the operator in the component
 	 */
-	public DataStream(StreamExecutionEnvironment environment,
-			String operatorType) {
+	public DataStream(StreamExecutionEnvironment environment, String operatorType) {
 		if (environment == null) {
 			throw new NullPointerException("context is null");
 		}
 
-		// TODO add name based on component number an preferable sequential id
 		counter++;
 		this.id = operatorType + "-" + counter.toString();
 		this.environment = environment;
@@ -120,8 +118,7 @@ public abstract class DataStream<OUT> {
 		this.environment = dataStream.environment;
 		this.id = dataStream.id;
 		this.degreeOfParallelism = dataStream.degreeOfParallelism;
-		this.userDefinedNames = new ArrayList<String>(
-				dataStream.userDefinedNames);
+		this.userDefinedNames = new ArrayList<String>(dataStream.userDefinedNames);
 		this.partitioner = dataStream.partitioner;
 		this.jobGraphBuilder = dataStream.jobGraphBuilder;
 
@@ -180,8 +177,7 @@ public abstract class DataStream<OUT> {
 	 * @return The {@link ConnectedDataStream}.
 	 */
 	public <R> ConnectedDataStream<OUT, R> connect(DataStream<R> dataStream) {
-		return new ConnectedDataStream<OUT, R>(environment, jobGraphBuilder,
-				this, dataStream);
+		return new ConnectedDataStream<OUT, R>(environment, jobGraphBuilder, this, dataStream);
 	}
 
 	/**
@@ -194,8 +190,7 @@ public abstract class DataStream<OUT> {
 	 */
 	public DataStream<OUT> partitionBy(int keyPosition) {
 		if (keyPosition < 0) {
-			throw new IllegalArgumentException(
-					"The position of the field must be non-negative");
+			throw new IllegalArgumentException("The position of the field must be non-negative");
 		}
 
 		return setConnectionType(new FieldsPartitioner<OUT>(keyPosition));
@@ -257,13 +252,13 @@ public abstract class DataStream<OUT> {
 	 * @return The transformed {@link DataStream}.
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> map(MapFunction<OUT, R> mapper) {
-		FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(
-				mapper, MapFunction.class, 0);
-		FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(
-				mapper, MapFunction.class, 1);
+		FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(mapper,
+				MapFunction.class, 0);
+		FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(mapper,
+				MapFunction.class, 1);
 
-		return addFunction("map", mapper, inTypeWrapper, outTypeWrapper,
-				new MapInvokable<OUT, R>(mapper));
+		return addFunction("map", mapper, inTypeWrapper, outTypeWrapper, new MapInvokable<OUT, R>(
+				mapper));
 	}
 
 	/**
@@ -282,20 +277,20 @@ public abstract class DataStream<OUT> {
 	 *            output type
 	 * @return The transformed {@link DataStream}.
 	 */
-	public <R> SingleOutputStreamOperator<R, ?> flatMap(
-			FlatMapFunction<OUT, R> flatMapper) {
-		FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(
-				flatMapper, FlatMapFunction.class, 0);
-		FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(
-				flatMapper, FlatMapFunction.class, 1);
+	public <R> SingleOutputStreamOperator<R, ?> flatMap(FlatMapFunction<OUT, R> flatMapper) {
+		FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(flatMapper,
+				FlatMapFunction.class, 0);
+		FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(flatMapper,
+				FlatMapFunction.class, 1);
 
-		return addFunction("flatMap", flatMapper, inTypeWrapper,
-				outTypeWrapper, new FlatMapInvokable<OUT, R>(flatMapper));
+		return addFunction("flatMap", flatMapper, inTypeWrapper, outTypeWrapper,
+				new FlatMapInvokable<OUT, R>(flatMapper));
 	}
-	
+
 	/**
-	 * Applies a reduce transformation on the data stream. The user can also extend the {@link RichReduceFunction} to gain access to other features provided by
-	 * the {@link RichFuntion} interface.
+	 * Applies a reduce transformation on the data stream. The user can also
+	 * extend the {@link RichReduceFunction} to gain access to other features
+	 * provided by the {@link RichFunction} interface.
 	 * 
 	 * @param reducer
 	 *            The {@link ReduceFunction} that will be called for every
@@ -303,12 +298,13 @@ public abstract class DataStream<OUT> {
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
-		return addFunction("reduce", reducer, new FunctionTypeWrapper<OUT>(reducer, ReduceFunction.class, 0),
-				new FunctionTypeWrapper<OUT>(reducer, ReduceFunction.class, 0), new StreamReduceInvokable<OUT>(reducer));
+		return addFunction("reduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
+				ReduceFunction.class, 0), new FunctionTypeWrapper<OUT>(reducer,
+				ReduceFunction.class, 0), new StreamReduceInvokable<OUT>(reducer));
 	}
 
 	public GroupedDataStream<OUT> groupBy(int keyPosition) {
-		return new GroupedDataStream<OUT>(this,	keyPosition);
+		return new GroupedDataStream<OUT>(this, keyPosition);
 	}
 
 	/**
@@ -328,8 +324,8 @@ public abstract class DataStream<OUT> {
 	 *            output type
 	 * @return The transformed {@link DataStream}.
 	 */
-	public <R> SingleOutputStreamOperator<R, ?> batchReduce(
-			GroupReduceFunction<OUT, R> reducer, long batchSize) {
+	public <R> SingleOutputStreamOperator<R, ?> batchReduce(GroupReduceFunction<OUT, R> reducer,
+			long batchSize) {
 		return batchReduce(reducer, batchSize, batchSize);
 	}
 
@@ -353,16 +349,22 @@ public abstract class DataStream<OUT> {
 	 *            output type
 	 * @return The transformed {@link DataStream}.
 	 */
-	public <R> SingleOutputStreamOperator<R, ?> batchReduce(
-			GroupReduceFunction<OUT, R> reducer, long batchSize, long slideSize) {
-		FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(
-				reducer, GroupReduceFunction.class, 0);
-		FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(
-				reducer, GroupReduceFunction.class, 1);
+	public <R> SingleOutputStreamOperator<R, ?> batchReduce(GroupReduceFunction<OUT, R> reducer,
+			long batchSize, long slideSize) {
+		if (batchSize < 1) {
+			throw new IllegalArgumentException("Batch size must be positive");
+		}
+		if (slideSize < 1) {
+			throw new IllegalArgumentException("Slide size must be positive");
+		}
 
-		return addFunction("batchReduce", reducer, inTypeWrapper,
-				outTypeWrapper, new BatchReduceInvokable<OUT, R>(reducer,
-						batchSize, slideSize));
+		FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(reducer,
+				GroupReduceFunction.class, 0);
+		FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(reducer,
+				GroupReduceFunction.class, 1);
+
+		return addFunction("batchReduce", reducer, inTypeWrapper, outTypeWrapper,
+				new BatchReduceInvokable<OUT, R>(reducer, batchSize, slideSize));
 	}
 
 	/**
@@ -384,8 +386,8 @@ public abstract class DataStream<OUT> {
 	 *            output type
 	 * @return The transformed DataStream.
 	 */
-	public <R> SingleOutputStreamOperator<R, ?> windowReduce(
-			GroupReduceFunction<OUT, R> reducer, long windowSize) {
+	public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
+			long windowSize) {
 		return windowReduce(reducer, windowSize, windowSize);
 	}
 
@@ -410,17 +412,22 @@ public abstract class DataStream<OUT> {
 	 *            output type
 	 * @return The transformed DataStream.
 	 */
-	public <R> SingleOutputStreamOperator<R, ?> windowReduce(
-			GroupReduceFunction<OUT, R> reducer, long windowSize,
-			long slideInterval) {
-		FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(
-				reducer, GroupReduceFunction.class, 0);
-		FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(
-				reducer, GroupReduceFunction.class, 1);
+	public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
+			long windowSize, long slideInterval) {
+		if (windowSize < 1) {
+			throw new IllegalArgumentException("Window size must be positive");
+		}
+		if (slideInterval < 1) {
+			throw new IllegalArgumentException("Slide interval must be positive");
+		}
 
-		return addFunction("batchReduce", reducer, inTypeWrapper,
-				outTypeWrapper, new WindowReduceInvokable<OUT, R>(reducer,
-						windowSize, slideInterval));
+		FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(reducer,
+				GroupReduceFunction.class, 0);
+		FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(reducer,
+				GroupReduceFunction.class, 1);
+
+		return addFunction("batchReduce", reducer, inTypeWrapper, outTypeWrapper,
+				new WindowReduceInvokable<OUT, R>(reducer, windowSize, slideInterval));
 	}
 
 	/**
@@ -437,11 +444,11 @@ public abstract class DataStream<OUT> {
 	 * @return The filtered DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> filter(FilterFunction<OUT> filter) {
-		FunctionTypeWrapper<OUT> typeWrapper = new FunctionTypeWrapper<OUT>(
-				filter, FilterFunction.class, 0);
+		FunctionTypeWrapper<OUT> typeWrapper = new FunctionTypeWrapper<OUT>(filter,
+				FilterFunction.class, 0);
 
-		return addFunction("filter", filter, typeWrapper, typeWrapper,
-				new FilterInvokable<OUT>(filter));
+		return addFunction("filter", filter, typeWrapper, typeWrapper, new FilterInvokable<OUT>(
+				filter));
 	}
 
 	/**
@@ -454,8 +461,7 @@ public abstract class DataStream<OUT> {
 	public DataStreamSink<OUT> print() {
 		DataStream<OUT> inputStream = this.copy();
 		PrintSinkFunction<OUT> printFunction = new PrintSinkFunction<OUT>();
-		DataStreamSink<OUT> returnStream = addSink(inputStream, printFunction,
-				null);
+		DataStreamSink<OUT> returnStream = addSink(inputStream, printFunction, null);
 
 		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
 
@@ -490,8 +496,7 @@ public abstract class DataStream<OUT> {
 	 * @return The closed DataStream
 	 */
 	public DataStreamSink<OUT> writeAsText(String path, long millis) {
-		return writeAsText(this, path, new WriteFormatAsText<OUT>(), millis,
-				null);
+		return writeAsText(this, path, new WriteFormatAsText<OUT>(), millis, null);
 	}
 
 	/**
@@ -509,8 +514,7 @@ public abstract class DataStream<OUT> {
 	 * @return The closed DataStream
 	 */
 	public DataStreamSink<OUT> writeAsText(String path, int batchSize) {
-		return writeAsText(this, path, new WriteFormatAsText<OUT>(), batchSize,
-				null);
+		return writeAsText(this, path, new WriteFormatAsText<OUT>(), batchSize, null);
 	}
 
 	/**
@@ -531,10 +535,8 @@ public abstract class DataStream<OUT> {
 	 * 
 	 * @return The closed DataStream
 	 */
-	public DataStreamSink<OUT> writeAsText(String path, long millis,
-			OUT endTuple) {
-		return writeAsText(this, path, new WriteFormatAsText<OUT>(), millis,
-				endTuple);
+	public DataStreamSink<OUT> writeAsText(String path, long millis, OUT endTuple) {
+		return writeAsText(this, path, new WriteFormatAsText<OUT>(), millis, endTuple);
 	}
 
 	/**
@@ -556,10 +558,8 @@ public abstract class DataStream<OUT> {
 	 * 
 	 * @return The closed DataStream
 	 */
-	public DataStreamSink<OUT> writeAsText(String path, int batchSize,
-			OUT endTuple) {
-		return writeAsText(this, path, new WriteFormatAsText<OUT>(), batchSize,
-				endTuple);
+	public DataStreamSink<OUT> writeAsText(String path, int batchSize, OUT endTuple) {
+		return writeAsText(this, path, new WriteFormatAsText<OUT>(), batchSize, endTuple);
 	}
 
 	/**
@@ -580,12 +580,10 @@ public abstract class DataStream<OUT> {
 	 * 
 	 * @return the data stream constructed
 	 */
-	private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream,
-			String path, WriteFormatAsText<OUT> format, long millis,
-			OUT endTuple) {
-		DataStreamSink<OUT> returnStream = addSink(inputStream,
-				new WriteSinkFunctionByMillis<OUT>(path, format, millis,
-						endTuple), null);
+	private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, String path,
+			WriteFormatAsText<OUT> format, long millis, OUT endTuple) {
+		DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
+				path, format, millis, endTuple), null);
 		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
 		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
@@ -610,12 +608,10 @@ public abstract class DataStream<OUT> {
 	 * 
 	 * @return the data stream constructed
 	 */
-	private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream,
-			String path, WriteFormatAsText<OUT> format, int batchSize,
-			OUT endTuple) {
+	private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, String path,
+			WriteFormatAsText<OUT> format, int batchSize, OUT endTuple) {
 		DataStreamSink<OUT> returnStream = addSink(inputStream,
-				new WriteSinkFunctionByBatches<OUT>(path, format, batchSize,
-						endTuple), null);
+				new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), null);
 		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
 		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
@@ -667,8 +663,7 @@ public abstract class DataStream<OUT> {
 	 * @return The closed DataStream
 	 */
 	public DataStreamSink<OUT> writeAsCsv(String path, int batchSize) {
-		return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), batchSize,
-				null);
+		return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), batchSize, null);
 	}
 
 	/**
@@ -690,8 +685,7 @@ public abstract class DataStream<OUT> {
 	 * @return The closed DataStream
 	 */
 	public DataStreamSink<OUT> writeAsCsv(String path, long millis, OUT endTuple) {
-		return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), millis,
-				endTuple);
+		return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), millis, endTuple);
 	}
 
 	/**
@@ -713,13 +707,11 @@ public abstract class DataStream<OUT> {
 	 * 
 	 * @return The closed DataStream
 	 */
-	public DataStreamSink<OUT> writeAsCsv(String path, int batchSize,
-			OUT endTuple) {
+	public DataStreamSink<OUT> writeAsCsv(String path, int batchSize, OUT endTuple) {
 		if (this instanceof SingleOutputStreamOperator) {
 			((SingleOutputStreamOperator<?, ?>) this).setMutability(false);
 		}
-		return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), batchSize,
-				endTuple);
+		return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), batchSize, endTuple);
 	}
 
 	/**
@@ -740,11 +732,10 @@ public abstract class DataStream<OUT> {
 	 * 
 	 * @return the data stream constructed
 	 */
-	private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream,
-			String path, WriteFormatAsCsv<OUT> format, long millis, OUT endTuple) {
-		DataStreamSink<OUT> returnStream = addSink(inputStream,
-				new WriteSinkFunctionByMillis<OUT>(path, format, millis,
-						endTuple));
+	private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
+			WriteFormatAsCsv<OUT> format, long millis, OUT endTuple) {
+		DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
+				path, format, millis, endTuple));
 		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
 		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
@@ -769,12 +760,10 @@ public abstract class DataStream<OUT> {
 	 * 
 	 * @return the data stream constructed
 	 */
-	private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream,
-			String path, WriteFormatAsCsv<OUT> format, int batchSize,
-			OUT endTuple) {
+	private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
+			WriteFormatAsCsv<OUT> format, int batchSize, OUT endTuple) {
 		DataStreamSink<OUT> returnStream = addSink(inputStream,
-				new WriteSinkFunctionByBatches<OUT>(path, format, batchSize,
-						endTuple), null);
+				new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), null);
 		jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
 		jobGraphBuilder.setMutability(returnStream.getId(), false);
 		return returnStream;
@@ -806,14 +795,12 @@ public abstract class DataStream<OUT> {
 		return new IterativeDataStream<OUT>(this);
 	}
 
-	protected <R> DataStream<OUT> addIterationSource(String iterationID,
-			long waitTime) {
+	protected <R> DataStream<OUT> addIterationSource(String iterationID, long waitTime) {
 
-		DataStream<R> returnStream = new DataStreamSource<R>(environment,
-				"iterationSource");
+		DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource");
 
-		jobGraphBuilder.addIterationSource(returnStream.getId(), this.getId(),
-				iterationID, degreeOfParallelism, waitTime);
+		jobGraphBuilder.addIterationSource(returnStream.getId(), this.getId(), iterationID,
+				degreeOfParallelism, waitTime);
 
 		return this.copy();
 	}
@@ -837,17 +824,15 @@ public abstract class DataStream<OUT> {
 			TypeSerializerWrapper<OUT> inTypeWrapper,
 			TypeSerializerWrapper<R> outTypeWrapper,
 			StreamOperatorInvokable<OUT, R> functionInvokable) {
-
 		DataStream<OUT> inputStream = this.copy();
 		@SuppressWarnings({ "unchecked", "rawtypes" })
-		SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(
-				environment, functionName);
+		SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment,
+				functionName);
 
 		try {
-			jobGraphBuilder.addTask(returnStream.getId(), functionInvokable,
-					inTypeWrapper, outTypeWrapper, functionName,
-					SerializationUtils.serialize((Serializable) function),
-					degreeOfParallelism);
+			jobGraphBuilder.addTask(returnStream.getId(), functionInvokable, inTypeWrapper,
+					outTypeWrapper, functionName,
+					SerializationUtils.serialize((Serializable) function), degreeOfParallelism);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize user defined function");
 		}
@@ -856,8 +841,7 @@ public abstract class DataStream<OUT> {
 
 		if (inputStream instanceof IterativeDataStream) {
 			IterativeDataStream<OUT> iterativeStream = (IterativeDataStream<OUT>) inputStream;
-			returnStream.addIterationSource(
-					iterativeStream.iterationID.toString(),
+			returnStream.addIterationSource(iterativeStream.iterationID.toString(),
 					iterativeStream.waitTime);
 		}
 
@@ -871,8 +855,7 @@ public abstract class DataStream<OUT> {
 	 *            Partitioner to set.
 	 * @return The modified DataStream.
 	 */
-	protected DataStream<OUT> setConnectionType(
-			StreamPartitioner<OUT> partitioner) {
+	protected DataStream<OUT> setConnectionType(StreamPartitioner<OUT> partitioner) {
 		DataStream<OUT> returnStream = this.copy();
 
 		returnStream.partitioner = partitioner;
@@ -893,18 +876,15 @@ public abstract class DataStream<OUT> {
 	 * @param typeNumber
 	 *            Number of the type (used at co-functions)
 	 */
-	protected <X> void connectGraph(DataStream<X> inputStream, String outputID,
-			int typeNumber) {
+	protected <X> void connectGraph(DataStream<X> inputStream, String outputID, int typeNumber) {
 		if (inputStream instanceof MergedDataStream) {
 			for (DataStream<X> stream : ((MergedDataStream<X>) inputStream).mergedStreams) {
-				jobGraphBuilder.setEdge(stream.getId(), outputID,
-						stream.partitioner, typeNumber,
+				jobGraphBuilder.setEdge(stream.getId(), outputID, stream.partitioner, typeNumber,
 						inputStream.userDefinedNames);
 			}
 		} else {
-			jobGraphBuilder.setEdge(inputStream.getId(), outputID,
-					inputStream.partitioner, typeNumber,
-					inputStream.userDefinedNames);
+			jobGraphBuilder.setEdge(inputStream.getId(), outputID, inputStream.partitioner,
+					typeNumber, inputStream.userDefinedNames);
 		}
 
 	}
@@ -922,23 +902,18 @@ public abstract class DataStream<OUT> {
 		return addSink(this.copy(), sinkFunction);
 	}
 
-	private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream,
-			SinkFunction<OUT> sinkFunction) {
-		return addSink(inputStream, sinkFunction,
-				new FunctionTypeWrapper<OUT>(sinkFunction,
-						SinkFunction.class, 0));
+	private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream, SinkFunction<OUT> sinkFunction) {
+		return addSink(inputStream, sinkFunction, new FunctionTypeWrapper<OUT>(sinkFunction,
+				SinkFunction.class, 0));
 	}
 
 	private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream,
-			SinkFunction<OUT> sinkFunction,
-			TypeSerializerWrapper<OUT> typeWrapper) {
-		DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment,
-				"sink");
+			SinkFunction<OUT> sinkFunction, TypeSerializerWrapper<OUT> typeWrapper) {
+		DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink");
 
 		try {
-			jobGraphBuilder.addSink(returnStream.getId(),
-					new SinkInvokable<OUT>(sinkFunction), typeWrapper, "sink",
-					SerializationUtils.serialize(sinkFunction),
+			jobGraphBuilder.addSink(returnStream.getId(), new SinkInvokable<OUT>(sinkFunction),
+					typeWrapper, "sink", SerializationUtils.serialize(sinkFunction),
 					degreeOfParallelism);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize SinkFunction");
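
For context, a short sketch of the DataStream-side validation added above (stream and reducer are hypothetical): batchReduce and windowReduce now reject non-positive sizes before the operator is added to the job graph:

    stream.batchReduce(reducer, 0);        // IllegalArgumentException: "Batch size must be positive"
    stream.windowReduce(reducer, 1000, 0); // IllegalArgumentException: "Slide interval must be positive"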

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/18502111/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 5d27786..7d983ad 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.environment;
 
+import java.io.File;
 import java.io.Serializable;
 import java.util.Collection;
 
@@ -117,6 +118,10 @@ public abstract class StreamExecutionEnvironment {
 	 *            The maximum time between two output flushes.
 	 */
 	public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis) {
+		if (timeoutMillis < 0) {
+			throw new IllegalArgumentException("Timeout of buffer must be non-negative");
+		}
+
 		this.buffertimeout = timeoutMillis;
 		return this;
 	}
@@ -155,10 +160,12 @@ public abstract class StreamExecutionEnvironment {
 	 * @return The DataStream representing the text file.
 	 */
 	public DataStreamSource<String> readTextFile(String filePath) {
+		checkIfFileExists(filePath);
 		return addSource(new FileSourceFunction(filePath), 1);
 	}
 
 	public DataStreamSource<String> readTextFile(String filePath, int parallelism) {
+		checkIfFileExists(filePath);
 		return addSource(new FileSourceFunction(filePath), parallelism);
 	}
 
@@ -173,13 +180,31 @@ public abstract class StreamExecutionEnvironment {
 	 * @return The DataStream representing the text file.
 	 */
 	public DataStreamSource<String> readTextStream(String filePath) {
+		checkIfFileExists(filePath);
 		return addSource(new FileStreamFunction(filePath), 1);
 	}
 
 	public DataStreamSource<String> readTextStream(String filePath, int parallelism) {
+		checkIfFileExists(filePath);
 		return addSource(new FileStreamFunction(filePath), parallelism);
 	}
 
+
+	private static void checkIfFileExists(String filePath) {
+		File file = new File(filePath);
+		if (!file.exists()) {
+			throw new IllegalArgumentException("File not found: " + filePath);
+		}
+
+		if (!file.canRead()) {
+			throw new IllegalArgumentException("Cannot read file: " + filePath);
+		}
+		
+		if (file.isDirectory()) {
+			throw new IllegalArgumentException("Given path is a directory: " + filePath);
+		}
+	}
+	
 	/**
 	 * Creates a new DataStream that contains the given elements. The elements
 	 * must all be of the same type, for example, all String or all Integer.
@@ -196,6 +221,11 @@ public abstract class StreamExecutionEnvironment {
 	public <OUT extends Serializable> DataStreamSource<OUT> fromElements(OUT... data) {
 		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements");
 
+		if (data.length == 0) {
+			throw new IllegalArgumentException(
+					"fromElements needs at least one element as argument");
+		}
+
 		try {
 			SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
 			jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(function),
@@ -222,8 +252,12 @@ public abstract class StreamExecutionEnvironment {
 	public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
 		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements");
 
+		if (data == null) {
+			throw new NullPointerException("Collection must not be null");
+		}
+
 		if (data.isEmpty()) {
-			throw new RuntimeException("Collection must not be empty");
+			throw new IllegalArgumentException("Collection must not be empty");
 		}
 
 		try {
@@ -249,6 +283,9 @@ public abstract class StreamExecutionEnvironment {
 	 * @return A DataStream containing all numbers in the [from, to] interval.
 	 */
 	public DataStreamSource<Long> generateSequence(long from, long to) {
+		if (from > to) {
+			throw new IllegalArgumentException("Start of sequence must not be greater than the end");
+		}
 		return addSource(new GenSequenceFunction(from, to), 1);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/18502111/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index e9fefc3..c21e784 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -34,7 +34,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
 	}
 
 	private static final long serialVersionUID = 1L;
-	private static final Log LOG = LogFactory.getLog(StreamInvokable.class);
+	private static final Log LOG = LogFactory.getLog(CoInvokable.class);
 
 	protected CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator;
 	protected StreamRecord<IN1> reuse1;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/18502111/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index 588becf..50a78b3 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -23,11 +23,14 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.client.minicluster.NepheleMiniCluster;
 import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
 public class ClusterUtil {
+
 	private static final Log LOG = LogFactory.getLog(ClusterUtil.class);
+	public static final String CANNOT_EXECUTE_EMPTY_JOB = "Cannot execute empty job";
 
 	/**
 	 * Executes the given JobGraph locally, on a NepheleMiniCluster
@@ -49,24 +52,32 @@ public class ClusterUtil {
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Running on mini cluster");
 		}
-		
+
 		try {
 			exec.start();
 
-			Client client = new Client(new InetSocketAddress("localhost", exec.getJobManagerRpcPort()), configuration, ClusterUtil.class.getClassLoader());
+			Client client = new Client(new InetSocketAddress("localhost",
+					exec.getJobManagerRpcPort()), configuration, ClusterUtil.class.getClassLoader());
 			client.run(jobGraph, true);
 
+		} catch (ProgramInvocationException e) {
+			if (e.getMessage().contains("GraphConversionException")) {
+				throw new RuntimeException(CANNOT_EXECUTE_EMPTY_JOB, e);
+			} else {
+				throw new RuntimeException(e.getMessage(), e);
+			}
 		} catch (Exception e) {
-			throw new RuntimeException(e);
+			throw new RuntimeException(e.getMessage(), e);
 		} finally {
 			try {
 				exec.stop();
-			} catch (Throwable t) {}
+			} catch (Throwable t) {
+			}
 		}
 	}
 
 	public static void runOnMiniCluster(JobGraph jobGraph, int numberOfTaskTrackers) {
 		runOnMiniCluster(jobGraph, numberOfTaskTrackers, -1);
-	}
+	}
 
-}
+}
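
For context, a sketch of how the ClusterUtil change surfaces to user code (env is a hypothetical local environment): executing before any source or sink is defined now fails with the dedicated message rather than an opaque wrapped exception:

    try {
        env.execute(); // nothing has been added to the job graph
    } catch (RuntimeException e) {
        // e.getMessage() equals ClusterUtil.CANNOT_EXECUTE_EMPTY_JOB ("Cannot execute empty job")
    }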

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/18502111/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
index b16ffee..b0799cd 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
@@ -18,28 +18,32 @@
 package org.apache.flink.streaming.api.streamcomponent;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 
 import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.util.ClusterUtil;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.LogUtils;
+import org.apache.log4j.Level;
+import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class StreamComponentTest {
 
-	@SuppressWarnings("unused")
-	private static final int PARALLELISM = 1;
-	private static final int SOURCE_PARALELISM = 1;
-	private static final long MEMORYSIZE = 32;
-
 	private static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
 
 	public static class MySource implements SourceFunction<Tuple1<Integer>> {
@@ -66,19 +70,6 @@ public class StreamComponentTest {
 		}
 	}
 
-	// TODO test multiple tasks
-	// public static class MyOtherTask extends MapFunction<Tuple1<Integer>,
-	// Tuple2<Integer, Integer>> {
-	// private static final long serialVersionUID = 1L;
-	//
-	// @Override
-	// public Tuple2<Integer, Integer> map(Tuple1<Integer> value) throws
-	// Exception {
-	// Integer i = value.f0;
-	// return new Tuple2<Integer, Integer>(-i - 1, -i - 2);
-	// }
-	// }
-
 	public static class MySink implements SinkFunction<Tuple2<Integer, Integer>> {
 		private static final long serialVersionUID = 1L;
 
@@ -90,19 +81,128 @@ public class StreamComponentTest {
 		}
 	}
 
+	@SuppressWarnings("unused")
+	private static final int PARALLELISM = 1;
+	private static final int SOURCE_PARALELISM = 1;
+	private static final long MEMORYSIZE = 32;
+
+	@Before
+	public void before() {
+		LogUtils.initializeDefaultConsoleLogger(Level.OFF);
+	}
+
+	@Ignore
 	@Test
-	public void runStream() {
+	public void wrongJobGraph() {
+		LocalStreamEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(SOURCE_PARALELISM);
+
+		try {
+			env.execute();
+			fail();
+		} catch (RuntimeException e) {
+			assertEquals(e.getMessage(), ClusterUtil.CANNOT_EXECUTE_EMPTY_JOB);
+		}
+
+		env.fromCollection(Arrays.asList("a", "b"));
+
+		try {
+			env.execute();
+			fail();
+		} catch (RuntimeException e) {
+			System.out.println(e.getMessage());
+		}
+
+		try {
+			env.fromCollection(null);
+			fail();
+		} catch (NullPointerException e) {
+		}
+
+		try {
+			env.fromElements();
+			fail();
+		} catch (IllegalArgumentException e) {
+		}
+
+		try {
+			env.generateSequence(-10, -30);
+			fail();
+		} catch (IllegalArgumentException e) {
+		}
+
+		try {
+			env.setBufferTimeout(-10);
+			fail();
+		} catch (IllegalArgumentException e) {
+		}
+
+		try {
+			env.setExecutionParallelism(-10);
+			fail();
+		} catch (IllegalArgumentException e) {
+		}
+
+		try {
+			env.readTextFile("random/path/that/is/not/valid");
+			fail();
+		} catch (IllegalArgumentException e) {
+		}
+	}
+
+	private static class CoMap implements CoMapFunction<String, Long, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String map1(String value) {
+			return value;
+		}
+
+		@Override
+		public String map2(Long value) {
+			return value.toString();
+		}
+	}
+
+	static HashSet<String> resultSet;
+	private static class SetSink implements SinkFunction<String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void invoke(String value) {
+			resultSet.add(value);
+		}
+	}
+	
+	@Test
+	public void coTest() {
+		LocalStreamEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(SOURCE_PARALELISM);
+
+		DataStream<String> fromStringElements = env.fromElements("aa", "bb", "cc");
+		DataStream<Long> generatedSequence = env.generateSequence(0, 3);
 		
-		LogUtils.initializeDefaultTestConsoleLogger();
+		fromStringElements.connect(generatedSequence).map(new CoMap()).addSink(new SetSink());
 		
-		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(SOURCE_PARALELISM);
+		resultSet = new HashSet<String>();
+		env.execute();
+		
+		HashSet<String> expectedSet = new HashSet<String>(Arrays.asList("aa", "bb", "cc", "0", "1", "2", "3"));
+		assertEquals(expectedSet, resultSet);
+	}
+
+	@Test
+	public void runStream() {
+
+		LogUtils.initializeDefaultTestConsoleLogger();
 
-		env
-				.addSource(new MySource(), SOURCE_PARALELISM).map(new MyTask())
-				.addSink(new MySink());
+		LocalStreamEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(SOURCE_PARALELISM);
+
+		env.addSource(new MySource(), SOURCE_PARALELISM).map(new MyTask()).addSink(new MySink());
 
 		env.executeTest(MEMORYSIZE);
-		
+
 		assertEquals(10, data.keySet().size());
 
 		for (Integer k : data.keySet()) {