You are viewing a plain-text version of this content. The canonical version is available at the original archive link (the hyperlink is not preserved in this plain-text copy).
Posted to commits@flink.apache.org by kk...@apache.org on 2019/07/31 08:43:00 UTC
[flink] 06/07: [FLINK-13440] Move checkpoint failure logic from scheduler to failure manager
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 58bf1bf4cb244493ad25fbe485edef0565d72649
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Fri Jul 26 14:52:22 2019 +0200
[FLINK-13440] Move checkpoint failure logic from scheduler to failure manager
---
.../runtime/checkpoint/CheckpointCoordinator.java | 10 +++++++-
.../checkpoint/CheckpointFailureManager.java | 23 +++++++++++++++++
.../flink/runtime/scheduler/LegacyScheduler.java | 30 +++-------------------
3 files changed, 35 insertions(+), 28 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 682685c..7f258c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -59,6 +59,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -395,7 +396,14 @@ public class CheckpointCoordinator {
@Nullable final String targetLocation) {
final CheckpointProperties properties = CheckpointProperties.forSyncSavepoint();
- return triggerSavepointInternal(timestamp, properties, advanceToEndOfEventTime, targetLocation);
+ return triggerSavepointInternal(timestamp, properties, advanceToEndOfEventTime, targetLocation).handle(
+ (completedCheckpoint, throwable) -> {
+ if (throwable != null) {
+ failureManager.handleSynchronousSavepointFailure(throwable);
+ throw new CompletionException(throwable);
+ }
+ return completedCheckpoint;
+ });
}
private CompletableFuture<CompletedCheckpoint> triggerSavepointInternal(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 07f37fd..f65a30a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -17,6 +17,7 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import java.util.Set;
@@ -124,6 +125,28 @@ public class CheckpointFailureManager {
}
/**
+ * Fails the whole job graph in case an in-progress synchronous savepoint is discarded.
+ *
+ * <p>If the checkpoint failure was cancelled at the checkpoint coordinator, i.e. before
+ * the synchronous savepoint barrier was sent to the tasks, then we do not cancel the job
+ * as we do not risk having a deadlock.
+ *
+ * @param cause The reason why the job is cancelled.
+ * */
+ void handleSynchronousSavepointFailure(final Throwable cause) {
+ if (!isPreFlightFailure(cause)) {
+ failureCallback.failJob(cause);
+ }
+ }
+
+ private static boolean isPreFlightFailure(final Throwable cause) {
+ return ExceptionUtils.findThrowable(cause, CheckpointException.class)
+ .map(CheckpointException::getCheckpointFailureReason)
+ .map(CheckpointFailureReason::isPreFlight)
+ .orElse(false);
+ }
+
+ /**
* A callback interface about how to fail a job.
*/
public interface FailJobCallback {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
index fad7c9f..e33b6b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
@@ -30,8 +30,6 @@ import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointException;
-import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
@@ -77,7 +75,6 @@ import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.function.FunctionUtils;
@@ -608,30 +605,16 @@ public class LegacyScheduler implements SchedulerNG {
"default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'."));
}
- final long now = System.currentTimeMillis();
-
// we stop the checkpoint coordinator so that we are guaranteed
// to have only the data of the synchronous savepoint committed.
// in case of failure, and if the job restarts, the coordinator
// will be restarted by the CheckpointCoordinatorDeActivator.
checkpointCoordinator.stopCheckpointScheduler();
+ final long now = System.currentTimeMillis();
final CompletableFuture<String> savepointFuture = checkpointCoordinator
- .triggerSynchronousSavepoint(now, advanceToEndOfEventTime, targetDirectory)
- .handle((completedCheckpoint, throwable) -> {
- if (throwable != null) {
- log.info("Failed during stopping job {} with a savepoint. Reason: {}", jobGraph.getJobID(), throwable.getMessage());
- // it's possible this failure hasn't triggered a task failure, but rather the savepoint was aborted
- // "softly" (for example on checkpoints barriers alignment, due to buffer limits).
- // such situation may leave other tasks of the job in a blocking state.
- // to workaround this, we fail the whole job.
- if (!isCheckpointPreFlightFailure(throwable)) {
- executionGraph.failGlobal(throwable);
- }
- throw new CompletionException(throwable);
- }
- return completedCheckpoint.getExternalPointer();
- });
+ .triggerSynchronousSavepoint(now, advanceToEndOfEventTime, targetDirectory)
+ .thenApply(CompletedCheckpoint::getExternalPointer);
final CompletableFuture<JobStatus> terminationFuture = executionGraph
.getTerminationFuture()
@@ -659,11 +642,4 @@ public class LegacyScheduler implements SchedulerNG {
.map(TaskManagerLocation::toString)
.orElse("Unknown location");
}
-
- private static boolean isCheckpointPreFlightFailure(Throwable throwable) {
- return ExceptionUtils.findThrowable(throwable, CheckpointException.class)
- .map(CheckpointException::getCheckpointFailureReason)
- .map(CheckpointFailureReason::isPreFlight)
- .orElse(false);
- }
}