You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/23 10:14:55 UTC
[2/4] flink git commit: [FLINK-9873][runtime] Log task state when aborting checkpoint
[FLINK-9873][runtime] Log task state when aborting checkpoint
This closes #6350.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa770ba6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa770ba6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa770ba6
Branch: refs/heads/release-1.5
Commit: aa770ba6f325b85c7242e535d45a6080d2703232
Parents: 9baca1b
Author: zentol <ch...@apache.org>
Authored: Tue Jul 17 09:34:45 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 23 09:15:21 2018 +0200
----------------------------------------------------------------------
.../runtime/checkpoint/CheckpointCoordinator.java | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/aa770ba6/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 55e1ffe..82227cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -456,13 +456,20 @@ public class CheckpointCoordinator {
Execution[] executions = new Execution[tasksToTrigger.length];
for (int i = 0; i < tasksToTrigger.length; i++) {
Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
- if (ee != null && ee.getState() == ExecutionState.RUNNING) {
- executions[i] = ee;
- } else {
+ if (ee == null) {
LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
job);
return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
+ } else if (ee.getState() == ExecutionState.RUNNING) {
+ executions[i] = ee;
+ } else {
+ LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
+ tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
+ job,
+ ExecutionState.RUNNING,
+ ee.getState());
+ return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
}
}