You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/11/19 23:37:13 UTC
[1/2] incubator-flink git commit: [streaming] Fixed async buffer
sending at end of AbstractInvokable
Repository: incubator-flink
Updated Branches:
refs/heads/master 3ac4df81e -> a03637820
[streaming] Fixed async buffer sending at end of AbstractInvokable
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a0363782
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a0363782
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a0363782
Branch: refs/heads/master
Commit: a036378204826009574a25ee9abd16b9ac71a9c3
Parents: d92a24a
Author: ghermann <re...@gmail.com>
Authored: Mon Nov 17 16:14:50 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Nov 19 21:50:45 2014 +0100
----------------------------------------------------------------------
.../api/invokable/StreamInvokable.java | 1 +
.../api/streamvertex/OutputHandler.java | 7 +-
.../flink/streaming/io/StreamRecordWriter.java | 307 ++++++++++---------
3 files changed, 172 insertions(+), 143 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0363782/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index 9b17360..71739c1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -152,6 +152,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
*/
public void close() throws Exception {
isRunning = false;
+ collector.close();
if (userFunction instanceof RichFunction) {
((RichFunction) userFunction).close();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0363782/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index d8eb146..cc67d6e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -129,7 +129,6 @@ public class OutputHandler<OUT> {
LOG.trace("StreamRecordWriter initiated with {} bufferTimeout for {}",
bufferTimeout, streamVertex.getClass().getSimpleName());
}
-
} else {
output = new RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex,
outputPartitioner);
@@ -155,7 +154,11 @@ public class OutputHandler<OUT> {
public void flushOutputs() throws IOException, InterruptedException {
for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
- output.flush();
+ if (output instanceof StreamRecordWriter) {
+ ((StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>) output).close();
+ } else {
+ output.flush();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0363782/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
index 1237020..d7369f7 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
@@ -1,145 +1,170 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.Buffer;
-import org.apache.flink.runtime.io.network.api.ChannelSelector;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.io.network.api.RoundRobinChannelSelector;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
-import org.apache.flink.runtime.io.network.serialization.RecordSerializer;
-import org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-public class StreamRecordWriter<T extends IOReadableWritable> extends
- RecordWriter<T> {
-
- private final BufferProvider bufferPool;
-
- private final ChannelSelector<T> channelSelector;
-
- private int numChannels;
-
- private long timeout;
-
- /** RecordSerializer per outgoing channel */
- private RecordSerializer<T>[] serializers;
-
- // -----------------------------------------------------------------------------------------------------------------
-
- public StreamRecordWriter(AbstractInvokable invokable) {
- this(invokable, new RoundRobinChannelSelector<T>(), 100);
- }
-
- public StreamRecordWriter(AbstractInvokable invokable,
- ChannelSelector<T> channelSelector) {
- this(invokable, channelSelector, 100);
- }
-
- public StreamRecordWriter(AbstractInvokable invokable,
- ChannelSelector<T> channelSelector, long timeout) {
- // initialize the gate
- super(invokable);
-
- this.timeout = timeout;
- this.bufferPool = invokable.getEnvironment().getOutputBufferProvider();
- this.channelSelector = channelSelector;
- }
-
- // -----------------------------------------------------------------------------------------------------------------
-
- @SuppressWarnings("unchecked")
- @Override
- public void initializeSerializers() {
- this.numChannels = this.outputGate.getNumChannels();
- this.serializers = new RecordSerializer[numChannels];
- for (int i = 0; i < this.numChannels; i++) {
- this.serializers[i] = new SpanningRecordSerializer<T>();
+ */
+
+package org.apache.flink.streaming.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.Buffer;
+import org.apache.flink.runtime.io.network.api.ChannelSelector;
+import org.apache.flink.runtime.io.network.api.RecordWriter;
+import org.apache.flink.runtime.io.network.api.RoundRobinChannelSelector;
+import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
+import org.apache.flink.runtime.io.network.serialization.RecordSerializer;
+import org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
+
+ private final BufferProvider bufferPool;
+
+ private final ChannelSelector<T> channelSelector;
+
+ private int numChannels;
+
+ private long timeout;
+
+ private OutputFlusher outputFlusher;
+
+ /** RecordSerializer per outgoing channel */
+ private RecordSerializer<T>[] serializers;
+
+ private ArrayList<TargetChannel> targetChannels;
+
+ // -----------------------------------------------------------------------------------------------------------------
+
+ public StreamRecordWriter(AbstractInvokable invokable) {
+ this(invokable, new RoundRobinChannelSelector<T>(), 1000);
+ }
+
+ public StreamRecordWriter(AbstractInvokable invokable, ChannelSelector<T> channelSelector) {
+ this(invokable, channelSelector, 1000);
+ }
+
+ public StreamRecordWriter(AbstractInvokable invokable, ChannelSelector<T> channelSelector,
+ long timeout) {
+ // initialize the gate
+ super(invokable);
+
+ this.timeout = timeout;
+ this.bufferPool = invokable.getEnvironment().getOutputBufferProvider();
+ this.channelSelector = channelSelector;
+ }
+
+ // -----------------------------------------------------------------------------------------------------------------
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void initializeSerializers() {
+ this.numChannels = this.outputGate.getNumChannels();
+ this.serializers = new RecordSerializer[numChannels];
+ this.targetChannels = new ArrayList<TargetChannel>(numChannels);
+
+ for (int i = 0; i < this.numChannels; i++) {
+ this.serializers[i] = new SpanningRecordSerializer<T>();
+ this.targetChannels.add(new TargetChannel(i));
}
-
- //start a separate thread to handle positive flush intervals
- if (timeout > 0) {
- (new OutputFlusher()).start();
- }
- }
-
- @Override
- public void emit(final T record) throws IOException, InterruptedException {
- for (int targetChannel : this.channelSelector.selectChannels(record,
- this.numChannels)) {
- // serialize with corresponding serializer and send full buffer
-
- RecordSerializer<T> serializer = this.serializers[targetChannel];
-
- synchronized (serializer) {
- RecordSerializer.SerializationResult result = serializer
- .addRecord(record);
- while (result.isFullBuffer()) {
- Buffer buffer = serializer.getCurrentBuffer();
- if (buffer != null) {
- sendBuffer(buffer, targetChannel);
- }
-
- buffer = this.bufferPool
- .requestBufferBlocking(this.bufferPool
- .getBufferSize());
- result = serializer.setNextBuffer(buffer);
- }
+
+ outputFlusher = new OutputFlusher();
+ outputFlusher.start();
+ }
+
+ @Override
+ public void emit(final T record) throws IOException, InterruptedException {
+ for (int targetChannel : this.channelSelector.selectChannels(record, this.numChannels)) {
+ targetChannels.get(targetChannel).emit(record);
+ }
+ }
+
+ @Override
+ public void flush() throws IOException, InterruptedException {
+ for (int targetChannel = 0; targetChannel < this.numChannels; targetChannel++) {
+ targetChannels.get(targetChannel).flush();
+ }
+ }
+
+ public void close() {
+ try {
+ if (outputFlusher != null) {
+ outputFlusher.terminate();
+ outputFlusher.join();
+ }
+ flush();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private class TargetChannel {
+
+ private int targetChannel;
+ private RecordSerializer<T> serializer;
+
+ public TargetChannel(int targetChannel) {
+ this.targetChannel = targetChannel;
+ this.serializer = serializers[targetChannel];
+ }
+
+ public synchronized void emit(final T record) throws IOException, InterruptedException {
+ RecordSerializer.SerializationResult result = serializer.addRecord(record);
+ while (result.isFullBuffer()) {
+ Buffer buffer = serializer.getCurrentBuffer();
+ if (buffer != null) {
+ sendBuffer(buffer, targetChannel);
+ }
+
+ buffer = bufferPool.requestBufferBlocking(bufferPool.getBufferSize());
+ result = serializer.setNextBuffer(buffer);
}
-
- if (timeout == 0){
- flush();
- }
- }
- }
-
- @Override
- public void flush() throws IOException, InterruptedException {
- for (int targetChannel = 0; targetChannel < this.numChannels; targetChannel++) {
- RecordSerializer<T> serializer = this.serializers[targetChannel];
- synchronized (serializer) {
- Buffer buffer = serializer.getCurrentBuffer();
- if (buffer != null) {
- sendBuffer(buffer, targetChannel);
- }
-
- serializer.clear();
- }
-
- }
- }
-
- private class OutputFlusher extends Thread {
-
- @Override
- public void run() {
- while (!outputGate.isClosed()) {
- try {
- Thread.sleep(timeout);
- flush();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
- }
-
-}
+ }
+
+ public synchronized void flush() throws IOException, InterruptedException {
+ Buffer buffer = serializer.getCurrentBuffer();
+ if (buffer != null) {
+ sendBuffer(buffer, targetChannel);
+ }
+
+ serializer.clear();
+ }
+ }
+
+ private class OutputFlusher extends Thread {
+
+ private boolean running = true;
+
+ public void terminate() {
+ running = false;
+ }
+
+ @Override
+ public void run() {
+ while (running && !outputGate.isClosed()) {
+ try {
+ flush();
+ Thread.sleep(timeout);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+}
[2/2] incubator-flink git commit: [streaming] Source parallelism API
update
Posted by mb...@apache.org.
[streaming] Source parallelism API update
The new preferred way is calling setParallelism() after adding the SourceFunction. This solution works the same way as it does for any other operator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d92a24a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d92a24a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d92a24a1
Branch: refs/heads/master
Commit: d92a24a1e66d877529098ce10325933794ef4549
Parents: 3ac4df8
Author: mbalassi <mb...@apache.org>
Authored: Sun Nov 16 23:43:41 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Nov 19 21:50:45 2014 +0100
----------------------------------------------------------------------
.../connectors/kafka/KafkaTopology.java | 2 +-
.../connectors/twitter/TwitterLocal.java | 4 +--
.../connectors/twitter/TwitterStreaming.java | 4 +--
.../environment/StreamExecutionEnvironment.java | 38 +++++++++-----------
.../flink/streaming/api/WriteAsCsvTest.java | 10 +++---
.../flink/streaming/api/WriteAsTextTest.java | 10 +++---
.../api/streamvertex/StreamVertexTest.java | 2 +-
7 files changed, 32 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d92a24a1/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
index 089efad..4d043c4 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
@@ -95,7 +95,7 @@ public class KafkaTopology {
@SuppressWarnings("unused")
DataStream<String> stream1 = env
- .addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
+ .addSource(new MyKafkaSource("localhost:2181", "group", "test", 1)).setParallelism(1)
.addSink(new MyKafkaPrintSink());
@SuppressWarnings("unused")
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d92a24a1/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
index e53df32..3058047 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
@@ -72,8 +72,8 @@ public class TwitterLocal {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironment(PARALLELISM);
- DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS),
- SOURCE_PARALLELISM);
+ DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS))
+ .setParallelism(SOURCE_PARALLELISM);
DataStream<Tuple2<String, Integer>> dataStream = streamSource
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d92a24a1/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
index 6de90a5..a32fe1b 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -86,8 +86,8 @@ public class TwitterStreaming {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironment(PARALLELISM);
- DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS),
- SOURCE_PARALLELISM);
+ DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS))
+ .setParallelism(SOURCE_PARALLELISM);
DataStream<Tuple5<Long, Integer, String, String, String>> selectedDataStream = streamSource
.flatMap(new SelectDataFlatMap());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d92a24a1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 7baabf9..ce86516 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -130,6 +130,16 @@ public abstract class StreamExecutionEnvironment {
public long getBufferTimeout() {
return this.bufferTimeout;
}
+
+ /**
+ * Sets the default parallelism that will be used for the local execution environment created by
+ * {@link #createLocalEnvironment()}.
+ *
+ * @param degreeOfParallelism The degree of parallelism to use as the default local parallelism.
+ */
+ public static void setDefaultLocalParallelism(int degreeOfParallelism) {
+ defaultLocalDop = degreeOfParallelism;
+ }
// --------------------------------------------------------------------------------------------
// Data stream creations
@@ -147,12 +157,7 @@ public abstract class StreamExecutionEnvironment {
*/
public DataStreamSource<String> readTextFile(String filePath) {
checkIfFileExists(filePath);
- return addSource(new FileSourceFunction(filePath), 1);
- }
-
- public DataStreamSource<String> readTextFile(String filePath, int parallelism) {
- checkIfFileExists(filePath);
- return addSource(new FileSourceFunction(filePath), parallelism);
+ return addSource(new FileSourceFunction(filePath));
}
/**
@@ -167,12 +172,7 @@ public abstract class StreamExecutionEnvironment {
*/
public DataStreamSource<String> readTextStream(String filePath) {
checkIfFileExists(filePath);
- return addSource(new FileStreamFunction(filePath), 1);
- }
-
- public DataStreamSource<String> readTextStream(String filePath, int parallelism) {
- checkIfFileExists(filePath);
- return addSource(new FileStreamFunction(filePath), parallelism);
+ return addSource(new FileStreamFunction(filePath));
}
private static void checkIfFileExists(String filePath) {
@@ -275,7 +275,7 @@ public abstract class StreamExecutionEnvironment {
if (from > to) {
throw new IllegalArgumentException("Start of sequence must not be greater than the end");
}
- return addSource(new GenSequenceFunction(from, to), 1);
+ return addSource(new GenSequenceFunction(from, to));
}
/**
@@ -283,13 +283,11 @@ public abstract class StreamExecutionEnvironment {
*
* @param function
* the user defined function
- * @param parallelism
- * number of parallel instances of the function
* @param <OUT>
* type of the returned stream
* @return the data stream constructed
*/
- public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, int parallelism) {
+ public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
TypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(function,
SourceFunction.class, 0);
DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source",
@@ -298,7 +296,7 @@ public abstract class StreamExecutionEnvironment {
try {
jobGraphBuilder.addStreamVertex(returnStream.getId(),
new SourceInvokable<OUT>(function), null, outTypeWrapper, "source",
- SerializationUtils.serialize(function), parallelism);
+ SerializationUtils.serialize(function), 1);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize SourceFunction");
}
@@ -306,10 +304,6 @@ public abstract class StreamExecutionEnvironment {
return returnStream;
}
- public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> sourceFunction) {
- return addSource(sourceFunction, 1);
- }
-
// --------------------------------------------------------------------------------------------
// Instantiation of Execution Contexts
// --------------------------------------------------------------------------------------------
@@ -453,7 +447,7 @@ public abstract class StreamExecutionEnvironment {
/**
* Getter of the {@link JobGraphBuilder} of the streaming job.
*
- * @return jobgraph
+ * @return jobGraphBuilder
*/
public JobGraphBuilder getJobGraphBuilder() {
return jobGraphBuilder;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d92a24a1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
index b05e4e8..6b1dd5a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
@@ -135,27 +135,27 @@ public class WriteAsCsvTest {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
@SuppressWarnings("unused")
- DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1(), 1).writeAsCsv(PREFIX + "test1.txt");
+ DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test1.txt");
fillExpected1();
@SuppressWarnings("unused")
- DataStream<Tuple1<Integer>> dataStream2 = env.addSource(new MySource1(), 1).writeAsCsv(PREFIX + "test2.txt", 5);
+ DataStream<Tuple1<Integer>> dataStream2 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test2.txt", 5);
fillExpected2();
@SuppressWarnings("unused")
- DataStream<Tuple1<Integer>> dataStream3 = env.addSource(new MySource1(), 1).writeAsCsv(PREFIX + "test3.txt", 10);
+ DataStream<Tuple1<Integer>> dataStream3 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test3.txt", 10);
fillExpected3();
@SuppressWarnings("unused")
- DataStream<Tuple1<Integer>> dataStream4 = env.addSource(new MySource1(), 1).writeAsCsv(PREFIX + "test4.txt", 10, new Tuple1<Integer>(26));
+ DataStream<Tuple1<Integer>> dataStream4 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test4.txt", 10, new Tuple1<Integer>(26));
fillExpected4();
@SuppressWarnings("unused")
- DataStream<Tuple1<Integer>> dataStream5 = env.addSource(new MySource1(), 1).writeAsCsv(PREFIX + "test5.txt", 10, new Tuple1<Integer>(14));
+ DataStream<Tuple1<Integer>> dataStream5 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test5.txt", 10, new Tuple1<Integer>(14));
fillExpected5();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d92a24a1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
index 01dedcc..e21f21d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
@@ -135,27 +135,27 @@ public class WriteAsTextTest {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
@SuppressWarnings("unused")
- DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1(), 1).writeAsText(PREFIX + "test1.txt");
+ DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1()).writeAsText(PREFIX + "test1.txt");
fillExpected1();
@SuppressWarnings("unused")
- DataStream<Tuple1<Integer>> dataStream2 = env.addSource(new MySource1(), 1).writeAsText(PREFIX + "test2.txt", 5);
+ DataStream<Tuple1<Integer>> dataStream2 = env.addSource(new MySource1()).writeAsText(PREFIX + "test2.txt", 5);
fillExpected2();
@SuppressWarnings("unused")
- DataStream<Tuple1<Integer>> dataStream3 = env.addSource(new MySource1(), 1).writeAsText(PREFIX + "test3.txt", 10);
+ DataStream<Tuple1<Integer>> dataStream3 = env.addSource(new MySource1()).writeAsText(PREFIX + "test3.txt", 10);
fillExpected3();
@SuppressWarnings("unused")
- DataStream<Tuple1<Integer>> dataStream4 = env.addSource(new MySource1(), 1).writeAsText(PREFIX + "test4.txt", 10, new Tuple1<Integer>(26));
+ DataStream<Tuple1<Integer>> dataStream4 = env.addSource(new MySource1()).writeAsText(PREFIX + "test4.txt", 10, new Tuple1<Integer>(26));
fillExpected4();
@SuppressWarnings("unused")
- DataStream<Tuple1<Integer>> dataStream5 = env.addSource(new MySource1(), 1).writeAsText(PREFIX + "test5.txt", 10, new Tuple1<Integer>(14));
+ DataStream<Tuple1<Integer>> dataStream5 = env.addSource(new MySource1()).writeAsText(PREFIX + "test5.txt", 10, new Tuple1<Integer>(14));
fillExpected5();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d92a24a1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
index 765de9c..0b73c3a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
@@ -172,7 +172,7 @@ public class StreamVertexTest {
LocalStreamEnvironment env = StreamExecutionEnvironment
.createLocalEnvironment(SOURCE_PARALELISM);
- env.addSource(new MySource(), SOURCE_PARALELISM).map(new MyTask()).addSink(new MySink());
+ env.addSource(new MySource()).setParallelism(SOURCE_PARALELISM).map(new MyTask()).addSink(new MySink());
env.executeTest(MEMORYSIZE);
assertEquals(10, data.keySet().size());