You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/02/26 13:31:14 UTC

[flink] 02/05: [hotfix][task] Interrupt source legacy thread on failure.

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 25ef41943a46034a65e8cdd4d9171af23c7f827b
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue Feb 9 11:49:33 2021 +0100

    [hotfix][task] Interrupt source legacy thread on failure.
    
    If a legacy source task fails outside of the legacy thread, the legacy thread
    blocks proper cancellation (completion future never completed).
---
 .../streaming/runtime/tasks/SourceStreamTask.java  | 26 ++++++++++++++++------
 1 file changed, 19 insertions(+), 7 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 8d2edfc..0a11c8d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -179,6 +179,14 @@ public class SourceStreamTask<
     }
 
     @Override
+    protected void cleanUpInvoke() throws Exception {
+        if (isFailing()) {
+            interruptSourceThread(true);
+        }
+        super.cleanUpInvoke();
+    }
+
+    @Override
     protected void cancelTask() {
         cancelTask(true);
     }
@@ -202,14 +210,18 @@ public class SourceStreamTask<
                 mainOperator.cancel();
             }
         } finally {
-            if (sourceThread.isAlive()) {
-                if (interrupt) {
-                    sourceThread.interrupt();
-                }
-            } else if (!sourceThread.getCompletionFuture().isDone()) {
-                // source thread didn't start
-                sourceThread.getCompletionFuture().complete(null);
+            interruptSourceThread(interrupt);
+        }
+    }
+
+    private void interruptSourceThread(boolean interrupt) {
+        if (sourceThread.isAlive()) {
+            if (interrupt) {
+                sourceThread.interrupt();
             }
+        } else if (!sourceThread.getCompletionFuture().isDone()) {
+            // source thread didn't start
+            sourceThread.getCompletionFuture().complete(null);
         }
     }