You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2019/05/22 08:32:36 UTC
[flink] 02/02: [FLINK-12483][runtime] Support (legacy)
SourceFunction as special case in the mailbox model for stream tasks.
This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ead9139680ea82c4fdfd1e9d19baf4d4a08ec845
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Tue May 14 15:33:48 2019 +0200
[FLINK-12483][runtime] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks.
This closes #8442.
---
.../streaming/runtime/tasks/SourceStreamTask.java | 66 +++++++++++++++++++++-
1 file changed, 64 insertions(+), 2 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index fd50a1a..e604f2c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -45,6 +45,8 @@ import org.apache.flink.util.FlinkException;
public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
extends StreamTask<OUT, OP> {
+ private static final Runnable SOURCE_POISON_LETTER = () -> {}; // no-op sentinel; runAlternativeMailboxLoop() compares letters against it by identity to know when to stop
+
private volatile boolean externallyInducedCheckpoints;
public SourceStreamTask(Environment env) {
@@ -101,12 +103,43 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
protected void performDefaultAction(ActionContext context) throws Exception {
// Against the usual contract of this method, this implementation is not step-wise but blocking instead for
// compatibility reasons with the current source interface (source functions run as a loop, not in steps).
- headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
+ final LegacySourceFunctionThread sourceThread = new LegacySourceFunctionThread();
+ sourceThread.start(); // from here on headOperator.run(...) executes on the separate source thread
+
+ // We run an alternative mailbox loop that does not involve default actions and synchronizes around actions.
+ try {
+ runAlternativeMailboxLoop(); // returns normally only once SOURCE_POISON_LETTER has been taken from the mailbox
+ } catch (Exception mailboxEx) {
+ // We cancel the source function if any exception escaped the mailbox loop (this includes the
+ // InterruptedException that runAlternativeMailboxLoop() declares, not only runtime exceptions).
+ if (!isCanceled()) {
+ cancelTask();
+ }
+ throw mailboxEx; // NOTE(review): this path rethrows without joining sourceThread - confirm the thread is cleaned up elsewhere
+ }
+
+ sourceThread.join(); // wait for the source thread to terminate before reading the failure it may have recorded
+ sourceThread.checkThrowSourceExecutionException(); // rethrows a Throwable captured in the source thread, wrapped in an Exception
+
context.allActionsCompleted();
}
+ private void runAlternativeMailboxLoop() throws InterruptedException { // Runs mailbox letters one at a time until SOURCE_POISON_LETTER is received.
+ // Every letter other than the poison letter is executed while holding the checkpoint lock.
+ while (true) {
+
+ Runnable letter = mailbox.takeMail(); // presumably blocks until a letter is available - confirm against the mailbox implementation
+ if (letter == SOURCE_POISON_LETTER) { // identity comparison against the shared sentinel; the poison letter itself is never run
+ break;
+ }
+
+ synchronized (getCheckpointLock()) { // getCheckpointLock() is also what LegacySourceFunctionThread passes to headOperator.run(...)
+ letter.run();
+ }
+ }
+ }
+
@Override
- protected void cancelTask() throws Exception {
+ protected void cancelTask() {
if (headOperator != null) {
headOperator.cancel();
}
@@ -133,4 +166,33 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
}
}
}
+
+ /**
+ * Thread that executes the source function in the head operator.
+ *
+ * <p>{@link #run()} calls {@code headOperator.run(...)} with the checkpoint lock and the stream status
+ * maintainer, records any {@link Throwable} thrown by that call, and always finishes by putting
+ * {@code SOURCE_POISON_LETTER} into the mailbox. The recorded failure can be rethrown afterwards via
+ * {@link #checkThrowSourceExecutionException()}.
+ */
+ private class LegacySourceFunctionThread extends Thread {
+
+ private Throwable sourceExecutionThrowable; // written in run(); not volatile - the visible caller reads it only after join()
+
+ LegacySourceFunctionThread() {
+ this.sourceExecutionThrowable = null; // no failure recorded yet
+ }
+
+ @Override
+ public void run() {
+ try {
+ headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
+ } catch (Throwable t) { // catches Throwable, so Errors are recorded as well as Exceptions
+ sourceExecutionThrowable = t;
+ } finally {
+ mailbox.clearAndPut(SOURCE_POISON_LETTER); // executed on success and on failure; presumably drops pending letters before enqueuing the poison letter - confirm against the mailbox API
+ }
+ }
+
+ void checkThrowSourceExecutionException() throws Exception { // does nothing when run() recorded no Throwable
+ if (sourceExecutionThrowable != null) {
+ throw new Exception(sourceExecutionThrowable); // the recorded Throwable is preserved as the cause
+ }
+ }
+ }
}