You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/05/05 15:15:34 UTC
[flink] branch release-1.13 updated: [FLINK-22488][hotfix] Update
SubtaskGatewayImpl to specify the cause of sendEvent() failure when
triggering task failover
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 2b71b12 [FLINK-22488][hotfix] Update SubtaskGatewayImpl to specify the cause of sendEvent() failure when triggering task failover
2b71b12 is described below
commit 2b71b12b1f9f32899509298e8fa3795851e1ea98
Author: Dong Lin <li...@gmail.com>
AuthorDate: Tue Apr 27 13:57:35 2021 +0800
[FLINK-22488][hotfix] Update SubtaskGatewayImpl to specify the cause of sendEvent() failure when triggering task failover
---
.../java/org/apache/flink/runtime/executiongraph/Execution.java | 3 ++-
.../flink/runtime/operators/coordination/SubtaskGatewayImpl.java | 2 +-
.../src/main/java/org/apache/flink/runtime/taskmanager/Task.java | 7 ++++---
3 files changed, 7 insertions(+), 5 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 6ce4a41..10e073f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -891,7 +891,8 @@ public class Execution
new TaskNotRunningException(
'"'
+ vertex.getTaskNameWithSubtaskIndex()
- + "\" is currently not running or ready."));
+ + "\" is not running, but in state "
+ + getState()));
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
index 11f33f5..af9727c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
@@ -78,7 +78,7 @@ class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway {
EVENT_LOSS_ERROR_MESSAGE,
evt,
subtaskAccess.subtaskName());
- subtaskAccess.triggerTaskFailover(new FlinkException(msg));
+ subtaskAccess.triggerTaskFailover(new FlinkException(msg, failure));
}
return null;
},
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 3b7f419..8056ca4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -1403,11 +1403,12 @@ public class Task
public void deliverOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> evt)
throws FlinkException {
final AbstractInvokable invokable = this.invokable;
+ final ExecutionState currentState = this.executionState;
if (invokable == null
- || (executionState != ExecutionState.RUNNING
- && executionState != ExecutionState.INITIALIZING)) {
- throw new TaskNotRunningException("Task is not yet running.");
+ || (currentState != ExecutionState.RUNNING
+ && currentState != ExecutionState.INITIALIZING)) {
+ throw new TaskNotRunningException("Task is not running, but in state " + currentState);
}
try {