You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/01/18 17:09:22 UTC
[05/17] flink git commit: [FLINK-5823] [checkpoints] State backends now also handle the checkpoint metadata
[FLINK-5823] [checkpoints] State backends now also handle the checkpoint metadata
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/edc6f100
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/edc6f100
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/edc6f100
Branch: refs/heads/master
Commit: edc6f1000704a492629d7bdf8cbfa5ba5c45bb1f
Parents: d19525e
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Oct 26 21:26:00 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jan 18 18:08:03 2018 +0100
----------------------------------------------------------------------
.../streaming/state/RocksDBStateBackend.java | 26 +-
.../jobmanager/JMXJobManagerMetricTest.java | 7 +-
.../checkpoint/CheckpointCoordinator.java | 186 +++++------
.../checkpoint/CheckpointProperties.java | 105 ++----
.../checkpoint/CheckpointRetentionPolicy.java | 37 +++
.../flink/runtime/checkpoint/Checkpoints.java | 327 ++++++++++++++++++
.../runtime/checkpoint/CompletedCheckpoint.java | 85 ++---
.../runtime/checkpoint/PendingCheckpoint.java | 149 +++------
.../runtime/checkpoint/savepoint/Savepoint.java | 3 +-
.../checkpoint/savepoint/SavepointLoader.java | 159 ---------
.../checkpoint/savepoint/SavepointStore.java | 328 -------------------
.../runtime/executiongraph/ExecutionGraph.java | 8 +-
.../executiongraph/ExecutionGraphBuilder.java | 11 +-
.../CheckpointCoordinatorConfiguration.java | 27 +-
.../tasks/ExternalizedCheckpointSettings.java | 89 -----
.../checkpoints/CheckpointConfigHandler.java | 8 +-
.../checkpoints/CheckpointConfigHandler.java | 9 +-
.../flink/runtime/state/CheckpointStorage.java | 93 ++++++
.../state/CheckpointStorageLocation.java | 65 ++++
.../runtime/state/CheckpointStreamFactory.java | 27 ++
.../flink/runtime/state/StateBackend.java | 65 ++--
.../filesystem/AbstractFileStateBackend.java | 33 +-
.../filesystem/AbstractFsCheckpointStorage.java | 256 +++++++++++++++
.../filesystem/FixFileFsStateOutputStream.java | 154 +++++++++
.../state/filesystem/FsCheckpointStorage.java | 85 +++++
.../filesystem/FsCheckpointStorageLocation.java | 122 +++++++
.../state/filesystem/FsStateBackend.java | 11 +
.../memory/MemoryBackendCheckpointStorage.java | 135 ++++++++
.../state/memory/MemoryStateBackend.java | 24 +-
...istentMetadataCheckpointStorageLocation.java | 56 ++++
...istentMetadataCheckpointStorageLocation.java | 64 ++++
.../flink/runtime/jobmanager/JobManager.scala | 102 +++---
...tCoordinatorExternalizedCheckpointsTest.java | 206 ------------
.../CheckpointCoordinatorFailureTest.java | 4 +-
.../CheckpointCoordinatorMasterHooksTest.java | 20 +-
.../checkpoint/CheckpointCoordinatorTest.java | 116 +++----
.../CheckpointExternalResumeTest.java | 203 ++++++++++++
.../CheckpointMetadataLoadingTest.java | 132 ++++++++
.../checkpoint/CheckpointPropertiesTest.java | 38 +--
.../CheckpointSettingsSerializableTest.java | 28 +-
.../checkpoint/CheckpointStateRestoreTest.java | 25 +-
.../checkpoint/CheckpointStatsHistoryTest.java | 8 +-
.../checkpoint/CheckpointStatsSnapshotTest.java | 6 +-
.../checkpoint/CheckpointStatsTrackerTest.java | 28 +-
.../CompletedCheckpointStatsSummaryTest.java | 4 +-
.../CompletedCheckpointStoreTest.java | 6 +-
.../checkpoint/CompletedCheckpointTest.java | 51 +--
.../checkpoint/CoordinatorShutdownTest.java | 7 +-
...ExecutionGraphCheckpointCoordinatorTest.java | 4 +-
.../checkpoint/FailedCheckpointStatsTest.java | 5 +-
.../checkpoint/PendingCheckpointStatsTest.java | 8 +-
.../checkpoint/PendingCheckpointTest.java | 143 ++++----
.../checkpoint/RestoredCheckpointStatsTest.java | 2 +-
.../ZooKeeperCompletedCheckpointStoreTest.java | 13 +-
.../checkpoint/hooks/MasterHooksTest.java | 2 +
.../savepoint/SavepointLoaderTest.java | 124 -------
.../savepoint/SavepointStoreTest.java | 308 -----------------
.../ArchivedExecutionGraphTest.java | 13 +-
.../ExecutionGraphDeploymentTest.java | 4 +-
.../IndividualRestartsConcurrencyTest.java | 9 +-
.../tasks/JobCheckpointingSettingsTest.java | 3 +-
.../jobmanager/JobManagerHARecoveryTest.java | 4 +-
.../runtime/jobmanager/JobManagerTest.java | 16 +-
.../flink/runtime/jobmanager/JobSubmitTest.java | 5 +-
...obCancellationWithSavepointHandlersTest.java | 10 +-
.../CheckpointConfigHandlerTest.java | 31 +-
.../CheckpointStatsDetailsHandlerTest.java | 8 +-
.../checkpoints/CheckpointStatsHandlerTest.java | 11 +-
.../runtime/state/EmptyStreamStateHandle.java | 45 +++
.../AbstractFileStateBackendTest.java | 309 +++++++++++++++++
.../FixFileFsStateOutputStreamTest.java | 285 ++++++++++++++++
.../memory/MemoryCheckpointStorageTest.java | 26 ++
.../runtime/jobmanager/JobManagerITCase.scala | 12 +-
.../testingUtils/TestingJobManagerLike.scala | 29 +-
.../api/graph/StreamingJobGraphGenerator.java | 12 +-
.../tasks/StreamTaskTerminationTest.java | 18 +-
.../tasks/TaskCheckpointingBehaviourTest.java | 2 +-
.../test/checkpointing/SavepointITCase.java | 9 -
.../utils/SavepointMigrationTestBase.java | 2 -
.../streaming/runtime/StateBackendITCase.java | 21 +-
80 files changed, 3167 insertions(+), 2064 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 6bcd595..072f3a7 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -28,12 +28,14 @@ import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.util.AbstractID;
@@ -41,7 +43,6 @@ import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.NativeLibraryLoader;
import org.rocksdb.RocksDB;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -300,7 +301,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
private void lazyInitializeForJob(
Environment env,
- String operatorIdentifier) throws IOException {
+ @SuppressWarnings("unused") String operatorIdentifier) throws IOException {
if (isInitialized) {
return;
@@ -352,9 +353,22 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
return initializedDbBasePaths[ni];
}
+ // ------------------------------------------------------------------------
+ // Checkpoint initialization and persistent storage
+ // ------------------------------------------------------------------------
+
+ @Override
+ public StreamStateHandle resolveCheckpoint(String pointer) throws IOException {
+ return checkpointStreamBackend.resolveCheckpoint(pointer);
+ }
+
@Override
- public CheckpointStreamFactory createStreamFactory(JobID jobId,
- String operatorIdentifier) throws IOException {
+ public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
+ return checkpointStreamBackend.createCheckpointStorage(jobId);
+ }
+
+ @Override
+ public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
return checkpointStreamBackend.createStreamFactory(jobId, operatorIdentifier);
}
@@ -367,6 +381,10 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
return checkpointStreamBackend.createSavepointStreamFactory(jobId, operatorIdentifier, targetLocation);
}
+ // ------------------------------------------------------------------------
+ // State holding data structures
+ // ------------------------------------------------------------------------
+
@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index c79e3d7..6770ec3 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -23,13 +23,13 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.jmx.JMXReporter;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
@@ -56,10 +56,9 @@ import static org.junit.Assert.assertEquals;
* Tests to verify JMX reporter functionality on the JobManager.
*/
public class JMXJobManagerMetricTest {
+
/**
* Tests that metrics registered on the JobManager are actually accessible via JMX.
- *
- * @throws Exception
*/
@Test
public void testJobManagerJMXMetricAccess() throws Exception {
@@ -89,7 +88,7 @@ public class JMXJobManagerMetricTest {
500,
50,
5,
- ExternalizedCheckpointSettings.none(),
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true),
null));
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 40fa2bd..95ca5d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -22,10 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
@@ -36,12 +33,14 @@ import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
@@ -119,12 +118,7 @@ public class CheckpointCoordinator {
/** The root checkpoint state backend, which is responsible for initializing the
* checkpoint, storing the metadata, and cleaning up the checkpoint */
- private final StateBackend checkpointStateBackend;
-
- /** Default directory for persistent checkpoints; <code>null</code> if none configured.
- * THIS WILL BE REPLACED BY PROPER STATE-BACKEND METADATA WRITING */
- @Nullable
- private final String checkpointDirectory;
+ private final CheckpointStorage checkpointStorage;
/** A list of recent checkpoint IDs, to identify late messages (vs invalid ones) */
private final ArrayDeque<Long> recentPendingCheckpoints;
@@ -194,29 +188,23 @@ public class CheckpointCoordinator {
long checkpointTimeout,
long minPauseBetweenCheckpoints,
int maxConcurrentCheckpointAttempts,
- ExternalizedCheckpointSettings externalizeSettings,
+ CheckpointRetentionPolicy retentionPolicy,
ExecutionVertex[] tasksToTrigger,
ExecutionVertex[] tasksToWaitFor,
ExecutionVertex[] tasksToCommitTo,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore completedCheckpointStore,
- @Nullable String checkpointDirectory,
StateBackend checkpointStateBackend,
Executor executor,
SharedStateRegistryFactory sharedStateRegistryFactory) {
// sanity checks
+ checkNotNull(checkpointStateBackend);
checkArgument(baseInterval > 0, "Checkpoint timeout must be larger than zero");
checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must be larger than zero");
checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0");
checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1");
- if (externalizeSettings.externalizeCheckpoints() && checkpointDirectory == null) {
- throw new IllegalStateException("CheckpointConfig says to persist periodic " +
- "checkpoints, but no checkpoint directory has been configured. You can " +
- "configure configure one via key '" + CheckpointingOptions.CHECKPOINTS_DIRECTORY.key() + "'.");
- }
-
// max "in between duration" can be one year - this is to prevent numeric overflows
if (minPauseBetweenCheckpoints > 365L * 24 * 60 * 60 * 1_000) {
minPauseBetweenCheckpoints = 365L * 24 * 60 * 60 * 1_000;
@@ -239,8 +227,6 @@ public class CheckpointCoordinator {
this.pendingCheckpoints = new LinkedHashMap<>();
this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
- this.checkpointStateBackend = checkNotNull(checkpointStateBackend);
- this.checkpointDirectory = checkpointDirectory;
this.executor = checkNotNull(executor);
this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory);
this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);
@@ -256,14 +242,11 @@ public class CheckpointCoordinator {
this.timer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
this.timer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
- if (externalizeSettings.externalizeCheckpoints()) {
- LOG.info("Persisting periodic checkpoints externally at {}.", checkpointDirectory);
- checkpointProperties = CheckpointProperties.forExternalizedCheckpoint(externalizeSettings.deleteOnCancellation());
- } else {
- checkpointProperties = CheckpointProperties.forStandardCheckpoint();
- }
+ this.checkpointProperties = CheckpointProperties.forCheckpoint(retentionPolicy);
try {
+ this.checkpointStorage = checkpointStateBackend.createCheckpointStorage(job);
+
// Make sure the checkpoint ID enumerator is running. Possibly
// issues a blocking call to ZooKeeper.
checkpointIDCounter.start();
@@ -280,7 +263,7 @@ public class CheckpointCoordinator {
* Adds the given master hook to the checkpoint coordinator. This method does nothing, if
* the checkpoint coordinator already contained a hook with the same ID (as defined via
* {@link MasterTriggerRestoreHook#getIdentifier()}).
- *
+ *
* @param hook The hook to add.
* @return True, if the hook was added, false if the checkpoint coordinator already
* contained a hook with the same ID.
@@ -366,52 +349,32 @@ public class CheckpointCoordinator {
* Triggers a savepoint with the given savepoint directory as a target.
*
* @param timestamp The timestamp for the savepoint.
- * @param targetDirectory Target directory for the savepoint.
+ * @param targetLocation Target location for the savepoint, optional. If null, the
+ * state backend's configured default will be used.
* @return A future to the completed checkpoint
* @throws IllegalStateException If no savepoint directory has been
* specified and no default savepoint directory has been
* configured
* @throws Exception Failures during triggering are forwarded
*/
- public CompletableFuture<CompletedCheckpoint> triggerSavepoint(long timestamp, String targetDirectory) throws Exception {
- checkNotNull(targetDirectory, "Savepoint target directory");
-
- CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
+ public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
+ long timestamp,
+ @Nullable String targetLocation) throws Exception {
- // Create the unique savepoint directory
- final String savepointDirectory = SavepointStore
- .createSavepointDirectory(targetDirectory, job);
+ CheckpointProperties props = CheckpointProperties.forSavepoint();
CheckpointTriggerResult triggerResult = triggerCheckpoint(
timestamp,
props,
- savepointDirectory,
+ targetLocation,
false);
- CompletableFuture<CompletedCheckpoint> result;
-
if (triggerResult.isSuccess()) {
- result = triggerResult.getPendingCheckpoint().getCompletionFuture();
+ return triggerResult.getPendingCheckpoint().getCompletionFuture();
} else {
Throwable cause = new Exception("Failed to trigger savepoint: " + triggerResult.getFailureReason().message());
return FutureUtils.completedExceptionally(cause);
}
-
- // Make sure to remove the created base directory on Exceptions
- result.whenCompleteAsync(
- (CompletedCheckpoint checkpoint, Throwable throwable) -> {
- if (throwable != null) {
- try {
- SavepointStore.deleteSavepointDirectory(savepointDirectory);
- } catch (Throwable t) {
- LOG.warn("Failed to delete savepoint directory " + savepointDirectory
- + " after failed savepoint.", t);
- }
- }
- },
- executor);
-
- return result;
}
/**
@@ -425,7 +388,7 @@ public class CheckpointCoordinator {
* @return <code>true</code> if triggering the checkpoint succeeded.
*/
public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
- return triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory, isPeriodic).isSuccess();
+ return triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic).isSuccess();
}
/**
@@ -444,7 +407,7 @@ public class CheckpointCoordinator {
case CHECKPOINT:
CheckpointTriggerResult triggerResult =
- triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory, false);
+ triggerCheckpoint(timestamp, checkpointProperties, null, false);
if (triggerResult.isSuccess()) {
return triggerResult.getPendingCheckpoint().getCompletionFuture();
@@ -462,14 +425,9 @@ public class CheckpointCoordinator {
CheckpointTriggerResult triggerCheckpoint(
long timestamp,
CheckpointProperties props,
- String targetDirectory,
+ @Nullable String externalSavepointLocation,
boolean isPeriodic) {
- // Sanity check
- if (props.externalizeCheckpoint() && targetDirectory == null) {
- throw new IllegalStateException("No target directory specified to persist checkpoint to.");
- }
-
// make some eager pre-checks
synchronized (lock) {
// abort if the coordinator has been shutdown in the meantime
@@ -557,11 +515,18 @@ public class CheckpointCoordinator {
// may issue blocking operations. Using a different lock than the coordinator-wide lock,
// we avoid blocking the processing of 'acknowledge/decline' messages during that time.
synchronized (triggerLock) {
+
+ final CheckpointStorageLocation checkpointStorageLocation;
final long checkpointID;
+
try {
// this must happen outside the coordinator-wide lock, because it communicates
// with external services (in HA mode) and may block for a while.
checkpointID = checkpointIdCounter.getAndIncrement();
+
+ checkpointStorageLocation = props.isSavepoint() ?
+ checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
+ checkpointStorage.initializeLocationForCheckpoint(checkpointID);
}
catch (Throwable t) {
int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
@@ -575,7 +540,7 @@ public class CheckpointCoordinator {
timestamp,
ackTasks,
props,
- targetDirectory,
+ checkpointStorageLocation,
executor);
if (statsTracker != null) {
@@ -588,21 +553,18 @@ public class CheckpointCoordinator {
}
// schedule the timer that will clean up the expired checkpoints
- final Runnable canceller = new Runnable() {
- @Override
- public void run() {
- synchronized (lock) {
- // only do the work if the checkpoint is not discarded anyways
- // note that checkpoint completion discards the pending checkpoint object
- if (!checkpoint.isDiscarded()) {
- LOG.info("Checkpoint " + checkpointID + " expired before completing.");
+ final Runnable canceller = () -> {
+ synchronized (lock) {
+ // only do the work if the checkpoint is not discarded anyways
+ // note that checkpoint completion discards the pending checkpoint object
+ if (!checkpoint.isDiscarded()) {
+ LOG.info("Checkpoint " + checkpointID + " expired before completing.");
- checkpoint.abortExpired();
- pendingCheckpoints.remove(checkpointID);
- rememberRecentCheckpointId(checkpointID);
+ checkpoint.abortExpired();
+ pendingCheckpoints.remove(checkpointID);
+ rememberRecentCheckpointId(checkpointID);
- triggerQueuedRequests();
- }
+ triggerQueuedRequests();
}
}
};
@@ -675,7 +637,7 @@ public class CheckpointCoordinator {
if (!props.isSavepoint()) {
checkpointOptions = CheckpointOptions.forCheckpoint();
} else {
- checkpointOptions = CheckpointOptions.forSavepoint(targetDirectory);
+ checkpointOptions = CheckpointOptions.forSavepoint(checkpointStorageLocation.getLocationAsPointer());
}
// send the messages to the tasks that trigger their checkpoint
@@ -699,6 +661,14 @@ public class CheckpointCoordinator {
if (!checkpoint.isDiscarded()) {
checkpoint.abortError(new Exception("Failed to trigger checkpoint", t));
}
+
+ try {
+ checkpointStorageLocation.disposeOnFailure();
+ }
+ catch (Throwable t2) {
+ LOG.warn("Cannot dispose failed checkpoint storage location {}", checkpointStorageLocation, t2);
+ }
+
return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
}
@@ -718,7 +688,7 @@ public class CheckpointCoordinator {
throw new IllegalArgumentException("Received DeclineCheckpoint message for job " +
message.getJob() + " while this coordinator handles job " + job);
}
-
+
final long checkpointId = message.getCheckpointId();
final String reason = (message.getReason() != null ? message.getReason().getMessage() : "");
@@ -778,7 +748,7 @@ public class CheckpointCoordinator {
}
final long checkpointId = message.getCheckpointId();
-
+
synchronized (lock) {
// we need to check inside the lock for being shutdown as well, otherwise we
// get races and invalid error log messages
@@ -854,7 +824,7 @@ public class CheckpointCoordinator {
/**
* Try to complete the given pending checkpoint.
*
- * Important: This method should only be called in the checkpoint lock scope.
+ * <p>Important: This method should only be called in the checkpoint lock scope.
*
* @param pendingCheckpoint to complete
* @throws CheckpointException if the completion failed
@@ -869,13 +839,9 @@ public class CheckpointCoordinator {
try {
try {
- // externalize the checkpoint if required
- if (pendingCheckpoint.getProps().externalizeCheckpoint()) {
- completedCheckpoint = pendingCheckpoint.finalizeCheckpointExternalized();
- } else {
- completedCheckpoint = pendingCheckpoint.finalizeCheckpointNonExternalized();
- }
- } catch (Exception e1) {
+ completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
+ }
+ catch (Exception e1) {
// abort the current pending checkpoint if we fails to finalize the pending checkpoint.
if (!pendingCheckpoint.isDiscarded()) {
pendingCheckpoint.abortError(e1);
@@ -1122,38 +1088,40 @@ public class CheckpointCoordinator {
}
/**
- * Restore the state with given savepoint
- *
- * @param savepointPath Location of the savepoint
- * @param allowNonRestored True if allowing checkpoint state that cannot be
+ * Restore the state with given savepoint.
+ *
+ * @param savepointPointer The pointer to the savepoint.
+ * @param allowNonRestored True if allowing checkpoint state that cannot be
* mapped to any job vertex in tasks.
- * @param tasks Map of job vertices to restore. State for these
- * vertices is restored via
+ * @param tasks Map of job vertices to restore. State for these
+ * vertices is restored via
* {@link Execution#setInitialState(TaskStateSnapshot)}.
- * @param userClassLoader The class loader to resolve serialized classes in
- * legacy savepoint versions.
+ * @param userClassLoader The class loader to resolve serialized classes in
+ * legacy savepoint versions.
*/
public boolean restoreSavepoint(
- String savepointPath,
+ String savepointPointer,
boolean allowNonRestored,
Map<JobVertexID, ExecutionJobVertex> tasks,
ClassLoader userClassLoader) throws Exception {
-
- Preconditions.checkNotNull(savepointPath, "The savepoint path cannot be null.");
-
- LOG.info("Starting job from savepoint {} ({})",
- savepointPath, (allowNonRestored ? "allowing non restored state" : ""));
+
+ Preconditions.checkNotNull(savepointPointer, "The savepoint path cannot be null.");
+
+ LOG.info("Starting job from savepoint {} ({})",
+ savepointPointer, (allowNonRestored ? "allowing non restored state" : ""));
+
+ final StreamStateHandle metadataHandle = checkpointStorage.resolveCheckpoint(savepointPointer);
// Load the savepoint as a checkpoint into the system
- CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint(
- job, tasks, savepointPath, userClassLoader, allowNonRestored);
+ CompletedCheckpoint savepoint = Checkpoints.loadAndValidateCheckpoint(
+ job, tasks, savepointPointer, metadataHandle, userClassLoader, allowNonRestored);
completedCheckpointStore.addCheckpoint(savepoint);
-
+
// Reset the checkpoint ID counter
long nextCheckpointId = savepoint.getCheckpointID() + 1;
checkpointIdCounter.setCount(nextCheckpointId);
-
+
LOG.info("Reset the checkpoint ID to {}.", nextCheckpointId);
return restoreLatestCheckpointedState(tasks, true, allowNonRestored);
@@ -1185,6 +1153,10 @@ public class CheckpointCoordinator {
}
}
+ public CheckpointStorage getCheckpointStorage() {
+ return checkpointStorage;
+ }
+
public CompletedCheckpointStore getCheckpointStore() {
return completedCheckpointStore;
}
@@ -1221,7 +1193,7 @@ public class CheckpointCoordinator {
periodicScheduling = true;
currentPeriodicTrigger = timer.scheduleAtFixedRate(
- new ScheduledTrigger(),
+ new ScheduledTrigger(),
baseInterval, baseInterval, TimeUnit.MILLISECONDS);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
index 1233b6e..8d6346c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -34,11 +34,10 @@ import java.io.Serializable;
*/
public class CheckpointProperties implements Serializable {
- private static final long serialVersionUID = -8835900655844879469L;
+ private static final long serialVersionUID = -8835900655844879470L;
private final boolean forced;
- private final boolean externalize;
private final boolean savepoint;
private final boolean discardSubsumed;
@@ -49,7 +48,6 @@ public class CheckpointProperties implements Serializable {
CheckpointProperties(
boolean forced,
- boolean externalize,
boolean savepoint,
boolean discardSubsumed,
boolean discardFinished,
@@ -58,20 +56,12 @@ public class CheckpointProperties implements Serializable {
boolean discardSuspended) {
this.forced = forced;
- this.externalize = externalize;
this.savepoint = savepoint;
this.discardSubsumed = discardSubsumed;
this.discardFinished = discardFinished;
this.discardCancelled = discardCancelled;
this.discardFailed = discardFailed;
this.discardSuspended = discardSuspended;
-
- // Not persisted, but needs manual clean up
- if (!externalize && !(discardSubsumed && discardFinished && discardCancelled
- && discardFailed && discardSuspended)) {
- throw new IllegalStateException("CheckpointProperties say to *not* persist the " +
- "checkpoint, but the checkpoint requires manual cleanup.");
- }
}
// ------------------------------------------------------------------------
@@ -93,18 +83,6 @@ public class CheckpointProperties implements Serializable {
return forced;
}
- /**
- * Returns whether the checkpoint should be persisted externally.
- *
- * @return <code>true</code> if the checkpoint should be persisted
- * externally; <code>false</code> otherwise.
- *
- * @see PendingCheckpoint
- */
- boolean externalizeCheckpoint() {
- return externalize;
- }
-
// ------------------------------------------------------------------------
// Garbage collection behaviour
// ------------------------------------------------------------------------
@@ -203,7 +181,6 @@ public class CheckpointProperties implements Serializable {
CheckpointProperties that = (CheckpointProperties) o;
return forced == that.forced &&
- externalize == that.externalize &&
savepoint == that.savepoint &&
discardSubsumed == that.discardSubsumed &&
discardFinished == that.discardFinished &&
@@ -215,7 +192,6 @@ public class CheckpointProperties implements Serializable {
@Override
public int hashCode() {
int result = (forced ? 1 : 0);
- result = 31 * result + (externalize ? 1 : 0);
result = 31 * result + (savepoint ? 1 : 0);
result = 31 * result + (discardSubsumed ? 1 : 0);
result = 31 * result + (discardFinished ? 1 : 0);
@@ -229,7 +205,6 @@ public class CheckpointProperties implements Serializable {
public String toString() {
return "CheckpointProperties{" +
"forced=" + forced +
- ", externalized=" + externalizeCheckpoint() +
", savepoint=" + savepoint +
", discardSubsumed=" + discardSubsumed +
", discardFinished=" + discardFinished +
@@ -241,8 +216,7 @@ public class CheckpointProperties implements Serializable {
// ------------------------------------------------------------------------
- private static final CheckpointProperties STANDARD_SAVEPOINT = new CheckpointProperties(
- true,
+ private static final CheckpointProperties SAVEPOINT = new CheckpointProperties(
true,
true,
false,
@@ -251,77 +225,66 @@ public class CheckpointProperties implements Serializable {
false,
false);
- private static final CheckpointProperties STANDARD_CHECKPOINT = new CheckpointProperties(
- false,
+ private static final CheckpointProperties CHECKPOINT_NEVER_RETAINED = new CheckpointProperties(
false,
false,
true,
- true,
- true,
- true,
- true);
+ true, // Delete on success
+ true, // Delete on cancellation
+ true, // Delete on failure
+ true); // Delete on suspension
- private static final CheckpointProperties EXTERNALIZED_CHECKPOINT_RETAINED = new CheckpointProperties(
+ private static final CheckpointProperties CHECKPOINT_RETAINED_ON_FAILURE = new CheckpointProperties(
false,
- true,
false,
true,
- true,
- false, // Retain on cancellation
- false,
- false); // Retain on suspension
+ true, // Delete on success
+ true, // Delete on cancellation
+ false, // Retain on failure
+ true); // Delete on suspension
- private static final CheckpointProperties EXTERNALIZED_CHECKPOINT_DELETED = new CheckpointProperties(
+ private static final CheckpointProperties CHECKPOINT_RETAINED_ON_CANCELLATION = new CheckpointProperties(
false,
- true,
false,
true,
- true,
- true, // Delete on cancellation
- false,
- true); // Delete on suspension
+ true, // Delete on success
+ false, // Retain on cancellation
+ false, // Retain on failure
+ false); // Retain on suspension
+
/**
* Creates the checkpoint properties for a (manually triggered) savepoint.
*
- * <p>Savepoints are forced and persisted externally. They have to be
+ * <p>Savepoints are not queued due to time trigger limits. They have to be
* garbage collected manually.
*
* @return Checkpoint properties for a (manually triggered) savepoint.
*/
- public static CheckpointProperties forStandardSavepoint() {
- return STANDARD_SAVEPOINT;
- }
-
- /**
- * Creates the checkpoint properties for a regular checkpoint.
- *
- * <p>Regular checkpoints are not forced and not persisted externally. They
- * are garbage collected automatically.
- *
- * @return Checkpoint properties for a regular checkpoint.
- */
- public static CheckpointProperties forStandardCheckpoint() {
- return STANDARD_CHECKPOINT;
+ public static CheckpointProperties forSavepoint() {
+ return SAVEPOINT;
}
/**
- * Creates the checkpoint properties for an external checkpoint.
+ * Creates the checkpoint properties for a checkpoint.
*
- * <p>External checkpoints are not forced, but persisted externally. They
- * are garbage collected automatically, except when the owning job
+ * <p>Checkpoints may be queued in case too many other checkpoints are currently happening.
+ * They are garbage collected automatically, except when the owning job
* terminates in state {@link JobStatus#FAILED}. The user is required to
* configure the clean up behaviour on job cancellation.
*
- * @param deleteOnCancellation Flag indicating whether to discard on cancellation.
- *
* @return Checkpoint properties for an external checkpoint.
*/
- public static CheckpointProperties forExternalizedCheckpoint(boolean deleteOnCancellation) {
- if (deleteOnCancellation) {
- return EXTERNALIZED_CHECKPOINT_DELETED;
- } else {
- return EXTERNALIZED_CHECKPOINT_RETAINED;
+ public static CheckpointProperties forCheckpoint(CheckpointRetentionPolicy policy) {
+ switch (policy) {
+ case NEVER_RETAIN_AFTER_TERMINATION:
+ return CHECKPOINT_NEVER_RETAINED;
+ case RETAIN_ON_FAILURE:
+ return CHECKPOINT_RETAINED_ON_FAILURE;
+ case RETAIN_ON_CANCELLATION:
+ return CHECKPOINT_RETAINED_ON_CANCELLATION;
+ default:
+ throw new IllegalArgumentException("unknown policy: " + policy);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRetentionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRetentionPolicy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRetentionPolicy.java
new file mode 100644
index 0000000..3bd124d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRetentionPolicy.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Policy for whether checkpoints are retained after a job terminates.
+ *
+ * <p>The policy is translated into concrete discard flags by
+ * {@code CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy)}, which rejects
+ * unknown values with an {@code IllegalArgumentException}.
+ */
+@Internal
+public enum CheckpointRetentionPolicy {
+
+ /**
+  * Checkpoints should be retained on cancellation and failure.
+  * The checkpoint properties created for this policy also retain the checkpoint on suspension.
+  */
+ RETAIN_ON_CANCELLATION,
+
+ /**
+  * Checkpoints should be retained on failure, but not on cancellation.
+  * The checkpoint properties created for this policy also discard the checkpoint on suspension.
+  */
+ RETAIN_ON_FAILURE,
+
+ /** Checkpoints should always be cleaned up when an application reaches a terminal state. */
+ NEVER_RETAIN_AFTER_TERMINATION;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
new file mode 100644
index 0000000..bfa7d45
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A utility class with the methods to write/load/dispose the checkpoint and savepoint metadata.
+ *
+ * <p>Stored checkpoint metadata files have the following format:
+ * <pre>[MagicNumber (int) | Format Version (int) | Checkpoint Metadata (variable)]</pre>
+ *
+ * <p>The actual savepoint serialization is version-specific via the {@link SavepointSerializer}.
+ */
+public class Checkpoints {
+
+	private static final Logger LOG = LoggerFactory.getLogger(Checkpoints.class);
+
+	/** Magic number at the beginning of every checkpoint metadata file, for sanity checks. */
+	public static final int HEADER_MAGIC_NUMBER = 0x4960672d;
+
+	// ------------------------------------------------------------------------
+	//  Writing out checkpoint metadata
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Writes the header and the serialized checkpoint metadata to the given stream.
+	 *
+	 * <p>The stream is only wrapped into a {@link DataOutputStream}; it is neither flushed
+	 * nor closed by this method, so the caller stays responsible for its life cycle.
+	 *
+	 * @param checkpointMetadata The metadata to write.
+	 * @param out The stream to write the metadata to.
+	 * @throws IOException Thrown if writing to the stream or serializing the metadata fails.
+	 */
+	public static <T extends Savepoint> void storeCheckpointMetadata(
+			T checkpointMetadata,
+			OutputStream out) throws IOException {
+
+		DataOutputStream dos = new DataOutputStream(out);
+		storeCheckpointMetadata(checkpointMetadata, dos);
+	}
+
+	/**
+	 * Writes the header (magic number and format version) followed by the checkpoint metadata,
+	 * serialized with the serializer registered for the metadata's version.
+	 *
+	 * @param checkpointMetadata The metadata to write.
+	 * @param out The stream to write the metadata to. The stream is not closed.
+	 * @throws IOException Thrown if writing to the stream or serializing the metadata fails.
+	 */
+	public static <T extends Savepoint> void storeCheckpointMetadata(
+			T checkpointMetadata,
+			DataOutputStream out) throws IOException {
+
+		// write generic header
+		out.writeInt(HEADER_MAGIC_NUMBER);
+		out.writeInt(checkpointMetadata.getVersion());
+
+		// write checkpoint metadata
+		SavepointSerializer<T> serializer = SavepointSerializers.getSerializer(checkpointMetadata);
+		serializer.serialize(checkpointMetadata, out);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Reading and validating checkpoint metadata
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Reads checkpoint metadata from the given stream.
+	 *
+	 * <p>The method first checks the magic number, then reads the format version and
+	 * delegates to the {@link SavepointSerializer} registered for that version.
+	 *
+	 * @param in The stream to read from. The stream is not closed by this method.
+	 * @param classLoader The class loader handed to the deserializer.
+	 * @return The deserialized checkpoint metadata.
+	 * @throws IOException Thrown if the magic number does not match, if no serializer is known
+	 *                     for the stored version, or if reading from the stream fails.
+	 */
+	public static Savepoint loadCheckpointMetadata(DataInputStream in, ClassLoader classLoader) throws IOException {
+		checkNotNull(in, "input stream");
+		checkNotNull(classLoader, "classLoader");
+
+		final int magicNumber = in.readInt();
+
+		if (magicNumber == HEADER_MAGIC_NUMBER) {
+			final int version = in.readInt();
+			final SavepointSerializer<?> serializer = SavepointSerializers.getSerializer(version);
+
+			if (serializer != null) {
+				return serializer.deserialize(in, classLoader);
+			}
+			else {
+				throw new IOException("Unrecognized checkpoint version number: " + version);
+			}
+		}
+		else {
+			throw new IOException("Unexpected magic number. This can have multiple reasons: " +
+					"(1) You are trying to load a Flink 1.0 savepoint, which is not supported by this " +
+					"version of Flink. (2) The file you were pointing to is not a savepoint at all. " +
+					"(3) The savepoint file has been corrupted.");
+		}
+	}
+
+	/**
+	 * Loads the checkpoint metadata behind the given handle, validates it against the
+	 * given job vertices and wraps it into a {@link CompletedCheckpoint}.
+	 *
+	 * <p>Operator state is accepted if the operator can be mapped to a job vertex and the
+	 * max parallelism either matches or was not explicitly configured on the vertex.
+	 * State of operators that cannot be mapped is skipped if {@code allowNonRestoredState}
+	 * is set, or if none of the operator's subtask states actually holds state.
+	 *
+	 * @param jobId The ID of the job the resulting checkpoint is attached to.
+	 * @param tasks The job vertices of the program the state is mapped to.
+	 * @param checkpointPointer The external pointer to the checkpoint, used in error messages
+	 *                          and stored in the resulting checkpoint.
+	 * @param metadataHandle The handle from which the metadata is read.
+	 * @param classLoader The class loader used for deserializing the metadata.
+	 * @param allowNonRestoredState Whether state that cannot be mapped to the program may be dropped.
+	 * @return The completed checkpoint, created with the savepoint checkpoint properties.
+	 * @throws IOException Thrown if the metadata cannot be read.
+	 * @throws IllegalStateException Thrown if the max parallelism does not match, or if state
+	 *                               cannot be mapped and {@code allowNonRestoredState} is false.
+	 */
+	@SuppressWarnings("deprecation")
+	public static CompletedCheckpoint loadAndValidateCheckpoint(
+			JobID jobId,
+			Map<JobVertexID, ExecutionJobVertex> tasks,
+			String checkpointPointer,
+			StreamStateHandle metadataHandle,
+			ClassLoader classLoader,
+			boolean allowNonRestoredState) throws IOException {
+
+		checkNotNull(jobId, "jobId");
+		checkNotNull(tasks, "tasks");
+		checkNotNull(checkpointPointer, "checkpointPointer");
+		checkNotNull(metadataHandle, "metadataHandle");
+		checkNotNull(classLoader, "classLoader");
+
+		// (1) load the savepoint
+		final Savepoint rawCheckpointMetadata;
+		try (FSDataInputStream in = metadataHandle.openInputStream();
+				DataInputStream dis = new DataInputStream(in)) {
+
+			rawCheckpointMetadata = loadCheckpointMetadata(dis, classLoader);
+		}
+
+		// metadata that still carries task states is converted to operator states first
+		final Savepoint checkpointMetadata = rawCheckpointMetadata.getTaskStates() == null ?
+				rawCheckpointMetadata :
+				SavepointV2.convertToOperatorStateSavepointV2(tasks, rawCheckpointMetadata);
+
+		// generate mapping from operator to task
+		Map<OperatorID, ExecutionJobVertex> operatorToJobVertexMapping = new HashMap<>();
+		for (ExecutionJobVertex task : tasks.values()) {
+			for (OperatorID operatorID : task.getOperatorIDs()) {
+				operatorToJobVertexMapping.put(operatorID, task);
+			}
+		}
+
+		// (2) validate it (parallelism, etc)
+		boolean expandedToLegacyIds = false;
+
+		HashMap<OperatorID, OperatorState> operatorStates = new HashMap<>(checkpointMetadata.getOperatorStates().size());
+		for (OperatorState operatorState : checkpointMetadata.getOperatorStates()) {
+
+			ExecutionJobVertex executionJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
+
+			// on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
+			// for example as generated from older flink versions, to provide backwards compatibility.
+			if (executionJobVertex == null && !expandedToLegacyIds) {
+				operatorToJobVertexMapping = ExecutionJobVertex.includeAlternativeOperatorIDs(operatorToJobVertexMapping);
+				executionJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
+				expandedToLegacyIds = true;
+				LOG.info("Could not find ExecutionJobVertex. Including user-defined OperatorIDs in search.");
+			}
+
+			if (executionJobVertex != null) {
+
+				if (executionJobVertex.getMaxParallelism() == operatorState.getMaxParallelism()
+						|| !executionJobVertex.isMaxParallelismConfigured()) {
+					operatorStates.put(operatorState.getOperatorID(), operatorState);
+				} else {
+					// report the external pointer (not the metadata object) so that the message
+					// names the checkpoint/savepoint the same way as the message further below
+					String msg = String.format("Failed to rollback to checkpoint/savepoint %s. " +
+									"Max parallelism mismatch between checkpoint/savepoint state and new program. " +
+									"Cannot map operator %s with max parallelism %d to new program with " +
+									"max parallelism %d. This indicates that the program has been changed " +
+									"in a non-compatible way after the checkpoint/savepoint.",
+							checkpointPointer,
+							operatorState.getOperatorID(),
+							operatorState.getMaxParallelism(),
+							executionJobVertex.getMaxParallelism());
+
+					throw new IllegalStateException(msg);
+				}
+			} else if (allowNonRestoredState) {
+				LOG.info("Skipping savepoint state for operator {}.", operatorState.getOperatorID());
+			} else {
+				for (OperatorSubtaskState operatorSubtaskState : operatorState.getStates()) {
+					if (operatorSubtaskState.hasState()) {
+						String msg = String.format("Failed to rollback to checkpoint/savepoint %s. " +
+										"Cannot map checkpoint/savepoint state for operator %s to the new program, " +
+										"because the operator is not available in the new program. If " +
+										"you want to allow to skip this, you can set the --allowNonRestoredState " +
+										"option on the CLI.",
+								checkpointPointer, operatorState.getOperatorID());
+
+						throw new IllegalStateException(msg);
+					}
+				}
+
+				LOG.info("Skipping empty savepoint state for operator {}.", operatorState.getOperatorID());
+			}
+		}
+
+		// (3) convert to checkpoint so the system can fall back to it
+		CheckpointProperties props = CheckpointProperties.forSavepoint();
+
+		// both timestamp arguments are passed as 0 here
+		return new CompletedCheckpoint(
+				jobId,
+				checkpointMetadata.getCheckpointId(),
+				0L,
+				0L,
+				operatorStates,
+				checkpointMetadata.getMasterStates(),
+				props,
+				metadataHandle,
+				checkpointPointer);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Savepoint Disposal Hooks
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Disposes the savepoint that the given pointer refers to, resolving the pointer
+	 * through the given state backend.
+	 *
+	 * <p>The metadata is loaded first, because it holds the state handles to dispose. Then the
+	 * metadata itself is discarded, followed by the referenced state. All disposal steps are
+	 * attempted even if an earlier one fails; the first failure is rethrown at the end with
+	 * later failures attached as suppressed exceptions.
+	 *
+	 * @param pointer The external pointer to the savepoint.
+	 * @param stateBackend The state backend used to resolve the pointer.
+	 * @param classLoader The class loader used for deserializing the metadata.
+	 * @throws IOException Thrown if the metadata cannot be loaded or the disposal fails.
+	 * @throws FlinkException Declared by this method's signature; presumably raised while
+	 *                        resolving the pointer (TODO confirm against the state backend).
+	 */
+	public static void disposeSavepoint(
+			String pointer,
+			StateBackend stateBackend,
+			ClassLoader classLoader) throws IOException, FlinkException {
+
+		checkNotNull(pointer, "pointer");
+		checkNotNull(stateBackend, "stateBackend");
+		checkNotNull(classLoader, "classLoader");
+
+		final StreamStateHandle metadataHandle = stateBackend.resolveCheckpoint(pointer);
+
+		// load the savepoint object (the metadata) to have all the state handles that we need
+		// to dispose of all state
+		final Savepoint savepoint;
+		try (FSDataInputStream in = metadataHandle.openInputStream();
+				DataInputStream dis = new DataInputStream(in)) {
+
+			savepoint = loadCheckpointMetadata(dis, classLoader);
+		}
+
+		Exception exception = null;
+
+		// first dispose the savepoint metadata, so that the savepoint is not
+		// addressable any more even if the following disposal fails
+		try {
+			metadataHandle.discardState();
+		}
+		catch (Exception e) {
+			exception = e;
+		}
+
+		// now dispose the savepoint data
+		try {
+			savepoint.dispose();
+		}
+		catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
+		}
+
+		// until we have the proper hooks to delete full directories via the checkpoint storage,
+		// we need to have a special case here to remove the empty directory
+		if (stateBackend instanceof AbstractFileStateBackend && metadataHandle instanceof FileStateHandle) {
+			// a failure here must not hide a failure collected from the disposal steps above
+			try {
+				Path dir = ((FileStateHandle) metadataHandle).getFilePath().getParent();
+				FileUtils.deletePathIfEmpty(dir.getFileSystem(), dir);
+			}
+			catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
+		}
+
+		if (exception != null) {
+			ExceptionUtils.rethrowIOException(exception);
+		}
+	}
+
+	/**
+	 * Disposes the savepoint that the given pointer refers to, using the state backend
+	 * loaded from the given configuration.
+	 *
+	 * <p>If no state backend is configured, or the configured backend cannot be loaded,
+	 * a {@link MemoryStateBackend} is used instead.
+	 *
+	 * @param pointer The external pointer to the savepoint.
+	 * @param configuration The configuration from which the state backend is loaded.
+	 * @param classLoader The class loader used for loading the backend and the metadata.
+	 * @param logger Optional logger for progress messages; may be null.
+	 * @throws IOException Thrown if the metadata cannot be loaded or the disposal fails.
+	 * @throws FlinkException Declared by the delegate disposal method.
+	 */
+	public static void disposeSavepoint(
+			String pointer,
+			Configuration configuration,
+			ClassLoader classLoader,
+			@Nullable Logger logger) throws IOException, FlinkException {
+
+		checkNotNull(pointer, "pointer");
+		checkNotNull(configuration, "configuration");
+		checkNotNull(classLoader, "classLoader");
+
+		if (logger != null) {
+			logger.info("Attempting to load configured state backend for savepoint disposal");
+		}
+
+		StateBackend backend = null;
+		try {
+			backend = StateBackendLoader.loadStateBackendFromConfig(configuration, classLoader, null);
+
+			if (backend == null && logger != null) {
+				logger.info("No state backend configured, attempting to dispose savepoint " +
+						"with default backend (file system based)");
+			}
+		}
+		catch (Throwable t) {
+			// catches exceptions and errors (like linking errors)
+			if (logger != null) {
+				logger.info("Could not load configured state backend.");
+				logger.debug("Detailed exception:", t);
+			}
+		}
+
+		if (backend == null) {
+			// We use the memory state backend by default. The MemoryStateBackend is actually
+			// FileSystem-based for metadata
+			backend = new MemoryStateBackend();
+		}
+
+		disposeSavepoint(pointer, backend, classLoader);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** This class contains only static utility methods and is not meant to be instantiated. */
+	private Checkpoints() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index d6d0827..801232c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -25,10 +25,12 @@ import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
@@ -44,32 +46,22 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* and that is considered successful. The CompletedCheckpoint class contains all the metadata of the
* checkpoint, i.e., checkpoint ID, timestamps, and the handles to all states that are part of the
* checkpoint.
- *
+ *
* <h2>Size the CompletedCheckpoint Instances</h2>
- *
- * In most cases, the CompletedCheckpoint objects are very small, because the handles to the checkpoint
+ *
+ * <p>In most cases, the CompletedCheckpoint objects are very small, because the handles to the checkpoint
* states are only pointers (such as file paths). However, the some state backend implementations may
* choose to store some payload data directly with the metadata (for example to avoid many small files).
* If those thresholds are increased to large values, the memory consumption of the CompletedCheckpoint
* objects can be significant.
- *
- * <h2>Externalized Metadata</h2>
- *
- * The metadata of the CompletedCheckpoint is optionally also persisted in an external storage
- * system. In that case, the checkpoint is called <i>externalized</i>.
- *
- * <p>Externalized checkpoints have an external pointer, which points to the metadata. For example
- * when externalizing to a file system, that pointer is the file path to the checkpoint's folder
+ *
+ * <h2>Metadata Persistence</h2>
+ *
+ * <p>The metadata of the CompletedCheckpoint is also persisted in an external storage
+ * system. Checkpoints have an external pointer, which points to the metadata. For example
+ * when storing a checkpoint in a file system, that pointer is the file path to the checkpoint's folder
* or the metadata file. For a state backend that stores metadata in database tables, the pointer
* could be the table name and row key. The pointer is encoded as a String.
- *
- * <h2>Externalized Metadata and High-availability</h2>
- *
- * For high availability setups, the checkpoint metadata must be stored persistent and available
- * as well. The high-availability services that stores the checkpoint ground-truth (meaning what are
- * the latest completed checkpoints in what order) often rely on checkpoints being externalized. That
- * way, those services only store pointers to the externalized metadata, rather than the complete
- * metadata itself (for example ZooKeeper's ZNode payload should ideally be less than megabytes).
*/
public class CompletedCheckpoint implements Serializable {
@@ -79,10 +71,10 @@ public class CompletedCheckpoint implements Serializable {
// ------------------------------------------------------------------------
- /** The ID of the job that the checkpoint belongs to */
+ /** The ID of the job that the checkpoint belongs to. */
private final JobID job;
- /** The ID (logical timestamp) of the checkpoint */
+ /** The ID (logical timestamp) of the checkpoint. */
private final long checkpointID;
/** The timestamp when the checkpoint was triggered. */
@@ -91,21 +83,19 @@ public class CompletedCheckpoint implements Serializable {
/** The duration of the checkpoint (completion timestamp - trigger timestamp). */
private final long duration;
- /** States of the different operator groups belonging to this checkpoint */
+ /** States of the different operator groups belonging to this checkpoint. */
private final Map<OperatorID, OperatorState> operatorStates;
/** Properties for this checkpoint. */
private final CheckpointProperties props;
- /** States that were created by a hook on the master (in the checkpoint coordinator) */
+ /** States that were created by a hook on the master (in the checkpoint coordinator). */
private final Collection<MasterState> masterHookStates;
- /** The state handle to the externalized meta data, if the metadata has been externalized */
- @Nullable
- private final StreamStateHandle externalizedMetadata;
+ /** The state handle to the externalized meta data. */
+ private final StreamStateHandle metadataHandle;
- /** External pointer to the completed checkpoint (for example file path) if externalized; null otherwise. */
- @Nullable
+ /** External pointer to the completed checkpoint (for example file path). */
private final String externalPointer;
/** Optional stats tracker callback for discard. */
@@ -122,19 +112,13 @@ public class CompletedCheckpoint implements Serializable {
Map<OperatorID, OperatorState> operatorStates,
@Nullable Collection<MasterState> masterHookStates,
CheckpointProperties props,
- @Nullable StreamStateHandle externalizedMetadata,
- @Nullable String externalPointer) {
+ StreamStateHandle metadataHandle,
+ String externalPointer) {
checkArgument(checkpointID >= 0);
checkArgument(timestamp >= 0);
checkArgument(completionTimestamp >= 0);
- checkArgument((externalPointer == null) == (externalizedMetadata == null),
- "external pointer without externalized metadata must be both null or both non-null");
-
- checkArgument(!props.externalizeCheckpoint() || externalPointer != null,
- "Checkpoint properties require externalized checkpoint, but checkpoint is not externalized");
-
this.job = checkNotNull(job);
this.checkpointID = checkpointID;
this.timestamp = timestamp;
@@ -148,8 +132,8 @@ public class CompletedCheckpoint implements Serializable {
new ArrayList<>(masterHookStates);
this.props = checkNotNull(props);
- this.externalizedMetadata = externalizedMetadata;
- this.externalPointer = externalPointer;
+ this.metadataHandle = checkNotNull(metadataHandle);
+ this.externalPointer = checkNotNull(externalPointer);
}
// ------------------------------------------------------------------------
@@ -216,12 +200,10 @@ public class CompletedCheckpoint implements Serializable {
Exception exception = null;
// drop the metadata, if we have some
- if (externalizedMetadata != null) {
- try {
- externalizedMetadata.discardState();
- } catch (Exception e) {
- exception = e;
- }
+ try {
+ metadataHandle.discardState();
+ } catch (Exception e) {
+ exception = e;
}
// discard private state objects
@@ -263,16 +245,10 @@ public class CompletedCheckpoint implements Serializable {
return Collections.unmodifiableCollection(masterHookStates);
}
- public boolean isExternalized() {
- return externalizedMetadata != null;
+ public StreamStateHandle getMetadataHandle() {
+ return metadataHandle;
}
- @Nullable
- public StreamStateHandle getExternalizedMetadata() {
- return externalizedMetadata;
- }
-
- @Nullable
public String getExternalPointer() {
return externalPointer;
}
@@ -314,10 +290,7 @@ public class CompletedCheckpoint implements Serializable {
CompletedCheckpoint that = (CompletedCheckpoint) o;
- if (checkpointID != that.checkpointID) {
- return false;
- }
- return job.equals(that.job);
+ return checkpointID == that.checkpointID && job.equals(that.job);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index a9b6d4d..5e8559d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -20,14 +20,14 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
@@ -35,7 +35,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
import java.io.IOException;
import java.util.ArrayList;
@@ -56,7 +55,7 @@ import static org.apache.flink.util.Preconditions.checkState;
* A pending checkpoint is a checkpoint that has been started, but has not been
* acknowledged by all tasks that need to acknowledge it. Once all tasks have
* acknowledged it, it becomes a {@link CompletedCheckpoint}.
- *
+ *
* <p>Note that the pending checkpoint, as well as the successful checkpoint keep the
* state handles always as serialized values, never as actual values.
*/
@@ -74,7 +73,7 @@ public class PendingCheckpoint {
// ------------------------------------------------------------------------
- /** The PendingCheckpoint logs to the same logger as the CheckpointCoordinator */
+ /** The PendingCheckpoint logs to the same logger as the CheckpointCoordinator. */
private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
private final Object lock = new Object();
@@ -91,20 +90,19 @@ public class PendingCheckpoint {
private final List<MasterState> masterState;
- /** Set of acknowledged tasks */
+ /** Set of acknowledged tasks. */
private final Set<ExecutionAttemptID> acknowledgedTasks;
- /** The checkpoint properties. If the checkpoint should be persisted
- * externally, it happens in {@link #finalizeCheckpointExternalized()}. */
+ /** The checkpoint properties. */
private final CheckpointProperties props;
- /** Target directory to potentially persist checkpoint to; <code>null</code> if none configured. */
- private final String targetDirectory;
+ /** Target storage location to persist the checkpoint metadata to. */
+ private final CheckpointStorageLocation targetLocation;
/** The promise to fulfill once the checkpoint has been completed. */
private final CompletableFuture<CompletedCheckpoint> onCompletionPromise;
- /** The executor for potentially blocking I/O operations, like state disposal */
+ /** The executor for potentially blocking I/O operations, like state disposal. */
private final Executor executor;
private int numAcknowledgedTasks;
@@ -125,14 +123,9 @@ public class PendingCheckpoint {
long checkpointTimestamp,
Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm,
CheckpointProperties props,
- String targetDirectory,
+ CheckpointStorageLocation targetLocation,
Executor executor) {
- // Sanity check
- if (props.externalizeCheckpoint() && targetDirectory == null) {
- throw new NullPointerException("No target directory specified to persist checkpoint to.");
- }
-
checkArgument(verticesToConfirm.size() > 0,
"Checkpoint needs at least one vertex that commits the checkpoint");
@@ -141,7 +134,7 @@ public class PendingCheckpoint {
this.checkpointTimestamp = checkpointTimestamp;
this.notYetAcknowledgedTasks = checkNotNull(verticesToConfirm);
this.props = checkNotNull(props);
- this.targetDirectory = targetDirectory;
+ this.targetLocation = checkNotNull(targetLocation);
this.executor = Preconditions.checkNotNull(executor);
this.operatorStates = new HashMap<>();
@@ -195,7 +188,7 @@ public class PendingCheckpoint {
/**
* Checks whether this checkpoint can be subsumed or whether it should always continue, regardless
* of newer checkpoints in progress.
- *
+ *
* @return True if the checkpoint can be subsumed, false otherwise.
*/
public boolean canBeSubsumed() {
@@ -207,10 +200,6 @@ public class PendingCheckpoint {
return props;
}
- String getTargetDirectory() {
- return targetDirectory;
- }
-
/**
* Sets the callback for tracking this pending checkpoint.
*
@@ -223,7 +212,7 @@ public class PendingCheckpoint {
/**
* Sets the handle for the canceller to this pending checkpoint. This method fails
* with an exception if a handle has already been set.
- *
+ *
* @return true, if the handle was set, false, if the checkpoint is already disposed;
*/
public boolean setCancellerHandle(ScheduledFuture<?> cancellerHandle) {
@@ -255,103 +244,60 @@ public class PendingCheckpoint {
return onCompletionPromise;
}
- public CompletedCheckpoint finalizeCheckpointExternalized() throws IOException {
+ public CompletedCheckpoint finalizeCheckpoint() throws IOException {
synchronized (lock) {
checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
// make sure we fulfill the promise with an exception if something fails
try {
- // externalize the metadata
+ // write out the metadata
final Savepoint savepoint = new SavepointV2(checkpointId, operatorStates.values(), masterState);
+ final StreamStateHandle metadataHandle;
- // TEMP FIX - The savepoint store is strictly typed to file systems currently
- // but the checkpoints think more generic. we need to work with file handles
- // here until the savepoint serializer accepts a generic stream factory
-
- // We have this branch here, because savepoints and externalized checkpoints
- // currently behave differently.
- // Savepoints:
- // - Metadata file in unique directory
- // - External pointer can be the directory
- // Externalized checkpoints:
- // - Multiple metadata files per directory possible (need to be unique)
- // - External pointer needs to be the file itself
- //
- // This should be unified as part of the JobManager metadata stream factories.
- if (props.isSavepoint()) {
- final FileStateHandle metadataHandle = SavepointStore.storeSavepointToHandle(targetDirectory, savepoint);
- final String externalPointer = metadataHandle.getFilePath().getParent().toString();
-
- return finalizeInternal(metadataHandle, externalPointer);
- } else {
- final FileStateHandle metadataHandle = SavepointStore.storeExternalizedCheckpointToHandle(targetDirectory, savepoint);
- final String externalPointer = metadataHandle.getFilePath().toString();
+ try (CheckpointStateOutputStream out = targetLocation.createMetadataOutputStream()) {
+ Checkpoints.storeCheckpointMetadata(savepoint, out);
+ metadataHandle = out.closeAndGetHandle();
+ }
- return finalizeInternal(metadataHandle, externalPointer);
+ final String externalPointer = targetLocation.markCheckpointAsFinished();
+
+ CompletedCheckpoint completed = new CompletedCheckpoint(
+ jobId,
+ checkpointId,
+ checkpointTimestamp,
+ System.currentTimeMillis(),
+ operatorStates,
+ masterState,
+ props,
+ metadataHandle,
+ externalPointer);
+
+ onCompletionPromise.complete(completed);
+
+ // to prevent null-pointers from concurrent modification, copy reference onto stack
+ PendingCheckpointStats statsCallback = this.statsCallback;
+ if (statsCallback != null) {
+ // Finalize the statsCallback and give the completed checkpoint a
+ // callback for discards.
+ CompletedCheckpointStats.DiscardCallback discardCallback =
+ statsCallback.reportCompletedCheckpoint(externalPointer);
+ completed.setDiscardCallback(discardCallback);
}
- }
- catch (Throwable t) {
- onCompletionPromise.completeExceptionally(t);
- ExceptionUtils.rethrowIOException(t);
- return null; // silence the compiler
- }
- }
- }
- public CompletedCheckpoint finalizeCheckpointNonExternalized() {
- synchronized (lock) {
- checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
+ // mark this pending checkpoint as disposed, but do NOT drop the state
+ dispose(false);
- // make sure we fulfill the promise with an exception if something fails
- try {
- // finalize without external metadata
- return finalizeInternal(null, null);
+ return completed;
}
catch (Throwable t) {
onCompletionPromise.completeExceptionally(t);
- ExceptionUtils.rethrow(t);
+ ExceptionUtils.rethrowIOException(t);
return null; // silence the compiler
}
}
}
- @GuardedBy("lock")
- private CompletedCheckpoint finalizeInternal(
- @Nullable StreamStateHandle externalMetadata,
- @Nullable String externalPointer) {
-
- assert(Thread.holdsLock(lock));
-
- CompletedCheckpoint completed = new CompletedCheckpoint(
- jobId,
- checkpointId,
- checkpointTimestamp,
- System.currentTimeMillis(),
- operatorStates,
- masterState,
- props,
- externalMetadata,
- externalPointer);
-
- onCompletionPromise.complete(completed);
-
- // to prevent null-pointers from concurrent modification, copy reference onto stack
- PendingCheckpointStats statsCallback = this.statsCallback;
- if (statsCallback != null) {
- // Finalize the statsCallback and give the completed checkpoint a
- // callback for discards.
- CompletedCheckpointStats.DiscardCallback discardCallback =
- statsCallback.reportCompletedCheckpoint(externalPointer);
- completed.setDiscardCallback(discardCallback);
- }
-
- // mark this pending checkpoint as disposed, but do NOT drop the state
- dispose(false);
-
- return completed;
- }
-
/**
* Acknowledges the task with the given execution attempt id and the given subtask state.
*
@@ -528,6 +474,7 @@ public class PendingCheckpoint {
// unregistered shared states are still considered private at this point.
try {
StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
+ targetLocation.disposeOnFailure();
} catch (Throwable t) {
LOG.warn("Could not properly dispose the private states in the pending checkpoint {} of job {}.",
checkpointId, jobId, t);
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
index a7cf4b5..d7966e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
@@ -34,8 +34,7 @@ import java.util.Collection;
* we allow different savepoint implementations (see subclasses of this
* interface).
*
- * <p>Savepoints are serialized via a {@link SavepointSerializer} and stored
- * via a {@link SavepointStore}.
+ * <p>Savepoints are serialized via a {@link SavepointSerializer}.
*/
public interface Savepoint extends Versioned {
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
deleted file mode 100644
index 31d9124..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.checkpoint.CheckpointProperties;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.state.StreamStateHandle;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * The SavepointLoader is a utility to load and verify a Savepoint, and to create a checkpoint from it.
- */
-public class SavepointLoader {
-
- private static final Logger LOG = LoggerFactory.getLogger(SavepointLoader.class);
-
- /**
- * Loads a savepoint back as a {@link CompletedCheckpoint}.
- *
- * <p>This method verifies that tasks and parallelism still match the savepoint parameters.
- *
- * @param jobId The JobID of the job to load the savepoint for.
- * @param tasks Tasks that will possibly be reset
- * @param savepointPath The path of the savepoint to rollback to
- * @param classLoader The class loader to resolve serialized classes in legacy savepoint versions.
- * @param allowNonRestoredState Allow to skip checkpoint state that cannot be mapped
- * to any job vertex in tasks.
- *
- * @throws IllegalStateException If mismatch between program and savepoint state
- * @throws IOException If savepoint store failure
- */
- public static CompletedCheckpoint loadAndValidateSavepoint(
- JobID jobId,
- Map<JobVertexID, ExecutionJobVertex> tasks,
- String savepointPath,
- ClassLoader classLoader,
- boolean allowNonRestoredState) throws IOException {
-
- // (1) load the savepoint
- final Tuple2<Savepoint, StreamStateHandle> savepointAndHandle =
- SavepointStore.loadSavepointWithHandle(savepointPath, classLoader);
-
- Savepoint savepoint = savepointAndHandle.f0;
- final StreamStateHandle metadataHandle = savepointAndHandle.f1;
-
- if (savepoint.getTaskStates() != null) {
- savepoint = SavepointV2.convertToOperatorStateSavepointV2(tasks, savepoint);
- }
- // generate mapping from operator to task
- Map<OperatorID, ExecutionJobVertex> operatorToJobVertexMapping = new HashMap<>();
- for (ExecutionJobVertex task : tasks.values()) {
- for (OperatorID operatorID : task.getOperatorIDs()) {
- operatorToJobVertexMapping.put(operatorID, task);
- }
- }
-
- // (2) validate it (parallelism, etc)
- boolean expandedToLegacyIds = false;
-
- HashMap<OperatorID, OperatorState> operatorStates = new HashMap<>(savepoint.getOperatorStates().size());
- for (OperatorState operatorState : savepoint.getOperatorStates()) {
-
- ExecutionJobVertex executionJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
-
- // on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
- // for example as generated from older flink versions, to provide backwards compatibility.
- if (executionJobVertex == null && !expandedToLegacyIds) {
- operatorToJobVertexMapping = ExecutionJobVertex.includeAlternativeOperatorIDs(operatorToJobVertexMapping);
- executionJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
- expandedToLegacyIds = true;
- LOG.info("Could not find ExecutionJobVertex. Including user-defined OperatorIDs in search.");
- }
-
- if (executionJobVertex != null) {
-
- if (executionJobVertex.getMaxParallelism() == operatorState.getMaxParallelism()
- || !executionJobVertex.isMaxParallelismConfigured()) {
- operatorStates.put(operatorState.getOperatorID(), operatorState);
- } else {
- String msg = String.format("Failed to rollback to savepoint %s. " +
- "Max parallelism mismatch between savepoint state and new program. " +
- "Cannot map operator %s with max parallelism %d to new program with " +
- "max parallelism %d. This indicates that the program has been changed " +
- "in a non-compatible way after the savepoint.",
- savepoint,
- operatorState.getOperatorID(),
- operatorState.getMaxParallelism(),
- executionJobVertex.getMaxParallelism());
-
- throw new IllegalStateException(msg);
- }
- } else if (allowNonRestoredState) {
- LOG.info("Skipping savepoint state for operator {}.", operatorState.getOperatorID());
- } else {
- for (OperatorSubtaskState operatorSubtaskState : operatorState.getStates()) {
- if (operatorSubtaskState.hasState()) {
- String msg = String.format("Failed to rollback to savepoint %s. " +
- "Cannot map savepoint state for operator %s to the new program, " +
- "because the operator is not available in the new program. If " +
- "you want to allow to skip this, you can set the --allowNonRestoredState " +
- "option on the CLI.",
- savepointPath, operatorState.getOperatorID());
-
- throw new IllegalStateException(msg);
- }
- }
- LOG.info("Skipping empty savepoint state for operator {}.", operatorState.getOperatorID());
- }
- }
-
- // (3) convert to checkpoint so the system can fall back to it
- CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
-
- return new CompletedCheckpoint(
- jobId,
- savepoint.getCheckpointId(),
- 0L,
- 0L,
- operatorStates,
- savepoint.getMasterStates(),
- props,
- metadataHandle,
- savepointPath);
- }
-
- // ------------------------------------------------------------------------
-
- /** This class is not meant to be instantiated */
- private SavepointLoader() {}
-}