You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 16:27:59 UTC

[flink] 10/13: [FLINK-16357][checkpointing] Offer different methods for "global restore" and "local restore" in CheckpointCoordinator

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7ffa87b1f3c6089391d381d1a94cccf60860d0a3
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed May 13 19:57:59 2020 +0200

    [FLINK-16357][checkpointing] Offer different methods for "global restore" and "local restore" in CheckpointCoordinator
    
    Global restores are meant for initial savepoint restores, and for global failover in the scheduler.
    Global failover in the scheduler happens for example during master failover and as a safety net when
    encountering unexpected/inconsistent situations that might impact correctness.
    
    Local failovers are all common task failures and recoveries.
    
    This commit offers different methods to be called in these two situations, but does not make use of the
    local restore method yet. All calls still go to the global restore, which was the previous behavior in
    all cases.
    
    The difference in the CheckpointCoordinator between local and global restore is currently that OperatorCoordinators
    are only restored during global restores.
    
    As a side effect, this change also eliminates the "failWhenNoCheckpoint" flag outside of the CheckpointCoordinator.
    The flag is exclusively used by the "restoreSavepoint()" method which is a separate call to the Coordinator anyways.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 62 ++++++++++++++++++----
 .../flink/runtime/scheduler/SchedulerBase.java     |  6 +--
 .../CheckpointCoordinatorMasterHooksTest.java      | 12 ++---
 .../CheckpointCoordinatorRestoringTest.java        | 10 ++--
 .../checkpoint/CheckpointCoordinatorTest.java      |  4 +-
 .../checkpoint/CheckpointStateRestoreTest.java     | 22 ++++----
 6 files changed, 76 insertions(+), 40 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index fbdf2d4..9e2a4a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1127,16 +1127,48 @@ public class CheckpointCoordinator {
 			boolean errorIfNoCheckpoint,
 			boolean allowNonRestoredState) throws Exception {
 
-		return restoreLatestCheckpointedState(new HashSet<>(tasks.values()), errorIfNoCheckpoint, allowNonRestoredState);
+		return restoreLatestCheckpointedStateInternal(new HashSet<>(tasks.values()), true, errorIfNoCheckpoint, allowNonRestoredState);
 	}
 
 	/**
-	 * Restores the latest checkpointed state.
+	 * Restores the latest checkpointed state to a set of subtasks. This method represents a "local"
+	 * or "regional" failover and does restore states to coordinators. Note that a regional failover
+	 * might still include all tasks.
+	 *
+	 * @param tasks Set of job vertices to restore. State for these vertices is
+	 * restored via {@link Execution#setInitialState(JobManagerTaskRestore)}.
+
+	 * @return <code>true</code> if state was restored, <code>false</code> otherwise.
+	 * @throws IllegalStateException If the CheckpointCoordinator is shut down.
+	 * @throws IllegalStateException If no completed checkpoint is available and
+	 *                               the <code>failIfNoCheckpoint</code> flag has been set.
+	 * @throws IllegalStateException If the checkpoint contains state that cannot be
+	 *                               mapped to any job vertex in <code>tasks</code> and the
+	 *                               <code>allowNonRestoredState</code> flag has not been set.
+	 * @throws IllegalStateException If the max parallelism changed for an operator
+	 *                               that restores state from this checkpoint.
+	 * @throws IllegalStateException If the parallelism changed for an operator
+	 *                               that restores <i>non-partitioned</i> state from this
+	 *                               checkpoint.
+	 */
+	public boolean restoreLatestCheckpointedStateToSubtasks(final Set<ExecutionJobVertex> tasks) throws Exception {
+		// when restoring subtasks only we accept potentially unmatched state for the
+		// following reasons
+		//   - the set frequently does not include all Job Vertices (only the ones that are part
+		//     of the restarted region), meaning there will be unmatched state by design.
+		//   - because what we might end up restoring from an original savepoint with unmatched
+		//     state, if there is was no checkpoint yet.
+		return restoreLatestCheckpointedStateInternal(tasks, false, false, true);
+	}
+
+	/**
+	 * Restores the latest checkpointed state to all tasks and all coordinators.
+	 * This method represents a "global restore"-style operation where all stateful tasks
+	 * and coordinators from the given set of Job Vertices are restored.
+	 * are restored to their latest checkpointed state.
 	 *
 	 * @param tasks Set of job vertices to restore. State for these vertices is
 	 * restored via {@link Execution#setInitialState(JobManagerTaskRestore)}.
-	 * @param errorIfNoCheckpoint Fail if no completed checkpoint is available to
-	 * restore from.
 	 * @param allowNonRestoredState Allow checkpoint state that cannot be mapped
 	 * to any job vertex in tasks.
 	 * @return <code>true</code> if state was restored, <code>false</code> otherwise.
@@ -1152,11 +1184,19 @@ public class CheckpointCoordinator {
 	 *                               that restores <i>non-partitioned</i> state from this
 	 *                               checkpoint.
 	 */
