You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/01 12:34:35 UTC

flink git commit: [streaming] Streaming program cancellation bugfix

Repository: flink
Updated Branches:
  refs/heads/master 50ffff0b0 -> 5691f2626


[streaming] Streaming program cancellation bugfix


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5691f262
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5691f262
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5691f262

Branch: refs/heads/master
Commit: 5691f262624bedb19e1d72f1caace48d5bab67e2
Parents: 50ffff0
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Apr 1 11:01:37 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Wed Apr 1 12:33:36 2015 +0200

----------------------------------------------------------------------
 .../api/function/source/FromElementsFunction.java    | 10 +++++++++-
 .../api/function/source/GenSequenceFunction.java     |  6 +++++-
 .../flink/streaming/io/StreamRecordWriter.java       | 15 +++++++++------
 3 files changed, 23 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5691f262/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
index 97a3a92..db452dc 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
@@ -27,6 +27,8 @@ public class FromElementsFunction<T> implements SourceFunction<T> {
 
 	Iterable<T> iterable;
 
+	private volatile boolean isRunning;
+
 	public FromElementsFunction(T... elements) {
 		this.iterable = Arrays.asList(elements);
 	}
@@ -41,13 +43,19 @@ public class FromElementsFunction<T> implements SourceFunction<T> {
 
 	@Override
 	public void run(Collector<T> collector) throws Exception {
+		isRunning = true;
 		for (T element : iterable) {
-			collector.collect(element);
+			if (isRunning) {
+				collector.collect(element);
+			} else {
+				break;
+			}
 		}
 	}
 
 	@Override
 	public void cancel() {
+		isRunning = false;
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5691f262/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
index eccc146..df3b462 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
@@ -32,13 +32,16 @@ public class GenSequenceFunction extends RichParallelSourceFunction<Long> {
 	private NumberSequenceIterator fullIterator;
 	private NumberSequenceIterator splitIterator;
 
+	private volatile boolean isRunning;
+
 	public GenSequenceFunction(long from, long to) {
 		fullIterator = new NumberSequenceIterator(from, to);
 	}
 
 	@Override
 	public void run(Collector<Long> collector) throws Exception {
-		while (splitIterator.hasNext()) {
+		isRunning = true;
+		while (splitIterator.hasNext() && isRunning) {
 			collector.collect(splitIterator.next());
 		}
 	}
@@ -52,6 +55,7 @@ public class GenSequenceFunction extends RichParallelSourceFunction<Long> {
 
 	@Override
 	public void cancel() {
+		isRunning = false;
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5691f262/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
index 45595df..a1fdc09 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
@@ -39,7 +39,8 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
 		this(writer, channelSelector, 1000);
 	}
 
-	public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector, long timeout) {
+	public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector,
+			long timeout) {
 		super(writer, channelSelector);
 
 		this.timeout = timeout;
@@ -57,26 +58,28 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
 
 			flush();
 		} catch (IOException e) {
-			e.printStackTrace();
+			throw new RuntimeException(e);
 		} catch (InterruptedException e) {
-			e.printStackTrace();
+			// Do nothing here
 		}
 	}
 
 	private class OutputFlusher extends Thread {
 		private volatile boolean running = true;
-		
+
 		public void terminate() {
 			running = false;
 		}
-		
+
 		@Override
 		public void run() {
 			while (running) {
 				try {
 					flush();
 					Thread.sleep(timeout);
-				} catch (Exception e) {
+				} catch (InterruptedException e) {
+					// Do nothing here
+				} catch (IOException e) {
 					throw new RuntimeException(e);
 				}
 			}