You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/03/03 17:00:34 UTC

[flink] 02/03: [FLINK-26049][checkpoint] Moving checkpoint failure log and report failed checkpoint to CheckpointFailureManager

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 33e3b75bb74dad5f75ad0bea788906e24c3df4e8
Author: fanrui <19...@gmail.com>
AuthorDate: Mon Feb 28 17:45:12 2022 +0800

    [FLINK-26049][checkpoint] Moving checkpoint failure log and report failed checkpoint to CheckpointFailureManager
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 27 +++---------
 .../checkpoint/CheckpointFailureManager.java       | 50 +++++++++++++++++++---
 .../runtime/checkpoint/PendingCheckpoint.java      | 14 ------
 .../runtime/checkpoint/PendingCheckpointTest.java  |  7 ---
 4 files changed, 48 insertions(+), 50 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 4efff80..2efc034 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -79,7 +79,6 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
 import java.util.function.Predicate;
 import java.util.stream.Stream;
@@ -177,9 +176,6 @@ public class CheckpointCoordinator {
     /** Actor that receives status updates from the execution graph this coordinator works for. */
     private JobStatusListener jobStatusListener;
 
-    /** The number of consecutive failed trigger attempts. */
-    private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);
-
     /** A handle to the current periodic trigger, to cancel it when necessary. */
     private ScheduledFuture<?> currentPeriodicTrigger;
 
@@ -886,7 +882,6 @@ public class CheckpointCoordinator {
     /** Trigger request is successful. NOTE, it must be invoked if trigger request is successful. */
     private void onTriggerSuccess() {
         isTriggering = false;
-        numUnsuccessfulCheckpointsTriggers.set(0);
         executeQueuedRequest();
     }
 
@@ -935,25 +930,12 @@ public class CheckpointCoordinator {
                             CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
 
             if (checkpoint != null && !checkpoint.isDisposed()) {
-                int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
-                LOG.warn(
-                        "Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
-                        checkpoint.getCheckpointId(),
-                        job,
-                        numUnsuccessful,
-                        throwable);
-
                 synchronized (lock) {
                     abortPendingCheckpoint(checkpoint, cause);
                 }
             } else {
-                LOG.info(
-                        "Failed to trigger checkpoint for job {} because {}",
-                        job,
-                        throwable.getMessage());
-
                 failureManager.handleCheckpointException(
-                        checkpoint, checkpointProperties, cause, null);
+                        checkpoint, checkpointProperties, cause, null, job, null, statsTracker);
             }
         } finally {
             isTriggering = false;
@@ -1917,8 +1899,6 @@ public class CheckpointCoordinator {
             final CheckpointException reason =
                     new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SUSPEND);
             abortPendingAndQueuedCheckpoints(reason);
-
-            numUnsuccessfulCheckpointsTriggers.set(0);
         }
     }
 
