You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 16:28:00 UTC
[flink] 11/13: [FLINK-16357][checkpointing] Only global failures/restores reset the coordinator state.
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8e3e9ccea1b376050c46a7b055a0055380e0c2be
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed May 13 17:58:44 2020 +0200
[FLINK-16357][checkpointing] Only global failures/restores reset the coordinator state.
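Failure handling results now carry a flag indicating whether the failure was a global failure
or a task failure. Based on that flag, the scheduler invokes different restore methods on the
CheckpointCoordinator: a global recovery calls restoreLatestCheckpointedStateToAll, while a
task-level recovery calls restoreLatestCheckpointedStateToSubtasks. As a result, only global
recoveries reset the operator coordinators to the checkpointed state.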
---
.../failover/flip1/ExecutionFailureHandler.java | 17 +++++++----
.../failover/flip1/FailureHandlingResult.java | 34 ++++++++++++++++++----
.../flink/runtime/scheduler/DefaultScheduler.java | 7 +++--
.../flink/runtime/scheduler/SchedulerBase.java | 25 ++++++++++------
.../failover/flip1/FailureHandlingResultTest.java | 5 ++--
.../OperatorCoordinatorSchedulerTest.java | 12 ++++++++
6 files changed, 74 insertions(+), 26 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
index 63d5e88..e6f8fcb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
@@ -75,7 +75,7 @@ public class ExecutionFailureHandler {
* @return result of the failure handling
*/
public FailureHandlingResult getFailureHandlingResult(ExecutionVertexID failedTask, Throwable cause) {
- return handleFailure(cause, failoverStrategy.getTasksNeedingRestart(failedTask, cause));
+ return handleFailure(cause, failoverStrategy.getTasksNeedingRestart(failedTask, cause), false);
}
/**
@@ -90,15 +90,18 @@ public class ExecutionFailureHandler {
cause,
IterableUtils.toStream(schedulingTopology.getVertices())
.map(SchedulingExecutionVertex::getId)
- .collect(Collectors.toSet()));
+ .collect(Collectors.toSet()),
+ true);
}
private FailureHandlingResult handleFailure(
final Throwable cause,
- final Set<ExecutionVertexID> verticesToRestart) {
+ final Set<ExecutionVertexID> verticesToRestart,
+ final boolean globalFailure) {
if (isUnrecoverableError(cause)) {
- return FailureHandlingResult.unrecoverable(new JobException("The failure is not recoverable", cause));
+ return FailureHandlingResult.unrecoverable(
+ new JobException("The failure is not recoverable", cause), globalFailure);
}
restartBackoffTimeStrategy.notifyFailure(cause);
@@ -107,10 +110,12 @@ public class ExecutionFailureHandler {
return FailureHandlingResult.restartable(
verticesToRestart,
- restartBackoffTimeStrategy.getBackoffTime());
+ restartBackoffTimeStrategy.getBackoffTime(),
+ globalFailure);
} else {
return FailureHandlingResult.unrecoverable(
- new JobException("Recovery is suppressed by " + restartBackoffTimeStrategy, cause));
+ new JobException("Recovery is suppressed by " + restartBackoffTimeStrategy, cause),
+ globalFailure);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
index 51e487d..f45daa0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
@@ -41,18 +41,22 @@ public class FailureHandlingResult {
/** Reason why the failure is not recoverable. */
private final Throwable error;
+ /** True if the original failure was a global failure. **/
+ private final boolean globalFailure;
+
/**
* Creates a result of a set of tasks to restart to recover from the failure.
*
* @param verticesToRestart containing task vertices to restart to recover from the failure
* @param restartDelayMS indicate a delay before conducting the restart
*/
- private FailureHandlingResult(Set<ExecutionVertexID> verticesToRestart, long restartDelayMS) {
+ private FailureHandlingResult(Set<ExecutionVertexID> verticesToRestart, long restartDelayMS, boolean globalFailure) {
checkState(restartDelayMS >= 0);
this.verticesToRestart = Collections.unmodifiableSet(checkNotNull(verticesToRestart));
this.restartDelayMS = restartDelayMS;
this.error = null;
+ this.globalFailure = globalFailure;
}
/**
@@ -60,10 +64,11 @@ public class FailureHandlingResult {
*
* @param error reason why the failure is not recoverable
*/
- private FailureHandlingResult(Throwable error) {
+ private FailureHandlingResult(Throwable error, boolean globalFailure) {
this.verticesToRestart = null;
this.restartDelayMS = -1;
this.error = checkNotNull(error);
+ this.globalFailure = globalFailure;
}
/**
@@ -115,23 +120,40 @@ public class FailureHandlingResult {
}
/**
+ * Checks if this failure was a global failure, i.e., coming from a "safety net" failover that involved
+ * all tasks and should reset also components like the coordinators.
+ */
+ public boolean isGlobalFailure() {
+ return globalFailure;
+ }
+
+ /**
* Creates a result of a set of tasks to restart to recover from the failure.
*
+ * <p>The result can be flagged to be from a global failure triggered by the scheduler, rather than from
+ * the failure of an individual task.
+ *
* @param verticesToRestart containing task vertices to restart to recover from the failure
* @param restartDelayMS indicate a delay before conducting the restart
* @return result of a set of tasks to restart to recover from the failure
*/
- public static FailureHandlingResult restartable(Set<ExecutionVertexID> verticesToRestart, long restartDelayMS) {
- return new FailureHandlingResult(verticesToRestart, restartDelayMS);
+ public static FailureHandlingResult restartable(
+ Set<ExecutionVertexID> verticesToRestart,
+ long restartDelayMS,
+ boolean globalFailure) {
+ return new FailureHandlingResult(verticesToRestart, restartDelayMS, globalFailure);
}
/**
* Creates a result that the failure is not recoverable and no restarting should be conducted.
*
+ * <p>The result can be flagged to be from a global failure triggered by the scheduler, rather than from
+ * the failure of an individual task.
+ *
* @param error reason why the failure is not recoverable
* @return result indicating the failure is not recoverable
*/
- public static FailureHandlingResult unrecoverable(Throwable error) {
- return new FailureHandlingResult(error);
+ public static FailureHandlingResult unrecoverable(Throwable error, boolean globalFailure) {
+ return new FailureHandlingResult(error, globalFailure);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index dcb6549..e42af43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -221,6 +221,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
final Set<ExecutionVertexVersion> executionVertexVersions =
new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
+ final boolean globalRecovery = failureHandlingResult.isGlobalFailure();
addVerticesToRestartPending(verticesToRestart);
@@ -228,7 +229,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
delayExecutor.schedule(
() -> FutureUtils.assertNoException(
- cancelFuture.thenRunAsync(restartTasks(executionVertexVersions), getMainThreadExecutor())),
+ cancelFuture.thenRunAsync(restartTasks(executionVertexVersions, globalRecovery), getMainThreadExecutor())),
failureHandlingResult.getRestartDelayMS(),
TimeUnit.MILLISECONDS);
}
@@ -245,7 +246,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
}
}
- private Runnable restartTasks(final Set<ExecutionVertexVersion> executionVertexVersions) {
+ private Runnable restartTasks(final Set<ExecutionVertexVersion> executionVertexVersions, final boolean isGlobalRecovery) {
return () -> {
final Set<ExecutionVertexID> verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
@@ -254,7 +255,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
resetForNewExecutions(verticesToRestart);
try {
- restoreState(verticesToRestart);
+ restoreState(verticesToRestart, isGlobalRecovery);
} catch (Throwable t) {
handleGlobalFailure(t);
return;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index d8908c2..3d510fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -321,18 +321,25 @@ public abstract class SchedulerBase implements SchedulerNG {
}
- protected void restoreState(final Set<ExecutionVertexID> vertices) throws Exception {
+ protected void restoreState(final Set<ExecutionVertexID> vertices, final boolean isGlobalRecovery) throws Exception {
+ final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+ if (checkpointCoordinator == null) {
+ return;
+ }
+
// if there is checkpointed state, reload it into the executions
- if (executionGraph.getCheckpointCoordinator() != null) {
- // abort pending checkpoints to
- // i) enable new checkpoint triggering without waiting for last checkpoint expired.
- // ii) ensure the EXACTLY_ONCE semantics if needed.
- executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(
+
+ // abort pending checkpoints to
+ // i) enable new checkpoint triggering without waiting for last checkpoint expired.
+ // ii) ensure the EXACTLY_ONCE semantics if needed.
+ checkpointCoordinator.abortPendingCheckpoints(
new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));
- executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedStateToAll(
- getInvolvedExecutionJobVertices(vertices),
- true);
+ final Set<ExecutionJobVertex> jobVerticesToRestore = getInvolvedExecutionJobVertices(vertices);
+ if (isGlobalRecovery) {
+ checkpointCoordinator.restoreLatestCheckpointedStateToAll(jobVerticesToRestore, true);
+ } else {
+ checkpointCoordinator.restoreLatestCheckpointedStateToSubtasks(jobVerticesToRestore);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
index 8943655..c64d50d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph.failover.flip1;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import java.util.HashSet;
@@ -45,7 +46,7 @@ public class FailureHandlingResultTest extends TestLogger {
Set<ExecutionVertexID> tasks = new HashSet<>();
tasks.add(new ExecutionVertexID(new JobVertexID(), 0));
long delay = 1234;
- FailureHandlingResult result = FailureHandlingResult.restartable(tasks, delay);
+ FailureHandlingResult result = FailureHandlingResult.restartable(tasks, delay, false);
assertTrue(result.canRestart());
assertEquals(delay, result.getRestartDelayMS());
@@ -65,7 +66,7 @@ public class FailureHandlingResultTest extends TestLogger {
public void testRestartingSuppressedFailureHandlingResult() {
// create a FailureHandlingResult with error
Throwable error = new Exception("test error");
- FailureHandlingResult result = FailureHandlingResult.unrecoverable(error);
+ FailureHandlingResult result = FailureHandlingResult.unrecoverable(error, false);
assertFalse(result.canRestart());
assertEquals(error, result.getError());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index bb0e68e..a3fb582 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -71,6 +71,7 @@ import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -272,6 +273,17 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
}
@Test
+ public void testLocalFailureDoesNotResetToCheckpoint() throws Exception {
+ final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
+ final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+ takeCompleteCheckpoint(scheduler, coordinator, new byte[] {37, 11, 83, 4});
+ failAndRestartTask(scheduler, 0);
+
+ assertNull("coordinator should not have a restored checkpoint", coordinator.getLastRestoredCheckpointState());
+ }
+
+ @Test
public void testConfirmCheckpointComplete() throws Exception {
final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);