You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/03/03 17:00:34 UTC
[flink] 02/03: [FLINK-26049][checkpoint] Moving checkpoint failure log and report failed checkpoint to CheckpointFailureManager
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 33e3b75bb74dad5f75ad0bea788906e24c3df4e8
Author: fanrui <19...@gmail.com>
AuthorDate: Mon Feb 28 17:45:12 2022 +0800
[FLINK-26049][checkpoint] Moving checkpoint failure log and report failed checkpoint to CheckpointFailureManager
---
.../runtime/checkpoint/CheckpointCoordinator.java | 27 +++---------
.../checkpoint/CheckpointFailureManager.java | 50 +++++++++++++++++++---
.../runtime/checkpoint/PendingCheckpoint.java | 14 ------
.../runtime/checkpoint/PendingCheckpointTest.java | 7 ---
4 files changed, 48 insertions(+), 50 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 4efff80..2efc034 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -79,7 +79,6 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Stream;
@@ -177,9 +176,6 @@ public class CheckpointCoordinator {
/** Actor that receives status updates from the execution graph this coordinator works for. */
private JobStatusListener jobStatusListener;
- /** The number of consecutive failed trigger attempts. */
- private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);
-
/** A handle to the current periodic trigger, to cancel it when necessary. */
private ScheduledFuture<?> currentPeriodicTrigger;
@@ -886,7 +882,6 @@ public class CheckpointCoordinator {
/** Trigger request is successful. NOTE, it must be invoked if trigger request is successful. */
private void onTriggerSuccess() {
isTriggering = false;
- numUnsuccessfulCheckpointsTriggers.set(0);
executeQueuedRequest();
}
@@ -935,25 +930,12 @@ public class CheckpointCoordinator {
CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
if (checkpoint != null && !checkpoint.isDisposed()) {
- int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
- LOG.warn(
- "Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
- checkpoint.getCheckpointId(),
- job,
- numUnsuccessful,
- throwable);
-
synchronized (lock) {
abortPendingCheckpoint(checkpoint, cause);
}
} else {
- LOG.info(
- "Failed to trigger checkpoint for job {} because {}",
- job,
- throwable.getMessage());
-
failureManager.handleCheckpointException(
- checkpoint, checkpointProperties, cause, null);
+ checkpoint, checkpointProperties, cause, null, job, null, statsTracker);
}
} finally {
isTriggering = false;
@@ -1917,8 +1899,6 @@ public class CheckpointCoordinator {
final CheckpointException reason =
new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SUSPEND);
abortPendingAndQueuedCheckpoints(reason);
-
- numUnsuccessfulCheckpointsTriggers.set(0);
}
}
@@ -2098,7 +2078,10 @@ public class CheckpointCoordinator {
pendingCheckpoint,
pendingCheckpoint.getProps(),
exception,
- executionAttemptID);
+ executionAttemptID,
+ job,
+ getStatsCallback(pendingCheckpoint),
+ statsTracker);
} finally {
sendAbortedMessages(
pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 8ccccc6..df429a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -17,10 +17,14 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
import java.util.Set;
@@ -34,6 +38,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/** The checkpoint failure manager which centralized manage checkpoint failure processing logic. */
public class CheckpointFailureManager {
+ private static final Logger LOG = LoggerFactory.getLogger(CheckpointFailureManager.class);
+
public static final int UNLIMITED_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE;
public static final String EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE =
"Exceeded checkpoint tolerable failure threshold.";
@@ -80,25 +86,55 @@ public class CheckpointFailureManager {
* strategy can be used.
* @param exception the checkpoint exception.
* @param executionAttemptID the execution attempt id, as a safe guard.
+ * @param job the JobID.
+ * @param pendingCheckpointStats the pending checkpoint statistics.
+ * @param statsTracker the tracker for checkpoint statistics.
*/
public void handleCheckpointException(
@Nullable PendingCheckpoint pendingCheckpoint,
CheckpointProperties checkpointProperties,
CheckpointException exception,
- @Nullable ExecutionAttemptID executionAttemptID) {
+ @Nullable ExecutionAttemptID executionAttemptID,
+ JobID job,
+ @Nullable PendingCheckpointStats pendingCheckpointStats,
+ CheckpointStatsTracker statsTracker) {
+ long checkpointId =
+ pendingCheckpoint == null
+ ? UNKNOWN_CHECKPOINT_ID
+ : pendingCheckpoint.getCheckpointID();
+ updateStatsAfterCheckpointFailed(pendingCheckpointStats, statsTracker, exception);
+ // Reached both when triggering fails and when a pending checkpoint is aborted.
+ LOG.warn(
+ "Failed to trigger or complete checkpoint {} for job {}. ({} consecutive failed attempts so far)",
+ checkpointId == UNKNOWN_CHECKPOINT_ID ? "UNKNOWN_CHECKPOINT_ID" : checkpointId,
+ job,
+ continuousFailureCounter.get(),
+ exception);
if (isJobManagerFailure(exception, executionAttemptID)) {
- handleJobLevelCheckpointException(
- checkpointProperties,
- exception,
- pendingCheckpoint == null
- ? UNKNOWN_CHECKPOINT_ID
- : pendingCheckpoint.getCheckpointID());
+ handleJobLevelCheckpointException(checkpointProperties, exception, checkpointId);
} else {
handleTaskLevelCheckpointException(
checkNotNull(pendingCheckpoint), exception, checkNotNull(executionAttemptID));
}
}
+ /**
+ * Reports the failed checkpoint, stamped with the current wall-clock time, to {@code statsTracker}.
+ *
+ * @param pendingCheckpointStats the pending checkpoint statistics; if {@code null}, nothing is reported.
+ * @param exception the checkpoint exception recorded as the failure cause.
+ */
+ private void updateStatsAfterCheckpointFailed(
+ @Nullable PendingCheckpointStats pendingCheckpointStats,
+ CheckpointStatsTracker statsTracker,
+ CheckpointException exception) {
+ if (pendingCheckpointStats != null) {
+ long failureTimestamp = System.currentTimeMillis();
+ statsTracker.reportFailedCheckpoint(
+ pendingCheckpointStats.toFailedCheckpoint(failureTimestamp, exception));
+ }
+ }
+
private boolean isJobManagerFailure(
CheckpointException exception, @Nullable ExecutionAttemptID executionAttemptID) {
// TODO: Try to get rid of checking nullability of executionAttemptID because false value of
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 9229024..b4bd8ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -544,7 +544,6 @@ public class PendingCheckpoint implements Checkpoint {
try {
failureCause = new CheckpointException(reason, cause);
onCompletionPromise.completeExceptionally(failureCause);
- reportFailedCheckpoint(statsTracker, failureCause);
assertAbortSubsumedForced(reason);
} finally {
dispose(true, checkpointsCleaner, postCleanup, executor);
@@ -596,19 +595,6 @@ public class PendingCheckpoint implements Checkpoint {
}
}
- /**
- * Reports a failed checkpoint with the given optional cause.
- *
- * @param cause The failure cause or <code>null</code>.
- */
- private void reportFailedCheckpoint(CheckpointStatsTracker statsTracker, Exception cause) {
- // to prevent null-pointers from concurrent modification, copy reference onto stack
- if (pendingCheckpointStats != null) {
- statsTracker.reportFailedCheckpoint(
- pendingCheckpointStats.toFailedCheckpoint(System.currentTimeMillis(), cause));
- }
- }
-
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index a0b07b0..392bd0e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -661,13 +661,6 @@ public class PendingCheckpointTest {
}
private void abort(PendingCheckpoint checkpoint, CheckpointFailureReason reason) {
- abort(checkpoint, reason, null);
- }
-
- private void abort(
- PendingCheckpoint checkpoint,
- CheckpointFailureReason reason,
- PendingCheckpointStats statsCallback) {
checkpoint.abort(
reason, null, new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null);
}