You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/01 12:34:35 UTC
flink git commit: [streaming] Streaming program cancellation bugfix
Repository: flink
Updated Branches:
refs/heads/master 50ffff0b0 -> 5691f2626
[streaming] Streaming program cancellation bugfix
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5691f262
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5691f262
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5691f262
Branch: refs/heads/master
Commit: 5691f262624bedb19e1d72f1caace48d5bab67e2
Parents: 50ffff0
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Apr 1 11:01:37 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Wed Apr 1 12:33:36 2015 +0200
----------------------------------------------------------------------
.../api/function/source/FromElementsFunction.java | 10 +++++++++-
.../api/function/source/GenSequenceFunction.java | 6 +++++-
.../flink/streaming/io/StreamRecordWriter.java | 15 +++++++++------
3 files changed, 23 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5691f262/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
index 97a3a92..db452dc 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
@@ -27,6 +27,8 @@ public class FromElementsFunction<T> implements SourceFunction<T> {
Iterable<T> iterable;
+ private volatile boolean isRunning;
+
public FromElementsFunction(T... elements) {
this.iterable = Arrays.asList(elements);
}
@@ -41,13 +43,19 @@ public class FromElementsFunction<T> implements SourceFunction<T> {
@Override
public void run(Collector<T> collector) throws Exception {
+ isRunning = true;
for (T element : iterable) {
- collector.collect(element);
+ if (isRunning) {
+ collector.collect(element);
+ } else {
+ break;
+ }
}
}
@Override
public void cancel() {
+ isRunning = false;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5691f262/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
index eccc146..df3b462 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
@@ -32,13 +32,16 @@ public class GenSequenceFunction extends RichParallelSourceFunction<Long> {
private NumberSequenceIterator fullIterator;
private NumberSequenceIterator splitIterator;
+ private volatile boolean isRunning;
+
public GenSequenceFunction(long from, long to) {
fullIterator = new NumberSequenceIterator(from, to);
}
@Override
public void run(Collector<Long> collector) throws Exception {
- while (splitIterator.hasNext()) {
+ isRunning = true;
+ while (splitIterator.hasNext() && isRunning) {
collector.collect(splitIterator.next());
}
}
@@ -52,6 +55,7 @@ public class GenSequenceFunction extends RichParallelSourceFunction<Long> {
@Override
public void cancel() {
+ isRunning = false;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5691f262/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
index 45595df..a1fdc09 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
@@ -39,7 +39,8 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
this(writer, channelSelector, 1000);
}
- public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector, long timeout) {
+ public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector,
+ long timeout) {
super(writer, channelSelector);
this.timeout = timeout;
@@ -57,26 +58,28 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
flush();
} catch (IOException e) {
- e.printStackTrace();
+ throw new RuntimeException(e);
} catch (InterruptedException e) {
- e.printStackTrace();
+ // Do nothing here
}
}
private class OutputFlusher extends Thread {
private volatile boolean running = true;
-
+
public void terminate() {
running = false;
}
-
+
@Override
public void run() {
while (running) {
try {
flush();
Thread.sleep(timeout);
- } catch (Exception e) {
+ } catch (InterruptedException e) {
+ // Do nothing here
+ } catch (IOException e) {
throw new RuntimeException(e);
}
}