You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ta...@apache.org on 2022/03/15 16:25:38 UTC

[flink] branch release-1.15 updated: [FLINK-26650][checkpoint] Avoid printing stack trace for checkpoint trigger failure if not all tasks are started

This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 0562fc3  [FLINK-26650][checkpoint] Avoid printing stack trace for checkpoint trigger failure if not all tasks are started
0562fc3 is described below

commit 0562fc3df7492e21e577c169f4a65c2a1dce693a
Author: Yun Tang <my...@live.com>
AuthorDate: Tue Mar 15 17:43:10 2022 +0800

    [FLINK-26650][checkpoint] Avoid printing stack trace for checkpoint trigger failure if not all tasks are started
---
 .../runtime/checkpoint/CheckpointFailureManager.java | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 5b73fe2..08cf49e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -104,12 +104,20 @@ public class CheckpointFailureManager {
                         : pendingCheckpoint.getCheckpointID();
         updateStatsAfterCheckpointFailed(pendingCheckpointStats, statsTracker, exception);
 
-        LOG.warn(
-                "Failed to trigger or complete checkpoint {} for job {}. ({} consecutive failed attempts so far)",
-                checkpointId == UNKNOWN_CHECKPOINT_ID ? "UNKNOWN_CHECKPOINT_ID" : checkpointId,
-                job,
-                continuousFailureCounter.get(),
-                exception);
+        if (CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING.equals(
+                exception.getCheckpointFailureReason())) {
+            LOG.info(
+                    "Failed to trigger checkpoint for job {} since {}.",
+                    job,
+                    exception.getMessage());
+        } else {
+            LOG.warn(
+                    "Failed to trigger or complete checkpoint {} for job {}. ({} consecutive failed attempts so far)",
+                    checkpointId == UNKNOWN_CHECKPOINT_ID ? "UNKNOWN_CHECKPOINT_ID" : checkpointId,
+                    job,
+                    continuousFailureCounter.get(),
+                    exception);
+        }
         if (isJobManagerFailure(exception, executionAttemptID)) {
             handleJobLevelCheckpointException(checkpointProperties, exception, checkpointId);
         } else {