You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/11/19 23:37:13 UTC

[1/2] incubator-flink git commit: [streaming] Fixed async buffer sending at end of AbstractInvokable

Repository: incubator-flink
Updated Branches:
  refs/heads/master 3ac4df81e -> a03637820


[streaming] Fixed async buffer sending at end of AbstractInvokable


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a0363782
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a0363782
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a0363782

Branch: refs/heads/master
Commit: a036378204826009574a25ee9abd16b9ac71a9c3
Parents: d92a24a
Author: ghermann <re...@gmail.com>
Authored: Mon Nov 17 16:14:50 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Nov 19 21:50:45 2014 +0100

----------------------------------------------------------------------
 .../api/invokable/StreamInvokable.java          |   1 +
 .../api/streamvertex/OutputHandler.java         |   7 +-
 .../flink/streaming/io/StreamRecordWriter.java  | 307 ++++++++++---------
 3 files changed, 172 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0363782/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index 9b17360..71739c1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -152,6 +152,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
 	 */
 	public void close() throws Exception {
 		isRunning = false;
+		collector.close();
 		if (userFunction instanceof RichFunction) {
 			((RichFunction) userFunction).close();
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0363782/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index d8eb146..cc67d6e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -129,7 +129,6 @@ public class OutputHandler<OUT> {
 				LOG.trace("StreamRecordWriter initiated with {} bufferTimeout for {}",
 						bufferTimeout, streamVertex.getClass().getSimpleName());
 			}
-
 		} else {
 			output = new RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex,
 					outputPartitioner);
@@ -155,7 +154,11 @@ public class OutputHandler<OUT> {
 
 	public void flushOutputs() throws IOException, InterruptedException {
 		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
-			output.flush();
+			if (output instanceof StreamRecordWriter) {
+				((StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>) output).close();
+			} else {
+				output.flush();
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0363782/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
index 1237020..d7369f7 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
@@ -1,145 +1,170 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.Buffer;
-import org.apache.flink.runtime.io.network.api.ChannelSelector;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.io.network.api.RoundRobinChannelSelector;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
-import org.apache.flink.runtime.io.network.serialization.RecordSerializer;
-import org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-public class StreamRecordWriter<T extends IOReadableWritable> extends
-		RecordWriter<T> {
-
-	private final BufferProvider bufferPool;
-
-	private final ChannelSelector<T> channelSelector;
-
-	private int numChannels;
-
-	private long timeout;
-
-	/** RecordSerializer per outgoing channel */
-	private RecordSerializer<T>[] serializers;
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	public StreamRecordWriter(AbstractInvokable invokable) {
-		this(invokable, new RoundRobinChannelSelector<T>(), 100);
-	}
-
-	public StreamRecordWriter(AbstractInvokable invokable,
-			ChannelSelector<T> channelSelector) {
-		this(invokable, channelSelector, 100);
-	}
-
-	public StreamRecordWriter(AbstractInvokable invokable,
-			ChannelSelector<T> channelSelector, long timeout) {
-		// initialize the gate
-		super(invokable);
-
-		this.timeout = timeout;
-		this.bufferPool = invokable.getEnvironment().getOutputBufferProvider();
-		this.channelSelector = channelSelector;
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void initializeSerializers() {
-		this.numChannels = this.outputGate.getNumChannels();
-		this.serializers = new RecordSerializer[numChannels];
-		for (int i = 0; i < this.numChannels; i++) {
-			this.serializers[i] = new SpanningRecordSerializer<T>();
+ */
+
+package org.apache.flink.streaming.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.Buffer;
+import org.apache.flink.runtime.io.network.api.ChannelSelector;
+import org.apache.flink.runtime.io.network.api.RecordWriter;
+import org.apache.flink.runtime.io.network.api.RoundRobinChannelSelector;
+import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
+import org.apache.flink.runtime.io.network.serialization.RecordSerializer;
+import org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
+
+	private final BufferProvider bufferPool;
+
+	private final ChannelSelector<T> channelSelector;
+
+	private int numChannels;
+
+	private long timeout;
+
+	private OutputFlusher outputFlusher;
+
+	/** RecordSerializer per outgoing channel */
+	private RecordSerializer<T>[] serializers;
+
+	private ArrayList<TargetChannel> targetChannels;
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	public StreamRecordWriter(AbstractInvokable invokable) {
+		this(invokable, new RoundRobinChannelSelector<T>(), 1000);
+	}
+
+	public StreamRecordWriter(AbstractInvokable invokable, ChannelSelector<T> channelSelector) {
+		this(invokable, channelSelector, 1000);
+	}
+
+	public StreamRecordWriter(AbstractInvokable invokable, ChannelSelector<T> channelSelector,
+			long timeout) {
+		// initialize the gate
+		super(invokable);
+
+		this.timeout = timeout;
+		this.bufferPool = invokable.getEnvironment().getOutputBufferProvider();
+		this.channelSelector = channelSelector;
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void initializeSerializers() {
+		this.numChannels = this.outputGate.getNumChannels();
+		this.serializers = new RecordSerializer[numChannels];
+		this.targetChannels = new ArrayList<TargetChannel>(numChannels);
+
+		for (int i = 0; i < this.numChannels; i++) {
+			this.serializers[i] = new SpanningRecordSerializer<T>();
+			this.targetChannels.add(new TargetChannel(i));
 		}
-		
-		//start a separate thread to handle positive flush intervals
-		if (timeout > 0) {
-			(new OutputFlusher()).start();
-		}
-	}
-
-	@Override
-	public void emit(final T record) throws IOException, InterruptedException {
-		for (int targetChannel : this.channelSelector.selectChannels(record,
-				this.numChannels)) {
-			// serialize with corresponding serializer and send full buffer
-
-			RecordSerializer<T> serializer = this.serializers[targetChannel];
-
-			synchronized (serializer) {
-				RecordSerializer.SerializationResult result = serializer
-						.addRecord(record);
-				while (result.isFullBuffer()) {
-					Buffer buffer = serializer.getCurrentBuffer();
-					if (buffer != null) {
-						sendBuffer(buffer, targetChannel);
-					}
-
-					buffer = this.bufferPool
-							.requestBufferBlocking(this.bufferPool
-									.getBufferSize());
-					result = serializer.setNextBuffer(buffer);
-				}
+
+		outputFlusher = new OutputFlusher();
+		outputFlusher.start();
+	}
+
+	@Override
+	public void emit(final T record) throws IOException, InterruptedException {
+		for (int targetChannel : this.channelSelector.selectChannels(record, this.numChannels)) {
+			targetChannels.get(targetChannel).emit(record);
+		}
+	}
+
+	@Override
+	public void flush() throws IOException, InterruptedException {
+		for (int targetChannel = 0; targetChannel < this.numChannels; targetChannel++) {
+			targetChannels.get(targetChannel).flush();
+		}
+	}
+
+	public void close() {
+		try {
+			if (outputFlusher != null) {
+				outputFlusher.terminate();
+				outputFlusher.join();
+			}
+			flush();
+		} catch (IOException e) {
+			e.printStackTrace();
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		}
+	}
+
+	private class TargetChannel {
+
+		private int targetChannel;
+		private RecordSerializer<T> serializer;
+
+		public TargetChannel(int targetChannel) {
+			this.targetChannel = targetChannel;
+			this.serializer = serializers[targetChannel];
+		}
+
+		public synchronized void emit(final T record) throws IOException, InterruptedException {
+			RecordSerializer.SerializationResult result = serializer.addRecord(record);
+			while (result.isFullBuffer()) {
+				Buffer buffer = serializer.getCurrentBuffer();
+				if (buffer != null) {
+					sendBuffer(buffer, targetChannel);
+				}
+
+				buffer = bufferPool.requestBufferBlocking(bufferPool.getBufferSize());
+				result = serializer.setNextBuffer(buffer);
 			}
-			
-			if (timeout == 0){
-				flush();
-			}
-		}
-	}
-
-	@Override
-	public void flush() throws IOException, InterruptedException {
-		for (int targetChannel = 0; targetChannel < this.numChannels; targetChannel++) {
-			RecordSerializer<T> serializer = this.serializers[targetChannel];
-			synchronized (serializer) {
-				Buffer buffer = serializer.getCurrentBuffer();
-				if (buffer != null) {
-					sendBuffer(buffer, targetChannel);
-				}
-
-				serializer.clear();
-			}
-
-		}
-	}
-
-	private class OutputFlusher extends Thread {
-
-		@Override
-		public void run() {
-			while (!outputGate.isClosed()) {
-				try {
-					Thread.sleep(timeout);
-					flush();
-				} catch (Exception e) {
-					throw new RuntimeException(e);
-				}
-			}
-		}
-	}
-
-}
+		}
+
+		public synchronized void flush() throws IOException, InterruptedException {
+			Buffer buffer = serializer.getCurrentBuffer();
+			if (buffer != null) {
+				sendBuffer(buffer, targetChannel);
+			}
+
+			serializer.clear();
+		}
+	}
+	
+	private class OutputFlusher extends Thread {
+
+		private boolean running = true;
+
+		public void terminate() {
+			running = false;
+		}
+
+		@Override
+		public void run() {
+			while (running && !outputGate.isClosed()) {
+				try {
+					flush();
+					Thread.sleep(timeout);
+				} catch (Exception e) {
+					throw new RuntimeException(e);
+				}
+			}
+		}
+	}
+
+}


[2/2] incubator-flink git commit: [streaming] Source parallelism API update

Posted by mb...@apache.org.
[streaming] Source parallelism API update

The new preferred way is to call setParallelism() after adding the SourceFunction. The same approach works for any other operator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d92a24a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d92a24a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d92a24a1

Branch: refs/heads/master
Commit: d92a24a1e66d877529098ce10325933794ef4549
Parents: 3ac4df8
Author: mbalassi <mb...@apache.org>
Authored: Sun Nov 16 23:43:41 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Nov 19 21:50:45 2014 +0100

----------------------------------------------------------------------
 .../connectors/kafka/KafkaTopology.java         |  2 +-
 .../connectors/twitter/TwitterLocal.java        |  4 +--
 .../connectors/twitter/TwitterStreaming.java    |  4 +--
 .../environment/StreamExecutionEnvironment.java | 38 +++++++++-----------
 .../flink/streaming/api/WriteAsCsvTest.java     | 10 +++---
 .../flink/streaming/api/WriteAsTextTest.java    | 10 +++---
 .../api/streamvertex/StreamVertexTest.java      |  2 +-
 7 files changed, 32 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d92a24a1/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
index 089efad..4d043c4 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
@@ -95,7 +95,7 @@ public class KafkaTopology {
 
 		@SuppressWarnings("unused")
 		DataStream<String> stream1 = env
-			.addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
+			.addSource(new MyKafkaSource("localhost:2181", "group", "test", 1)).setParallelism(1)
 			.addSink(new MyKafkaPrintSink());
 
 		@SuppressWarnings("unused")

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d92a24a1/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
index e53df32..3058047 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
@@ -72,8 +72,8 @@ public class TwitterLocal {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment
 				.createLocalEnvironment(PARALLELISM);
 
-		DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS),
-				SOURCE_PARALLELISM);
+		DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS))
+				.setParallelism(SOURCE_PARALLELISM);
 
 
 		DataStream<Tuple2<String, Integer>> dataStream = streamSource

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d92a24a1/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
index 6de90a5..a32fe1b 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -86,8 +86,8 @@ public class TwitterStreaming {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment
 				.createLocalEnvironment(PARALLELISM);
 
-		DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS),
-				SOURCE_PARALLELISM);
+		DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS))
+				.setParallelism(SOURCE_PARALLELISM);
 
 		DataStream<Tuple5<Long, Integer, String, String, String>> selectedDataStream = streamSource
 				.flatMap(new SelectDataFlatMap());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d92a24a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 7baabf9..ce86516 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -130,6 +130,16 @@ public abstract class StreamExecutionEnvironment {
 	public long getBufferTimeout() {
 		return this.bufferTimeout;
 	}
+	
+	/**
+	 * Sets the default parallelism that will be used for the local execution environment created by
+	 * {@link #createLocalEnvironment()}.
+	 * 
+	 * @param degreeOfParallelism The degree of parallelism to use as the default local parallelism.
+	 */
+	public static void setDefaultLocalParallelism(int degreeOfParallelism) {
+		defaultLocalDop = degreeOfParallelism;
+	}
 
 	// --------------------------------------------------------------------------------------------
 	// Data stream creations
@@ -147,12 +157,7 @@ public abstract class StreamExecutionEnvironment {
 	 */
 	public DataStreamSource<String> readTextFile(String filePath) {
 		checkIfFileExists(filePath);
-		return addSource(new FileSourceFunction(filePath), 1);
-	}
-
-	public DataStreamSource<String> readTextFile(String filePath, int parallelism) {
-		checkIfFileExists(filePath);
-		return addSource(new FileSourceFunction(filePath), parallelism);
+		return addSource(new FileSourceFunction(filePath));
 	}
 
 	/**
@@ -167,12 +172,7 @@ public abstract class StreamExecutionEnvironment {
 	 */
 	public DataStreamSource<String> readTextStream(String filePath) {
 		checkIfFileExists(filePath);
-		return addSource(new FileStreamFunction(filePath), 1);
-	}
-
-	public DataStreamSource<String> readTextStream(String filePath, int parallelism) {
-		checkIfFileExists(filePath);
-		return addSource(new FileStreamFunction(filePath), parallelism);
+		return addSource(new FileStreamFunction(filePath));
 	}
 
 	private static void checkIfFileExists(String filePath) {
@@ -275,7 +275,7 @@ public abstract class StreamExecutionEnvironment {
 		if (from > to) {
 			throw new IllegalArgumentException("Start of sequence must not be greater than the end");
 		}
-		return addSource(new GenSequenceFunction(from, to), 1);
+		return addSource(new GenSequenceFunction(from, to));
 	}
 
 	/**
@@ -283,13 +283,11 @@ public abstract class StreamExecutionEnvironment {
 	 * 
 	 * @param function
 	 *            the user defined function
-	 * @param parallelism
-	 *            number of parallel instances of the function
 	 * @param <OUT>
 	 *            type of the returned stream
 	 * @return the data stream constructed
 	 */
-	public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, int parallelism) {
+	public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
 		TypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(function,
 				SourceFunction.class, 0);
 		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source",
@@ -298,7 +296,7 @@ public abstract class StreamExecutionEnvironment {
 		try {
 			jobGraphBuilder.addStreamVertex(returnStream.getId(),
 					new SourceInvokable<OUT>(function), null, outTypeWrapper, "source",
-					SerializationUtils.serialize(function), parallelism);
+					SerializationUtils.serialize(function), 1);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize SourceFunction");
 		}
@@ -306,10 +304,6 @@ public abstract class StreamExecutionEnvironment {
 		return returnStream;
 	}
 
-	public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> sourceFunction) {
-		return addSource(sourceFunction, 1);
-	}
-
 	// --------------------------------------------------------------------------------------------
 	// Instantiation of Execution Contexts
 	// --------------------------------------------------------------------------------------------
@@ -453,7 +447,7 @@ public abstract class StreamExecutionEnvironment {
 	/**
 	 * Getter of the {@link JobGraphBuilder} of the streaming job.
 	 * 
-	 * @return jobgraph
+	 * @return jobGraphBuilder
 	 */
 	public JobGraphBuilder getJobGraphBuilder() {
 		return jobGraphBuilder;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d92a24a1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
index b05e4e8..6b1dd5a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
@@ -135,27 +135,27 @@ public class WriteAsCsvTest {
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1(), 1).writeAsCsv(PREFIX + "test1.txt");
+		DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test1.txt");
 
 		fillExpected1();
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream2 = env.addSource(new MySource1(), 1).writeAsCsv(PREFIX + "test2.txt", 5);
+		DataStream<Tuple1<Integer>> dataStream2 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test2.txt", 5);
 
 		fillExpected2();
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream3 = env.addSource(new MySource1(), 1).writeAsCsv(PREFIX + "test3.txt", 10);
+		DataStream<Tuple1<Integer>> dataStream3 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test3.txt", 10);
 
 		fillExpected3();
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream4 = env.addSource(new MySource1(), 1).writeAsCsv(PREFIX + "test4.txt", 10, new Tuple1<Integer>(26));
+		DataStream<Tuple1<Integer>> dataStream4 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test4.txt", 10, new Tuple1<Integer>(26));
 
 		fillExpected4();
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream5 = env.addSource(new MySource1(), 1).writeAsCsv(PREFIX + "test5.txt", 10, new Tuple1<Integer>(14));
+		DataStream<Tuple1<Integer>> dataStream5 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test5.txt", 10, new Tuple1<Integer>(14));
 
 		fillExpected5();
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d92a24a1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
index 01dedcc..e21f21d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
@@ -135,27 +135,27 @@ public class WriteAsTextTest {
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 		
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1(), 1).writeAsText(PREFIX + "test1.txt");
+		DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1()).writeAsText(PREFIX + "test1.txt");
 
 		fillExpected1();
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream2 = env.addSource(new MySource1(), 1).writeAsText(PREFIX + "test2.txt", 5);
+		DataStream<Tuple1<Integer>> dataStream2 = env.addSource(new MySource1()).writeAsText(PREFIX + "test2.txt", 5);
 
 		fillExpected2();
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream3 = env.addSource(new MySource1(), 1).writeAsText(PREFIX + "test3.txt", 10);
+		DataStream<Tuple1<Integer>> dataStream3 = env.addSource(new MySource1()).writeAsText(PREFIX + "test3.txt", 10);
 
 		fillExpected3();
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream4 = env.addSource(new MySource1(), 1).writeAsText(PREFIX + "test4.txt", 10, new Tuple1<Integer>(26));
+		DataStream<Tuple1<Integer>> dataStream4 = env.addSource(new MySource1()).writeAsText(PREFIX + "test4.txt", 10, new Tuple1<Integer>(26));
 
 		fillExpected4();
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream5 = env.addSource(new MySource1(), 1).writeAsText(PREFIX + "test5.txt", 10, new Tuple1<Integer>(14));
+		DataStream<Tuple1<Integer>> dataStream5 = env.addSource(new MySource1()).writeAsText(PREFIX + "test5.txt", 10, new Tuple1<Integer>(14));
 
 		fillExpected5();
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d92a24a1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
index 765de9c..0b73c3a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
@@ -172,7 +172,7 @@ public class StreamVertexTest {
 		LocalStreamEnvironment env = StreamExecutionEnvironment
 				.createLocalEnvironment(SOURCE_PARALELISM);
 
-		env.addSource(new MySource(), SOURCE_PARALELISM).map(new MyTask()).addSink(new MySink());
+		env.addSource(new MySource()).setParallelism(SOURCE_PARALELISM).map(new MyTask()).addSink(new MySink());
 
 		env.executeTest(MEMORYSIZE);
 		assertEquals(10, data.keySet().size());