@@ -2098,7 +2078,10 @@ public class CheckpointCoordinator {
                         pendingCheckpoint,
                         pendingCheckpoint.getProps(),
                         exception,
-                        executionAttemptID);
+                        executionAttemptID,
+                        job,
+                        getStatsCallback(pendingCheckpoint),
+                        statsTracker);
             } finally {
                 sendAbortedMessages(
                         pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 8ccccc6..df429a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -17,10 +17,14 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.util.Set;
@@ -34,6 +38,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /** The checkpoint failure manager which centralized manage checkpoint failure processing logic. */
 public class CheckpointFailureManager {
 
+    private static final Logger LOG = LoggerFactory.getLogger(CheckpointFailureManager.class);
+
     public static final int UNLIMITED_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE;
     public static final String EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE =
             "Exceeded checkpoint tolerable failure threshold.";
@@ -80,25 +86,55 @@ public class CheckpointFailureManager {
      *     strategy can be used.
      * @param exception the checkpoint exception.
      * @param executionAttemptID the execution attempt id, as a safe guard.
+     * @param job the JobID.
+     * @param pendingCheckpointStats the pending checkpoint statistics.
+     * @param statsTracker the tracker for checkpoint statistics.
      */
     public void handleCheckpointException(
             @Nullable PendingCheckpoint pendingCheckpoint,
             CheckpointProperties checkpointProperties,
             CheckpointException exception,
-            @Nullable ExecutionAttemptID executionAttemptID) {
+            @Nullable ExecutionAttemptID executionAttemptID,
+            JobID job,
+            @Nullable PendingCheckpointStats pendingCheckpointStats,
+            CheckpointStatsTracker statsTracker) {
+        long checkpointId =
+                pendingCheckpoint == null
+                        ? UNKNOWN_CHECKPOINT_ID
+                        : pendingCheckpoint.getCheckpointID();
+        updateStatsAfterCheckpointFailed(pendingCheckpointStats, statsTracker, exception);
+
+        LOG.warn(
+                "Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
+                checkpointId == UNKNOWN_CHECKPOINT_ID ? "UNKNOWN_CHECKPOINT_ID" : checkpointId,
+                job,
+                continuousFailureCounter.get(),
+                exception);
         if (isJobManagerFailure(exception, executionAttemptID)) {
-            handleJobLevelCheckpointException(
-                    checkpointProperties,
-                    exception,
-                    pendingCheckpoint == null
-                            ? UNKNOWN_CHECKPOINT_ID
-                            : pendingCheckpoint.getCheckpointID());
+            handleJobLevelCheckpointException(checkpointProperties, exception, checkpointId);
         } else {
             handleTaskLevelCheckpointException(
                     checkNotNull(pendingCheckpoint), exception, checkNotNull(executionAttemptID));
         }
     }
 
+    /**
+     * Updates the checkpoint statistics after a checkpoint has failed.
+     *
+     * @param pendingCheckpointStats the pending checkpoint statistics.
+     * @param exception the checkpoint exception.
+     */
+    private void updateStatsAfterCheckpointFailed(
+            @Nullable PendingCheckpointStats pendingCheckpointStats,
+            CheckpointStatsTracker statsTracker,
+            CheckpointException exception) {
+        if (pendingCheckpointStats != null) {
+            long failureTimestamp = System.currentTimeMillis();
+            statsTracker.reportFailedCheckpoint(
+                    pendingCheckpointStats.toFailedCheckpoint(failureTimestamp, exception));
+        }
+    }
+
     private boolean isJobManagerFailure(
             CheckpointException exception, @Nullable ExecutionAttemptID executionAttemptID) {
         // TODO: Try to get rid of checking nullability of executionAttemptID because false value of
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 9229024..b4bd8ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -544,7 +544,6 @@ public class PendingCheckpoint implements Checkpoint {
         try {
             failureCause = new CheckpointException(reason, cause);
             onCompletionPromise.completeExceptionally(failureCause);
-            reportFailedCheckpoint(statsTracker, failureCause);
             assertAbortSubsumedForced(reason);
         } finally {
             dispose(true, checkpointsCleaner, postCleanup, executor);
@@ -596,19 +595,6 @@ public class PendingCheckpoint implements Checkpoint {
         }
     }
 
-    /**
-     * Reports a failed checkpoint with the given optional cause.
-     *
-     * @param cause The failure cause or <code>null</code>.
-     */
-    private void reportFailedCheckpoint(CheckpointStatsTracker statsTracker, Exception cause) {
-        // to prevent null-pointers from concurrent modification, copy reference onto stack
-        if (pendingCheckpointStats != null) {
-            statsTracker.reportFailedCheckpoint(
-                    pendingCheckpointStats.toFailedCheckpoint(System.currentTimeMillis(), cause));
-        }
-    }
-
     // ------------------------------------------------------------------------
     //  Utilities
     // ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index a0b07b0..392bd0e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -661,13 +661,6 @@ public class PendingCheckpointTest {
     }
 
     private void abort(PendingCheckpoint checkpoint, CheckpointFailureReason reason) {
-        abort(checkpoint, reason, null);
-    }
-
-    private void abort(
-            PendingCheckpoint checkpoint,
-            CheckpointFailureReason reason,
-            PendingCheckpointStats statsCallback) {
         checkpoint.abort(
                 reason, null, new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null);
     }