You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) was not preserved in this plain-text version.
Posted to commits@flink.apache.org by ta...@apache.org on 2022/03/15 16:25:38 UTC
[flink] branch release-1.15 updated: [FLINK-26650][checkpoint] Avoid to print stack trace for checkpoint trigger failure if not all tasks are started
This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 0562fc3 [FLINK-26650][checkpoint] Avoid to print stack trace for checkpoint trigger failure if not all tasks are started
0562fc3 is described below
commit 0562fc3df7492e21e577c169f4a65c2a1dce693a
Author: Yun Tang <my...@live.com>
AuthorDate: Tue Mar 15 17:43:10 2022 +0800
[FLINK-26650][checkpoint] Avoid to print stack trace for checkpoint trigger failure if not all tasks are started
---
.../runtime/checkpoint/CheckpointFailureManager.java | 20 ++++++++++++++------
1 file changed, 14 insertions(+), 6 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 5b73fe2..08cf49e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -104,12 +104,20 @@ public class CheckpointFailureManager {
: pendingCheckpoint.getCheckpointID();
updateStatsAfterCheckpointFailed(pendingCheckpointStats, statsTracker, exception);
- LOG.warn(
- "Failed to trigger or complete checkpoint {} for job {}. ({} consecutive failed attempts so far)",
- checkpointId == UNKNOWN_CHECKPOINT_ID ? "UNKNOWN_CHECKPOINT_ID" : checkpointId,
- job,
- continuousFailureCounter.get(),
- exception);
+ if (CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING.equals(
+ exception.getCheckpointFailureReason())) {
+ LOG.info(
+ "Failed to trigger checkpoint for job {} since {}.",
+ job,
+ exception.getMessage());
+ } else {
+ LOG.warn(
+ "Failed to trigger or complete checkpoint {} for job {}. ({} consecutive failed attempts so far)",
+ checkpointId == UNKNOWN_CHECKPOINT_ID ? "UNKNOWN_CHECKPOINT_ID" : checkpointId,
+ job,
+ continuousFailureCounter.get(),
+ exception);
+ }
if (isJobManagerFailure(exception, executionAttemptID)) {
handleJobLevelCheckpointException(checkpointProperties, exception, checkpointId);
} else {