You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/05/05 15:15:34 UTC

[flink] branch release-1.13 updated: [FLINK-22488][hotfix] Update SubtaskGatewayImpl to specify the cause of sendEvent() failure when triggering task failover

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 2b71b12  [FLINK-22488][hotfix] Update SubtaskGatewayImpl to specify the cause of sendEvent() failure when triggering task failover
2b71b12 is described below

commit 2b71b12b1f9f32899509298e8fa3795851e1ea98
Author: Dong Lin <li...@gmail.com>
AuthorDate: Tue Apr 27 13:57:35 2021 +0800

    [FLINK-22488][hotfix] Update SubtaskGatewayImpl to specify the cause of sendEvent() failure when triggering task failover
---
 .../java/org/apache/flink/runtime/executiongraph/Execution.java    | 3 ++-
 .../flink/runtime/operators/coordination/SubtaskGatewayImpl.java   | 2 +-
 .../src/main/java/org/apache/flink/runtime/taskmanager/Task.java   | 7 ++++---
 3 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 6ce4a41..10e073f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -891,7 +891,8 @@ public class Execution
                     new TaskNotRunningException(
                             '"'
                                     + vertex.getTaskNameWithSubtaskIndex()
-                                    + "\" is currently not running or ready."));
+                                    + "\" is not running, but in state "
+                                    + getState()));
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
index 11f33f5..af9727c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
@@ -78,7 +78,7 @@ class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway {
                                                 EVENT_LOSS_ERROR_MESSAGE,
                                                 evt,
                                                 subtaskAccess.subtaskName());
-                                subtaskAccess.triggerTaskFailover(new FlinkException(msg));
+                                subtaskAccess.triggerTaskFailover(new FlinkException(msg, failure));
                             }
                             return null;
                         },
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 3b7f419..8056ca4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -1403,11 +1403,12 @@ public class Task
     public void deliverOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> evt)
             throws FlinkException {
         final AbstractInvokable invokable = this.invokable;
+        final ExecutionState currentState = this.executionState;
 
         if (invokable == null
-                || (executionState != ExecutionState.RUNNING
-                        && executionState != ExecutionState.INITIALIZING)) {
-            throw new TaskNotRunningException("Task is not yet running.");
+                || (currentState != ExecutionState.RUNNING
+                        && currentState != ExecutionState.INITIALIZING)) {
+            throw new TaskNotRunningException("Task is not running, but in state " + currentState);
         }
 
         try {