You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/02/26 13:31:14 UTC
[flink] 02/05: [hotfix][task] Interrupt source legacy thread on failure.
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 25ef41943a46034a65e8cdd4d9171af23c7f827b
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue Feb 9 11:49:33 2021 +0100
[hotfix][task] Interrupt source legacy thread on failure.
If a legacy source task fails outside of the legacy source thread, the legacy
thread blocks proper cancellation because its completion future is never completed.
---
.../streaming/runtime/tasks/SourceStreamTask.java | 26 ++++++++++++++++------
1 file changed, 19 insertions(+), 7 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 8d2edfc..0a11c8d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -179,6 +179,14 @@ public class SourceStreamTask<
}
@Override
+ protected void cleanUpInvoke() throws Exception {
+ if (isFailing()) {
+ interruptSourceThread(true);
+ }
+ super.cleanUpInvoke();
+ }
+
+ @Override
protected void cancelTask() {
cancelTask(true);
}
@@ -202,14 +210,18 @@ public class SourceStreamTask<
mainOperator.cancel();
}
} finally {
- if (sourceThread.isAlive()) {
- if (interrupt) {
- sourceThread.interrupt();
- }
- } else if (!sourceThread.getCompletionFuture().isDone()) {
- // source thread didn't start
- sourceThread.getCompletionFuture().complete(null);
+ interruptSourceThread(interrupt);
+ }
+ }
+
+ private void interruptSourceThread(boolean interrupt) {
+ if (sourceThread.isAlive()) {
+ if (interrupt) {
+ sourceThread.interrupt();
}
+ } else if (!sourceThread.getCompletionFuture().isDone()) {
+ // source thread didn't start
+ sourceThread.getCompletionFuture().complete(null);
}
}