You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/23 10:14:55 UTC

[2/4] flink git commit: [FLINK-9873][runtime] Log task state when aborting checkpoint

[FLINK-9873][runtime] Log task state when aborting checkpoint

This closes #6350.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa770ba6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa770ba6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa770ba6

Branch: refs/heads/release-1.5
Commit: aa770ba6f325b85c7242e535d45a6080d2703232
Parents: 9baca1b
Author: zentol <ch...@apache.org>
Authored: Tue Jul 17 09:34:45 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 23 09:15:21 2018 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/CheckpointCoordinator.java      | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aa770ba6/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 55e1ffe..82227cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -456,13 +456,20 @@ public class CheckpointCoordinator {
 		Execution[] executions = new Execution[tasksToTrigger.length];
 		for (int i = 0; i < tasksToTrigger.length; i++) {
 			Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
-			if (ee != null && ee.getState() == ExecutionState.RUNNING) {
-				executions[i] = ee;
-			} else {
+			if (ee == null) {
 				LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
 						tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
 						job);
 				return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
+			} else if (ee.getState() == ExecutionState.RUNNING) {
+				executions[i] = ee;
+			} else {
+				LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
+						tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
+						job,
+						ExecutionState.RUNNING,
+						ee.getState());
+				return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
 			}
 		}