You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2019/05/22 08:32:36 UTC

[flink] 02/02: [FLINK-12483][runtime] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks.

This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ead9139680ea82c4fdfd1e9d19baf4d4a08ec845
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Tue May 14 15:33:48 2019 +0200

    [FLINK-12483][runtime] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks.
    
    This closes #8442.
---
 .../streaming/runtime/tasks/SourceStreamTask.java  | 66 +++++++++++++++++++++-
 1 file changed, 64 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index fd50a1a..e604f2c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -45,6 +45,8 @@ import org.apache.flink.util.FlinkException;
 public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
 	extends StreamTask<OUT, OP> {
 
+	private static final Runnable SOURCE_POISON_LETTER = () -> {};
+
 	private volatile boolean externallyInducedCheckpoints;
 
 	public SourceStreamTask(Environment env) {
@@ -101,12 +103,43 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 	protected void performDefaultAction(ActionContext context) throws Exception {
 		// Against the usual contract of this method, this implementation is not step-wise but blocking instead for
 		// compatibility reasons with the current source interface (source functions run as a loop, not in steps).
-		headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
+		final LegacySourceFunctionThread sourceThread = new LegacySourceFunctionThread(); // runs headOperator.run(...) off the task thread
+		sourceThread.start();
+
+		// Meanwhile this thread runs an alternative mailbox loop: no default action, and every letter is executed under the checkpoint lock.
+		try {
+			runAlternativeMailboxLoop(); // returns once the source thread has posted SOURCE_POISON_LETTER
+		} catch (Exception mailboxEx) {
+			// We cancel the source function if any exception (including InterruptedException) escaped the mailbox loop.
+			if (!isCanceled()) {
+				cancelTask();
+			}
+			throw mailboxEx; // note: the source thread is not joined on this path
+		}
+
+		sourceThread.join(); // wait for the source thread to terminate before inspecting its outcome
+		sourceThread.checkThrowSourceExecutionException(); // rethrows (wrapped) whatever the source function threw, if anything
+
 		context.allActionsCompleted();
 	}
 
+	private void runAlternativeMailboxLoop() throws InterruptedException {
+		// Processes mailbox letters one by one until SOURCE_POISON_LETTER is received; performs no default action.
+		while (true) {
+			// NOTE(review): takeMail() presumably blocks until a letter is available - confirm against the Mailbox interface.
+			Runnable letter = mailbox.takeMail();
+			if (letter == SOURCE_POISON_LETTER) { // identity check against the shared sentinel instance
+				break;
+			}
+
+			synchronized (getCheckpointLock()) { // same lock object that is handed to headOperator.run(...) in the source thread
+				letter.run();
+			}
+		}
+	}
+
 	@Override
-	protected void cancelTask() throws Exception {
+	protected void cancelTask() {
 		if (headOperator != null) {
 			headOperator.cancel();
 		}
@@ -133,4 +166,33 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 			}
 		}
 	}
+
+	/**
+	 * Thread that executes the source function in the head operator and, when it ends (normally or not), puts {@code SOURCE_POISON_LETTER} into the mailbox.
+	 */
+	private class LegacySourceFunctionThread extends Thread {
+
+		private Throwable sourceExecutionThrowable; // written in run(); read by the task thread only after join() in performDefaultAction
+
+		LegacySourceFunctionThread() {
+			this.sourceExecutionThrowable = null; // null means the source function has not failed
+		}
+
+		@Override
+		public void run() {
+			try {
+				headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
+			} catch (Throwable t) {
+				sourceExecutionThrowable = t; // remembered here, surfaced later via checkThrowSourceExecutionException()
+			} finally {
+				mailbox.clearAndPut(SOURCE_POISON_LETTER); // ends runAlternativeMailboxLoop(); presumably drops pending letters first - confirm
+			}
+		}
+
+		void checkThrowSourceExecutionException() throws Exception { // no-op if nothing was caught in run()
+			if (sourceExecutionThrowable != null) {
+				throw new Exception(sourceExecutionThrowable); // wrapped in a plain Exception; the original Throwable is the cause
+			}
+		}
+	}
 }