You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 16:28:00 UTC

[flink] 11/13: [FLINK-16357][checkpointing] Only global failures/restores reset the coordinator state.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8e3e9ccea1b376050c46a7b055a0055380e0c2be
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed May 13 17:58:44 2020 +0200

    [FLINK-16357][checkpointing] Only global failures/restores reset the coordinator state.
    
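    Failure handling results are now flagged to indicate whether the failure was a global failure
    or a task failure. Based on that flag, the Scheduler invokes different restore methods on the
    CheckpointCoordinator: a global failure restores the latest checkpoint to all involved vertices
    (also resetting components such as the operator coordinators), while a task failure restores
    state only to the affected subtasks.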
---
 .../failover/flip1/ExecutionFailureHandler.java    | 17 +++++++----
 .../failover/flip1/FailureHandlingResult.java      | 34 ++++++++++++++++++----
 .../flink/runtime/scheduler/DefaultScheduler.java  |  7 +++--
 .../flink/runtime/scheduler/SchedulerBase.java     | 25 ++++++++++------
 .../failover/flip1/FailureHandlingResultTest.java  |  5 ++--
 .../OperatorCoordinatorSchedulerTest.java          | 12 ++++++++
 6 files changed, 74 insertions(+), 26 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
index 63d5e88..e6f8fcb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
@@ -75,7 +75,7 @@ public class ExecutionFailureHandler {
 	 * @return result of the failure handling
 	 */
 	public FailureHandlingResult getFailureHandlingResult(ExecutionVertexID failedTask, Throwable cause) {
-		return handleFailure(cause, failoverStrategy.getTasksNeedingRestart(failedTask, cause));
+		return handleFailure(cause, failoverStrategy.getTasksNeedingRestart(failedTask, cause), false);
 	}
 
 	/**
@@ -90,15 +90,18 @@ public class ExecutionFailureHandler {
 			cause,
 			IterableUtils.toStream(schedulingTopology.getVertices())
 				.map(SchedulingExecutionVertex::getId)
-				.collect(Collectors.toSet()));
+				.collect(Collectors.toSet()),
+			true);
 	}
 
 	private FailureHandlingResult handleFailure(
 			final Throwable cause,
-			final Set<ExecutionVertexID> verticesToRestart) {
+			final Set<ExecutionVertexID> verticesToRestart,
+			final boolean globalFailure) {
 
 		if (isUnrecoverableError(cause)) {
-			return FailureHandlingResult.unrecoverable(new JobException("The failure is not recoverable", cause));
+			return FailureHandlingResult.unrecoverable(
+				new JobException("The failure is not recoverable", cause), globalFailure);
 		}
 
 		restartBackoffTimeStrategy.notifyFailure(cause);
@@ -107,10 +110,12 @@ public class ExecutionFailureHandler {
 
 			return FailureHandlingResult.restartable(
 				verticesToRestart,
-				restartBackoffTimeStrategy.getBackoffTime());
+				restartBackoffTimeStrategy.getBackoffTime(),
+				globalFailure);
 		} else {
 			return FailureHandlingResult.unrecoverable(
-				new JobException("Recovery is suppressed by " + restartBackoffTimeStrategy, cause));
+				new JobException("Recovery is suppressed by " + restartBackoffTimeStrategy, cause),
+				globalFailure);
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
index 51e487d..f45daa0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
@@ -41,18 +41,22 @@ public class FailureHandlingResult {
 	/** Reason why the failure is not recoverable. */
 	private final Throwable error;
 
+	/** True if the original failure was a global failure. **/
+	private final boolean globalFailure;
+
 	/**
 	 * Creates a result of a set of tasks to restart to recover from the failure.
 	 *
 	 * @param verticesToRestart containing task vertices to restart to recover from the failure
 	 * @param restartDelayMS indicate a delay before conducting the restart
 	 */
-	private FailureHandlingResult(Set<ExecutionVertexID> verticesToRestart, long restartDelayMS) {
+	private FailureHandlingResult(Set<ExecutionVertexID> verticesToRestart, long restartDelayMS, boolean globalFailure) {
 		checkState(restartDelayMS >= 0);
 
 		this.verticesToRestart = Collections.unmodifiableSet(checkNotNull(verticesToRestart));
 		this.restartDelayMS = restartDelayMS;
 		this.error = null;
+		this.globalFailure = globalFailure;
 	}
 
 	/**
@@ -60,10 +64,11 @@ public class FailureHandlingResult {
 	 *
 	 * @param error reason why the failure is not recoverable
 	 */
-	private FailureHandlingResult(Throwable error) {
+	private FailureHandlingResult(Throwable error, boolean globalFailure) {
 		this.verticesToRestart = null;
 		this.restartDelayMS = -1;
 		this.error = checkNotNull(error);
+		this.globalFailure = globalFailure;
 	}
 
 	/**
@@ -115,23 +120,40 @@ public class FailureHandlingResult {
 	}
 
 	/**
+	 * Checks if this failure was a global failure, i.e., coming from a "safety net" failover that involved
+	 * all tasks and should reset also components like the coordinators.
+	 */
+	public boolean isGlobalFailure() {
+		return globalFailure;
+	}
+
+	/**
 	 * Creates a result of a set of tasks to restart to recover from the failure.
 	 *
+	 * <p>The result can be flagged to be from a global failure triggered by the scheduler, rather than from
+	 * the failure of an individual task.
+	 *
 	 * @param verticesToRestart containing task vertices to restart to recover from the failure
 	 * @param restartDelayMS indicate a delay before conducting the restart
 	 * @return result of a set of tasks to restart to recover from the failure
 	 */
-	public static FailureHandlingResult restartable(Set<ExecutionVertexID> verticesToRestart, long restartDelayMS) {
-		return new FailureHandlingResult(verticesToRestart, restartDelayMS);
+	public static FailureHandlingResult restartable(
+			Set<ExecutionVertexID> verticesToRestart,
+			long restartDelayMS,
+			boolean globalFailure) {
+		return new FailureHandlingResult(verticesToRestart, restartDelayMS, globalFailure);
 	}
 
 	/**
 	 * Creates a result that the failure is not recoverable and no restarting should be conducted.
 	 *
+	 * <p>The result can be flagged to be from a global failure triggered by the scheduler, rather than from
+	 * the failure of an individual task.
+	 *
 	 * @param error reason why the failure is not recoverable
 	 * @return result indicating the failure is not recoverable
 	 */
-	public static FailureHandlingResult unrecoverable(Throwable error) {
-		return new FailureHandlingResult(error);
+	public static FailureHandlingResult unrecoverable(Throwable error, boolean globalFailure) {
+		return new FailureHandlingResult(error, globalFailure);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index dcb6549..e42af43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -221,6 +221,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 
 		final Set<ExecutionVertexVersion> executionVertexVersions =
 			new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
+		final boolean globalRecovery = failureHandlingResult.isGlobalFailure();
 
 		addVerticesToRestartPending(verticesToRestart);
 
@@ -228,7 +229,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 
 		delayExecutor.schedule(
 			() -> FutureUtils.assertNoException(
-				cancelFuture.thenRunAsync(restartTasks(executionVertexVersions), getMainThreadExecutor())),
+				cancelFuture.thenRunAsync(restartTasks(executionVertexVersions, globalRecovery), getMainThreadExecutor())),
 			failureHandlingResult.getRestartDelayMS(),
 			TimeUnit.MILLISECONDS);
 	}
@@ -245,7 +246,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 		}
 	}
 
-	private Runnable restartTasks(final Set<ExecutionVertexVersion> executionVertexVersions) {
+	private Runnable restartTasks(final Set<ExecutionVertexVersion> executionVertexVersions, final boolean isGlobalRecovery) {
 		return () -> {
 			final Set<ExecutionVertexID> verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
 
@@ -254,7 +255,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 			resetForNewExecutions(verticesToRestart);
 
 			try {
-				restoreState(verticesToRestart);
+				restoreState(verticesToRestart, isGlobalRecovery);
 			} catch (Throwable t) {
 				handleGlobalFailure(t);
 				return;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index d8908c2..3d510fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -321,18 +321,25 @@ public abstract class SchedulerBase implements SchedulerNG {
 
 	}
 
-	protected void restoreState(final Set<ExecutionVertexID> vertices) throws Exception {
+	protected void restoreState(final Set<ExecutionVertexID> vertices, final boolean isGlobalRecovery) throws Exception {
+		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+		if (checkpointCoordinator == null) {
+			return;
+		}
+
 		// if there is checkpointed state, reload it into the executions
-		if (executionGraph.getCheckpointCoordinator() != null) {
-			// abort pending checkpoints to
-			// i) enable new checkpoint triggering without waiting for last checkpoint expired.
-			// ii) ensure the EXACTLY_ONCE semantics if needed.
-			executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(
+
+		// abort pending checkpoints to
+		// i) enable new checkpoint triggering without waiting for last checkpoint expired.
+		// ii) ensure the EXACTLY_ONCE semantics if needed.
+		checkpointCoordinator.abortPendingCheckpoints(
 				new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));
 
-			executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedStateToAll(
-				getInvolvedExecutionJobVertices(vertices),
-				true);
+		final Set<ExecutionJobVertex> jobVerticesToRestore = getInvolvedExecutionJobVertices(vertices);
+		if (isGlobalRecovery) {
+			checkpointCoordinator.restoreLatestCheckpointedStateToAll(jobVerticesToRestore, true);
+		} else {
+			checkpointCoordinator.restoreLatestCheckpointedStateToSubtasks(jobVerticesToRestore);
 		}
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
index 8943655..c64d50d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph.failover.flip1;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import java.util.HashSet;
@@ -45,7 +46,7 @@ public class FailureHandlingResultTest extends TestLogger {
 		Set<ExecutionVertexID> tasks = new HashSet<>();
 		tasks.add(new ExecutionVertexID(new JobVertexID(), 0));
 		long delay = 1234;
-		FailureHandlingResult result = FailureHandlingResult.restartable(tasks, delay);
+		FailureHandlingResult result = FailureHandlingResult.restartable(tasks, delay, false);
 
 		assertTrue(result.canRestart());
 		assertEquals(delay, result.getRestartDelayMS());
@@ -65,7 +66,7 @@ public class FailureHandlingResultTest extends TestLogger {
 	public void testRestartingSuppressedFailureHandlingResult() {
 		// create a FailureHandlingResult with error
 		Throwable error = new Exception("test error");
-		FailureHandlingResult result = FailureHandlingResult.unrecoverable(error);
+		FailureHandlingResult result = FailureHandlingResult.unrecoverable(error, false);
 
 		assertFalse(result.canRestart());
 		assertEquals(error, result.getError());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index bb0e68e..a3fb582 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -71,6 +71,7 @@ import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -272,6 +273,17 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 	}
 
 	@Test
+	public void testLocalFailureDoesNotResetToCheckpoint() throws Exception {
+		final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		takeCompleteCheckpoint(scheduler, coordinator, new byte[] {37, 11, 83, 4});
+		failAndRestartTask(scheduler, 0);
+
+		assertNull("coordinator should not have a restored checkpoint", coordinator.getLastRestoredCheckpointState());
+	}
+
+	@Test
 	public void testConfirmCheckpointComplete() throws Exception {
 		final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
 		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);