You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/07/31 08:43:00 UTC

[flink] 06/07: [FLINK-13440] Move checkpoint failure logic from scheduler to failure manager

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 58bf1bf4cb244493ad25fbe485edef0565d72649
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Fri Jul 26 14:52:22 2019 +0200

    [FLINK-13440] Move checkpoint failure logic from scheduler to failure manager
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 10 +++++++-
 .../checkpoint/CheckpointFailureManager.java       | 23 +++++++++++++++++
 .../flink/runtime/scheduler/LegacyScheduler.java   | 30 +++-------------------
 3 files changed, 35 insertions(+), 28 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 682685c..7f258c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -59,6 +59,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -395,7 +396,14 @@ public class CheckpointCoordinator {
 			@Nullable final String targetLocation) {
 
 		final CheckpointProperties properties = CheckpointProperties.forSyncSavepoint();
-		return triggerSavepointInternal(timestamp, properties, advanceToEndOfEventTime, targetLocation);
+		return triggerSavepointInternal(timestamp, properties, advanceToEndOfEventTime, targetLocation).handle(
+				(completedCheckpoint, throwable) -> {
+					if (throwable != null) {
+						failureManager.handleSynchronousSavepointFailure(throwable);
+						throw new CompletionException(throwable);
+					}
+					return completedCheckpoint;
+				});
 	}
 
 	private CompletableFuture<CompletedCheckpoint> triggerSavepointInternal(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 07f37fd..f65a30a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import java.util.Set;
@@ -124,6 +125,28 @@ public class CheckpointFailureManager {
 	}
 
 	/**
+	 * Fails the whole job graph in case an in-progress synchronous savepoint is discarded.
+	 *
+	 * <p>If the synchronous savepoint was cancelled at the checkpoint coordinator, i.e. before
+	 * the synchronous savepoint barrier was sent to the tasks, then we do not fail the job
+	 * as we do not risk having a deadlock.
+	 *
+	 * @param cause The reason why the synchronous savepoint failed; passed on as the cause when failing the job.
+	 * */
+	void handleSynchronousSavepointFailure(final Throwable cause) {
+		if (!isPreFlightFailure(cause)) {
+			failureCallback.failJob(cause);
+		}
+	}
+
+	private static boolean isPreFlightFailure(final Throwable cause) {
+		return ExceptionUtils.findThrowable(cause, CheckpointException.class)
+				.map(CheckpointException::getCheckpointFailureReason)
+				.map(CheckpointFailureReason::isPreFlight)
+				.orElse(false);
+	}
+
+	/**
 	 * A callback interface about how to fail a job.
 	 */
 	public interface FailJobCallback {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
index fad7c9f..e33b6b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
@@ -30,8 +30,6 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointException;
-import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
@@ -77,7 +75,6 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.function.FunctionUtils;
@@ -608,30 +605,16 @@ public class LegacyScheduler implements SchedulerNG {
 					"default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'."));
 		}
 
-		final long now = System.currentTimeMillis();
-
 		// we stop the checkpoint coordinator so that we are guaranteed
 		// to have only the data of the synchronous savepoint committed.
 		// in case of failure, and if the job restarts, the coordinator
 		// will be restarted by the CheckpointCoordinatorDeActivator.
 		checkpointCoordinator.stopCheckpointScheduler();
 
+		final long now = System.currentTimeMillis();
 		final CompletableFuture<String> savepointFuture = checkpointCoordinator
-			.triggerSynchronousSavepoint(now, advanceToEndOfEventTime, targetDirectory)
-			.handle((completedCheckpoint, throwable) -> {
-				if (throwable != null) {
-					log.info("Failed during stopping job {} with a savepoint. Reason: {}", jobGraph.getJobID(), throwable.getMessage());
-					// it's possible this failure hasn't triggered a task failure, but rather the savepoint was aborted
-					// "softly" (for example on checkpoints barriers alignment, due to buffer limits).
-					// such situation may leave other tasks of the job in a blocking state.
-					// to workaround this, we fail the whole job.
-					if (!isCheckpointPreFlightFailure(throwable)) {
-						executionGraph.failGlobal(throwable);
-					}
-					throw new CompletionException(throwable);
-				}
-				return completedCheckpoint.getExternalPointer();
-			});
+				.triggerSynchronousSavepoint(now, advanceToEndOfEventTime, targetDirectory)
+				.thenApply(CompletedCheckpoint::getExternalPointer);
 
 		final CompletableFuture<JobStatus> terminationFuture = executionGraph
 			.getTerminationFuture()
@@ -659,11 +642,4 @@ public class LegacyScheduler implements SchedulerNG {
 			.map(TaskManagerLocation::toString)
 			.orElse("Unknown location");
 	}
-
-	private static boolean isCheckpointPreFlightFailure(Throwable throwable) {
-		return ExceptionUtils.findThrowable(throwable, CheckpointException.class)
-			.map(CheckpointException::getCheckpointFailureReason)
-			.map(CheckpointFailureReason::isPreFlight)
-			.orElse(false);
-	}
 }