-	public boolean restoreLatestCheckpointedState(
+	public boolean restoreLatestCheckpointedStateToAll(
 			final Set<ExecutionJobVertex> tasks,
-			final boolean errorIfNoCheckpoint,
 			final boolean allowNonRestoredState) throws Exception {
 
+		return restoreLatestCheckpointedStateInternal(tasks, true, false, allowNonRestoredState);
+	}
+
+	private boolean restoreLatestCheckpointedStateInternal(
+		final Set<ExecutionJobVertex> tasks,
+		final boolean restoreCoordinators,
+		final boolean errorIfNoCheckpoint,
+		final boolean allowNonRestoredState) throws Exception {
+
 		synchronized (lock) {
 			if (shutdown) {
 				throw new IllegalStateException("CheckpointCoordinator is shut down");
@@ -1202,7 +1242,9 @@ public class CheckpointCoordinator {
 
 			stateAssignmentOperation.assignStates();
 
-			// call master hooks for restore
+			// call master hooks for restore. we currently call them also on "regional restore" because
+			// there is no other failure notification mechanism in the master hooks
+			// ultimately these should get removed anyways in favor of the operator coordinators
 
 			MasterHooks.restoreMasterHooks(
 					masterHooks,
@@ -1211,7 +1253,9 @@ public class CheckpointCoordinator {
 					allowNonRestoredState,
 					LOG);
 
-			restoreStateToCoordinators(operatorStates);
+			if (restoreCoordinators) {
+				restoreStateToCoordinators(operatorStates);
+			}
 
 			// update metrics
 
@@ -1267,7 +1311,7 @@ public class CheckpointCoordinator {
 
 		LOG.info("Reset the checkpoint ID of job {} to {}.", job, nextCheckpointId);
 
-		return restoreLatestCheckpointedState(new HashSet<>(tasks.values()), true, allowNonRestored);
+		return restoreLatestCheckpointedStateInternal(new HashSet<>(tasks.values()), true, true, allowNonRestored);
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 58b077a..d8908c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -244,9 +244,8 @@ public abstract class SchedulerBase implements SchedulerNG {
 
 		if (checkpointCoordinator != null) {
 			// check whether we find a valid checkpoint
-			if (!checkpointCoordinator.restoreLatestCheckpointedState(
+			if (!checkpointCoordinator.restoreLatestCheckpointedStateToAll(
 				new HashSet<>(newExecutionGraph.getAllVertices().values()),
-				false,
 				false)) {
 
 				// check whether we can restore from a savepoint
@@ -331,9 +330,8 @@ public abstract class SchedulerBase implements SchedulerNG {
 			executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(
 				new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));
 
-			executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(
+			executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedStateToAll(
 				getInvolvedExecutionJobVertices(vertices),
-				false,
 				true);
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index a935306..91c5ce8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -141,9 +141,8 @@ public class CheckpointCoordinatorMasterHooksTest {
 		cc.addMasterHook(hook2);
 
 		// initialize the hooks
-		cc.restoreLatestCheckpointedState(
+		cc.restoreLatestCheckpointedStateToAll(
 			Collections.emptySet(),
-			false,
 			false);
 		verify(hook1, times(1)).reset();
 		verify(hook2, times(1)).reset();
@@ -280,9 +279,8 @@ public class CheckpointCoordinatorMasterHooksTest {
 		cc.addMasterHook(statefulHook2);
 
 		cc.getCheckpointStore().addCheckpoint(checkpoint);
-		cc.restoreLatestCheckpointedState(
+		cc.restoreLatestCheckpointedStateToAll(
 				Collections.emptySet(),
-				true,
 				false);
 
 		verify(statefulHook1, times(1)).restoreCheckpoint(eq(checkpointId), eq(state1));
@@ -335,18 +333,16 @@ public class CheckpointCoordinatorMasterHooksTest {
 
 		// since we have unmatched state, this should fail
 		try {
-			cc.restoreLatestCheckpointedState(
+			cc.restoreLatestCheckpointedStateToAll(
 					Collections.emptySet(),
-					true,
 					false);
 			fail("exception expected");
 		}
 		catch (IllegalStateException ignored) {}
 
 		// permitting unmatched state should succeed
-		cc.restoreLatestCheckpointedState(
+		cc.restoreLatestCheckpointedStateToAll(
 				Collections.emptySet(),
-				true,
 				true);
 
 		verify(statefulHook, times(1)).restoreCheckpoint(eq(checkpointId), eq(state1));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index 143e6a8..c2140cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -204,7 +204,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 		tasks.add(jobVertex1);
 		tasks.add(jobVertex2);
 
-		coord.restoreLatestCheckpointedState(tasks, true, false);
+		assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
 
 		// validate that all shared states are registered again after the recovery.
 		for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
@@ -333,7 +333,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 			assertNotNull(savepointFuture.get());
 
 			//restore and jump the latest savepoint
-			coord.restoreLatestCheckpointedState(tasks, true, false);
+			assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
 
 			//compare and see if it used the checkpoint's subtaskStates
 			BaseMatcher<JobManagerTaskRestore> matcher = new BaseMatcher<JobManagerTaskRestore>() {
@@ -493,7 +493,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 
 		tasks.add(newJobVertex1);
 		tasks.add(newJobVertex2);
-		coord.restoreLatestCheckpointedState(tasks, true, false);
+		assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
 
 		// verify the restored state
 		verifyStateRestore(jobVertexID1, newJobVertex1, keyGroupPartitions1);
@@ -639,7 +639,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 		tasks.add(newJobVertex1);
 		tasks.add(newJobVertex2);
 
-		coord.restoreLatestCheckpointedState(tasks, true, false);
+		assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
 
 		fail("The restoration should have failed because the max parallelism changed.");
 	}
@@ -813,7 +813,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 				.setTimer(manuallyTriggeredScheduledExecutor)
 				.build();
 
-		coord.restoreLatestCheckpointedState(tasks, false, true);
+		coord.restoreLatestCheckpointedStateToAll(tasks, true);
 
 		for (int i = 0; i < newJobVertex1.getParallelism(); i++) {
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 23e0f3b..ac48106 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -1930,7 +1930,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
 		coord.setCheckpointStatsTracker(tracker);
 
-		assertTrue(coord.restoreLatestCheckpointedState(Collections.emptySet(), false, true));
+		assertTrue(coord.restoreLatestCheckpointedStateToAll(Collections.emptySet(), true));
 
 		verify(tracker, times(1))
 			.reportRestoredCheckpoint(any(RestoredCheckpointStats.class));
@@ -2046,7 +2046,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		// restore the store
 		Set<ExecutionJobVertex> tasks = new HashSet<>();
 		tasks.add(jobVertex1);
-		coord.restoreLatestCheckpointedState(tasks, true, false);
+		assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
 
 		// validate that all shared states are registered again after the recovery.
 		cp = 0;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index ed4e516..ed04b21 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -50,6 +50,8 @@ import java.util.Objects;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -138,7 +140,7 @@ public class CheckpointStateRestoreTest {
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 
 			// let the coordinator inject the state
-			coord.restoreLatestCheckpointedState(tasks, true, false);
+			assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
 
 			// verify that each stateful vertex got the state
 
@@ -177,13 +179,8 @@ public class CheckpointStateRestoreTest {
 				new CheckpointCoordinatorBuilder()
 					.build();
 
-			try {
-				coord.restoreLatestCheckpointedState(Collections.emptySet(), true, false);
-				fail("this should throw an exception");
-			}
-			catch (IllegalStateException e) {
-				// expected
-			}
+			final boolean restored = coord.restoreLatestCheckpointedStateToAll(Collections.emptySet(), false);
+			assertFalse(restored);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -245,8 +242,8 @@ public class CheckpointStateRestoreTest {
 
 		coord.getCheckpointStore().addCheckpoint(checkpoint);
 
-		coord.restoreLatestCheckpointedState(tasks, true, false);
-		coord.restoreLatestCheckpointedState(tasks, true, true);
+		assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
+		assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, true));
 
 		// --- (3) JobVertex missing for task state that is part of the checkpoint ---
 		JobVertexID newJobVertexID = new JobVertexID();
@@ -273,11 +270,12 @@ public class CheckpointStateRestoreTest {
 		coord.getCheckpointStore().addCheckpoint(checkpoint);
 
 		// (i) Allow non restored state (should succeed)
-		coord.restoreLatestCheckpointedState(tasks, true, true);
+		final boolean restored = coord.restoreLatestCheckpointedStateToAll(tasks, true);
+		assertTrue(restored);
 
 		// (ii) Don't allow non restored state (should fail)
 		try {
-			coord.restoreLatestCheckpointedState(tasks, true, false);
+			coord.restoreLatestCheckpointedStateToAll(tasks, false);
 			fail("Did not throw the expected Exception.");
 		} catch (IllegalStateException ignored) {
 		}