You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/01/18 17:09:21 UTC
[04/17] flink git commit: [FLINK-5823] [checkpoints] State backends
now also handle the checkpoint metadata
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
deleted file mode 100644
index 586df57..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystem.WriteMode;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.filesystem.FileStateHandle;
-import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Utilities for storing and loading savepoint meta data files.
- *
- * <p>Stored savepoints have the following format:
- * <pre>
- * MagicNumber SavepointVersion Savepoint
- * - MagicNumber => int
- * - SavepointVersion => int (returned by Savepoint#getVersion())
- * - Savepoint => bytes (serialized via version-specific SavepointSerializer)
- * </pre>
- */
-public class SavepointStore {
-
- private static final Logger LOG = LoggerFactory.getLogger(SavepointStore.class);
-
- /** Magic number for sanity checks against stored savepoints. */
- public static final int MAGIC_NUMBER = 0x4960672d;
-
- private static final String SAVEPOINT_METADATA_FILE = "_metadata";
-
- /**
- * Metadata file for an externalized checkpoint, random suffix added
- * during store, because the parent directory is not unique.
- */
- static final String EXTERNALIZED_CHECKPOINT_METADATA_FILE = "checkpoint_metadata-";
-
- /**
- * Creates a savepoint directory.
- *
- * @param baseDirectory Base target directory for the savepoint
- * @param jobId Optional JobID the savepoint belongs to
- * @return The created savepoint directory
- * @throws IOException FileSystem operation failures are forwarded
- */
- public static String createSavepointDirectory(@Nonnull String baseDirectory, @Nullable JobID jobId) throws IOException {
- final Path basePath = new Path(baseDirectory);
- final FileSystem fs = basePath.getFileSystem();
-
- final String prefix;
- if (jobId == null) {
- prefix = "savepoint-";
- } else {
- prefix = String.format("savepoint-%s-", jobId.toString().substring(0, 6));
- }
-
- Exception latestException = null;
-
- // Try to create a FS output stream
- for (int attempt = 0; attempt < 10; attempt++) {
- Path path = new Path(basePath, FileUtils.getRandomFilename(prefix));
-
- try {
- if (fs.mkdirs(path)) {
- return path.toString();
- }
- } catch (Exception e) {
- latestException = e;
- }
- }
-
- throw new IOException("Failed to create savepoint directory at " + baseDirectory, latestException);
- }
-
- /**
- * Deletes a savepoint directory.
- *
- * @param savepointDirectory Recursively deletes the given directory
- * @throws IOException FileSystem operation failures are forwarded
- */
- public static void deleteSavepointDirectory(@Nonnull String savepointDirectory) throws IOException {
- Path path = new Path(savepointDirectory);
- FileSystem fs = FileSystem.get(path.toUri());
- fs.delete(path, true);
- }
-
- /**
- * Stores the savepoint metadata file.
- *
- * @param <T> Savepoint type
- * @param directory Target directory to store savepoint in
- * @param savepoint Savepoint to be stored
- * @return Path of stored savepoint
- * @throws IOException Failures during store are forwarded
- */
- public static <T extends Savepoint> String storeSavepoint(String directory, T savepoint) throws IOException {
- // write and create the file handle
- FileStateHandle metadataFileHandle = storeSavepointToHandle(directory,
- SAVEPOINT_METADATA_FILE, savepoint);
-
- // we return the savepoint directory path here!
- // The directory path also works to resume from and is more elegant than the direct
- // metadata file pointer
- return metadataFileHandle.getFilePath().getParent().toString();
- }
-
- /**
- * Stores the savepoint metadata file to a state handle.
- *
- * @param directory Target directory to store savepoint in
- * @param savepoint Savepoint to be stored
- *
- * @return State handle to the checkpoint metadata
- * @throws IOException Failures during store are forwarded
- */
- public static <T extends Savepoint> FileStateHandle storeSavepointToHandle(String directory, T savepoint) throws IOException {
- return storeSavepointToHandle(directory, SAVEPOINT_METADATA_FILE, savepoint);
- }
-
- /**
- * Stores the externalized checkpoint metadata file to a state handle.
- *
- * @param directory Target directory to store savepoint in
- * @param savepoint Savepoint to be stored
- *
- * @return State handle to the checkpoint metadata
- * @throws IOException Failures during store are forwarded
- */
- public static <T extends Savepoint> FileStateHandle storeExternalizedCheckpointToHandle(String directory, T savepoint) throws IOException {
- String fileName = FileUtils.getRandomFilename(EXTERNALIZED_CHECKPOINT_METADATA_FILE);
- return storeSavepointToHandle(directory, fileName, savepoint);
- }
-
- /**
- * Stores the savepoint metadata file to a state handle.
- *
- * @param directory Target directory to store savepoint in
- * @param savepoint Savepoint to be stored
- *
- * @return State handle to the checkpoint metadata
- * @throws IOException Failures during store are forwarded
- */
- static <T extends Savepoint> FileStateHandle storeSavepointToHandle(
- String directory,
- String filename,
- T savepoint) throws IOException {
-
- checkNotNull(directory, "Target directory");
- checkNotNull(savepoint, "Savepoint");
-
- final Path basePath = new Path(directory);
- final Path metadataFilePath = new Path(basePath, filename);
-
- final FileSystem fs = FileSystem.get(basePath.toUri());
-
- boolean success = false;
- try (FSDataOutputStream fdos = fs.create(metadataFilePath, WriteMode.NO_OVERWRITE);
- DataOutputStream dos = new DataOutputStream(fdos))
- {
- // Write header
- dos.writeInt(MAGIC_NUMBER);
- dos.writeInt(savepoint.getVersion());
-
- // Write savepoint
- SavepointSerializer<T> serializer = SavepointSerializers.getSerializer(savepoint);
- serializer.serialize(savepoint, dos);
-
- // construct result handle
- FileStateHandle handle = new FileStateHandle(metadataFilePath, dos.size());
-
- // all good!
- success = true;
- return handle;
- }
- finally {
- if (!success && fs.exists(metadataFilePath)) {
- if (!fs.delete(metadataFilePath, true)) {
- LOG.warn("Failed to delete file {} after failed metadata write.", metadataFilePath);
- }
- }
- }
- }
-
- /**
- * Loads the savepoint at the specified path.
- *
- * @param savepointFileOrDirectory Path to the parent savepoint directory or the meta data file.
- * @param classLoader The class loader used to resolve serialized classes from legacy savepoint formats.
- * @return The loaded savepoint
- *
- * @throws IOException Failures during load are forwarded
- */
- public static Savepoint loadSavepoint(String savepointFileOrDirectory, ClassLoader classLoader) throws IOException {
- return loadSavepointWithHandle(savepointFileOrDirectory, classLoader).f0;
- }
-
- /**
- * Loads the savepoint at the specified path. This methods returns the savepoint, as well as the
- * handle to the metadata.
- *
- * @param savepointFileOrDirectory Path to the parent savepoint directory or the meta data file.
- * @param classLoader The class loader used to resolve serialized classes from legacy savepoint formats.
- * @return The loaded savepoint
- *
- * @throws IOException Failures during load are forwarded
- */
- public static Tuple2<Savepoint, StreamStateHandle> loadSavepointWithHandle(
- String savepointFileOrDirectory,
- ClassLoader classLoader) throws IOException {
-
- checkNotNull(savepointFileOrDirectory, "savepointFileOrDirectory");
- checkNotNull(classLoader, "classLoader");
-
- Path path = new Path(savepointFileOrDirectory);
-
- LOG.info("Loading savepoint from {}", path);
-
- FileSystem fs = FileSystem.get(path.toUri());
-
- FileStatus status = fs.getFileStatus(path);
-
- // If this is a directory, we need to find the meta data file
- if (status.isDir()) {
- Path candidatePath = new Path(path, SAVEPOINT_METADATA_FILE);
- if (fs.exists(candidatePath)) {
- path = candidatePath;
- LOG.info("Using savepoint file in {}", path);
- } else {
- throw new IOException("Cannot find meta data file in directory " + path
- + ". Please try to load the savepoint directly from the meta data file "
- + "instead of the directory.");
- }
- }
-
- // load the savepoint
- final Savepoint savepoint;
- try (DataInputStream dis = new DataInputViewStreamWrapper(fs.open(path))) {
- int magicNumber = dis.readInt();
-
- if (magicNumber == MAGIC_NUMBER) {
- int version = dis.readInt();
-
- SavepointSerializer<?> serializer = SavepointSerializers.getSerializer(version);
- savepoint = serializer.deserialize(dis, classLoader);
- } else {
- throw new RuntimeException("Unexpected magic number. This can have multiple reasons: " +
- "(1) You are trying to load a Flink 1.0 savepoint, which is not supported by this " +
- "version of Flink. (2) The file you were pointing to is not a savepoint at all. " +
- "(3) The savepoint file has been corrupted.");
- }
- }
-
- // construct the stream handle to the metadata file
- // we get the size best-effort
- long size = 0;
- try {
- size = fs.getFileStatus(path).getLen();
- }
- catch (Exception ignored) {
- // we don't know the size, but we don't want to fail the savepoint loading for that
- }
- StreamStateHandle metadataHandle = new FileStateHandle(path, size);
-
- return new Tuple2<>(savepoint, metadataHandle);
- }
-
- /**
- * Removes the savepoint meta data w/o loading and disposing it.
- *
- * @param path Path of savepoint to remove
- * @throws IOException Failures during disposal are forwarded
- */
- public static void removeSavepointFile(String path) throws IOException {
- Preconditions.checkNotNull(path, "Path");
-
- try {
- LOG.info("Removing savepoint: {}.", path);
-
- Path filePath = new Path(path);
- FileSystem fs = FileSystem.get(filePath.toUri());
-
- if (fs.exists(filePath)) {
- if (!fs.delete(filePath, true)) {
- throw new IOException("Failed to delete " + filePath + ".");
- }
- } else {
- throw new IllegalArgumentException("Invalid path '" + filePath.toUri() + "'.");
- }
- } catch (Throwable t) {
- throw new IOException("Failed to dispose savepoint " + path + ".", t);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index f88a6b62..5c2f0f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -58,7 +59,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
@@ -444,14 +444,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
long checkpointTimeout,
long minPauseBetweenCheckpoints,
int maxConcurrentCheckpoints,
- ExternalizedCheckpointSettings externalizeSettings,
+ CheckpointRetentionPolicy retentionPolicy,
List<ExecutionJobVertex> verticesToTrigger,
List<ExecutionJobVertex> verticesToWaitFor,
List<ExecutionJobVertex> verticesToCommitTo,
List<MasterTriggerRestoreHook<?>> masterHooks,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore checkpointStore,
- String checkpointDir,
StateBackend checkpointStateBackend,
CheckpointStatsTracker statsTracker) {
@@ -475,13 +474,12 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
checkpointTimeout,
minPauseBetweenCheckpoints,
maxConcurrentCheckpoints,
- externalizeSettings,
+ retentionPolicy,
tasksToTrigger,
tasksToWaitFor,
tasksToCommitTo,
checkpointIDCounter,
checkpointStore,
- checkpointDir,
checkpointStateBackend,
ioExecutor,
SharedStateRegistry.DEFAULT_FACTORY);
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index c742903..fe70bb6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -186,7 +186,7 @@ public class ExecutionGraphBuilder {
// configure the state checkpointing
JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
if (snapshotSettings != null) {
- List<ExecutionJobVertex> triggerVertices =
+ List<ExecutionJobVertex> triggerVertices =
idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);
List<ExecutionJobVertex> ackVertices =
@@ -242,7 +242,7 @@ public class ExecutionGraphBuilder {
try {
applicationConfiguredBackend = serializedAppConfigured.deserializeValue(classLoader);
} catch (IOException | ClassNotFoundException e) {
- throw new JobExecutionException(jobId,
+ throw new JobExecutionException(jobId,
"Could not deserialize application-defined state backend.", e);
}
}
@@ -295,14 +295,13 @@ public class ExecutionGraphBuilder {
chkConfig.getCheckpointTimeout(),
chkConfig.getMinPauseBetweenCheckpoints(),
chkConfig.getMaxConcurrentCheckpoints(),
- chkConfig.getExternalizedCheckpointSettings(),
+ chkConfig.getCheckpointRetentionPolicy(),
triggerVertices,
ackVertices,
confirmVertices,
hooks,
checkpointIdCounter,
completedCheckpoints,
- externalizedCheckpointsDir,
rootBackend,
checkpointStatsTracker);
}
@@ -331,7 +330,7 @@ public class ExecutionGraphBuilder {
} else {
throw new IllegalArgumentException(
"The snapshot checkpointing settings refer to non-existent vertex " + id);
- }
+ }
}
return result;
@@ -339,6 +338,6 @@ public class ExecutionGraphBuilder {
// ------------------------------------------------------------------------
- /** This class is not supposed to be instantiated */
+ /** This class is not supposed to be instantiated. */
private ExecutionGraphBuilder() {}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
index e00a6d4..4ecbda5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.jobgraph.tasks;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.util.Preconditions;
import java.io.Serializable;
@@ -31,7 +32,7 @@ import java.util.Objects;
*/
public class CheckpointCoordinatorConfiguration implements Serializable {
- private static final long serialVersionUID = -647384516034982626L;
+ private static final long serialVersionUID = 2L;
private final long checkpointInterval;
@@ -41,8 +42,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
private final int maxConcurrentCheckpoints;
- /** Settings for externalized checkpoints. */
- private final ExternalizedCheckpointSettings externalizedCheckpointSettings;
+ /** Settings for what to do with checkpoints when a job finishes. */
+ private final CheckpointRetentionPolicy checkpointRetentionPolicy;
/**
* Flag indicating whether exactly once checkpoint mode has been configured.
@@ -58,7 +59,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
long checkpointTimeout,
long minPauseBetweenCheckpoints,
int maxConcurrentCheckpoints,
- ExternalizedCheckpointSettings externalizedCheckpointSettings,
+ CheckpointRetentionPolicy checkpointRetentionPolicy,
boolean isExactlyOnce) {
// sanity checks
@@ -71,7 +72,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
this.checkpointTimeout = checkpointTimeout;
this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
- this.externalizedCheckpointSettings = Preconditions.checkNotNull(externalizedCheckpointSettings);
+ this.checkpointRetentionPolicy = Preconditions.checkNotNull(checkpointRetentionPolicy);
this.isExactlyOnce = isExactlyOnce;
}
@@ -91,8 +92,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
return maxConcurrentCheckpoints;
}
- public ExternalizedCheckpointSettings getExternalizedCheckpointSettings() {
- return externalizedCheckpointSettings;
+ public CheckpointRetentionPolicy getCheckpointRetentionPolicy() {
+ return checkpointRetentionPolicy;
}
public boolean isExactlyOnce() {
@@ -113,12 +114,18 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
minPauseBetweenCheckpoints == that.minPauseBetweenCheckpoints &&
maxConcurrentCheckpoints == that.maxConcurrentCheckpoints &&
isExactlyOnce == that.isExactlyOnce &&
- Objects.equals(externalizedCheckpointSettings, that.externalizedCheckpointSettings);
+ checkpointRetentionPolicy == that.checkpointRetentionPolicy;
}
@Override
public int hashCode() {
- return Objects.hash(checkpointInterval, checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpoints, externalizedCheckpointSettings, isExactlyOnce);
+ return Objects.hash(
+ checkpointInterval,
+ checkpointTimeout,
+ minPauseBetweenCheckpoints,
+ maxConcurrentCheckpoints,
+ checkpointRetentionPolicy,
+ isExactlyOnce);
}
@Override
@@ -128,7 +135,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
", checkpointTimeout=" + checkpointTimeout +
", minPauseBetweenCheckpoints=" + minPauseBetweenCheckpoints +
", maxConcurrentCheckpoints=" + maxConcurrentCheckpoints +
- ", externalizedCheckpointSettings=" + externalizedCheckpointSettings +
+ ", checkpointRetentionPolicy=" + checkpointRetentionPolicy +
'}';
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
deleted file mode 100644
index f432796..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph.tasks;
-
-import org.apache.flink.annotation.Internal;
-
-import java.util.Objects;
-
-/**
- * Grouped settings for externalized checkpoints.
- */
-@Internal
-public class ExternalizedCheckpointSettings implements java.io.Serializable {
-
- private static final long serialVersionUID = -6271691851124392955L;
-
- private static final ExternalizedCheckpointSettings NONE = new ExternalizedCheckpointSettings(false, false);
-
- /** Flag indicating whether checkpoints should be externalized. */
- private final boolean externalizeCheckpoints;
-
- /** Flag indicating whether externalized checkpoints should delete on cancellation. */
- private final boolean deleteOnCancellation;
-
- private ExternalizedCheckpointSettings(boolean externalizeCheckpoints, boolean deleteOnCancellation) {
- this.externalizeCheckpoints = externalizeCheckpoints;
- this.deleteOnCancellation = deleteOnCancellation;
- }
-
- /**
- * Returns <code>true</code> if checkpoints should be externalized.
- *
- * @return <code>true</code> if checkpoints should be externalized.
- */
- public boolean externalizeCheckpoints() {
- return externalizeCheckpoints;
- }
-
- /**
- * Returns <code>true</code> if externalized checkpoints should be deleted on cancellation.
- *
- * @return <code>true</code> if externalized checkpoints should be deleted on cancellation.
- */
- public boolean deleteOnCancellation() {
- return deleteOnCancellation;
- }
-
- public static ExternalizedCheckpointSettings externalizeCheckpoints(boolean deleteOnCancellation) {
- return new ExternalizedCheckpointSettings(true, deleteOnCancellation);
- }
-
- public static ExternalizedCheckpointSettings none() {
- return NONE;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- ExternalizedCheckpointSettings that = (ExternalizedCheckpointSettings) o;
- return externalizeCheckpoints == that.externalizeCheckpoints &&
- deleteOnCancellation == that.deleteOnCancellation;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(externalizeCheckpoints, deleteOnCancellation);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
index 91c9ae1..b88183e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
@@ -19,9 +19,9 @@
package org.apache.flink.runtime.rest.handler.job.checkpoints;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
@@ -71,11 +71,11 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<Check
"Checkpointing is not enabled for this job (" + executionGraph.getJobID() + ").",
HttpResponseStatus.NOT_FOUND);
} else {
- ExternalizedCheckpointSettings externalizedCheckpointSettings = checkpointCoordinatorConfiguration.getExternalizedCheckpointSettings();
+ CheckpointRetentionPolicy retentionPolicy = checkpointCoordinatorConfiguration.getCheckpointRetentionPolicy();
CheckpointConfigInfo.ExternalizedCheckpointInfo externalizedCheckpointInfo = new CheckpointConfigInfo.ExternalizedCheckpointInfo(
- externalizedCheckpointSettings.externalizeCheckpoints(),
- externalizedCheckpointSettings.deleteOnCancellation());
+ retentionPolicy != CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+ retentionPolicy != CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION);
return new CheckpointConfigInfo(
checkpointCoordinatorConfiguration.isExactlyOnce() ? CheckpointConfigInfo.ProcessingMode.EXACTLY_ONCE : CheckpointConfigInfo.ProcessingMode.AT_LEAST_ONCE,
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
index 69cc55f..fbaa484 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
@@ -18,9 +18,9 @@
package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
@@ -100,12 +100,13 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
gen.writeNumberField(CheckpointConfigInfo.FIELD_NAME_CHECKPOINT_MIN_PAUSE, jobCheckpointingConfiguration.getMinPauseBetweenCheckpoints());
gen.writeNumberField(CheckpointConfigInfo.FIELD_NAME_CHECKPOINT_MAX_CONCURRENT, jobCheckpointingConfiguration.getMaxConcurrentCheckpoints());
- ExternalizedCheckpointSettings externalization = jobCheckpointingConfiguration.getExternalizedCheckpointSettings();
+ CheckpointRetentionPolicy retentionPolicy = jobCheckpointingConfiguration.getCheckpointRetentionPolicy();
gen.writeObjectFieldStart(CheckpointConfigInfo.FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG);
{
- if (externalization.externalizeCheckpoints()) {
+ if (retentionPolicy != CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION) {
gen.writeBooleanField(CheckpointConfigInfo.ExternalizedCheckpointInfo.FIELD_NAME_ENABLED, true);
- gen.writeBooleanField(CheckpointConfigInfo.ExternalizedCheckpointInfo.FIELD_NAME_DELETE_ON_CANCELLATION, externalization.deleteOnCancellation());
+ gen.writeBooleanField(CheckpointConfigInfo.ExternalizedCheckpointInfo.FIELD_NAME_DELETE_ON_CANCELLATION,
+ retentionPolicy != CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION);
} else {
gen.writeBooleanField(CheckpointConfigInfo.ExternalizedCheckpointInfo.FIELD_NAME_ENABLED, false);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
new file mode 100644
index 0000000..ea1015a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * CheckpointStorage implements the durable storage of checkpoint data and metadata streams.
+ * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation},
+ * created by this class.
+ */
+public interface CheckpointStorage {
+
+ /**
+ * Checks whether this backend supports highly available storage of data.
+ *
+ * <p>Some state backends may not offer support for that with default settings, which makes them
+ * suitable for zero-config prototyping, but not for actual production setups.
+ */
+ boolean supportsHighlyAvailableStorage();
+
+ /**
+ * Checks whether the storage has a default savepoint location configured.
+ */
+ boolean hasDefaultSavepointLocation();
+
+ /**
+ * Resolves the given pointer to a checkpoint/savepoint into a state handle from which the
+ * checkpoint metadata can be read. If the state backend cannot understand the format of
+ * the pointer (for example because it was created by a different state backend) this method
+ * should throw an {@code IOException}.
+ *
+ * @param pointer The pointer to resolve.
+ * @return The state handle from which one can read the checkpoint metadata.
+ *
+ * @throws IOException Thrown, if the state backend does not understand the pointer, or if
+ * the pointer could not be resolved due to an I/O error.
+ */
+ StreamStateHandle resolveCheckpoint(String pointer) throws IOException;
+
+ /**
+ * Initializes a storage location for a new checkpoint with the given ID.
+ *
+ * <p>The returned storage location can be used to write the checkpoint data and metadata
+ * to and to obtain the pointers for the location(s) where the actual checkpoint data should be
+ * stored.
+ *
+ * @param checkpointId The ID (logical timestamp) of the checkpoint that should be persisted.
+ * @return A storage location for the data and metadata of the given checkpoint.
+ *
+ * @throws IOException Thrown if the storage location cannot be initialized due to an I/O exception.
+ */
+ CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException;
+
+ /**
+ * Initializes a storage location for a new savepoint with the given ID.
+ *
+ * <p>If an external location pointer is passed, the savepoint storage location
+ * will be initialized at the location of that pointer. If the external location pointer is null,
+ * the default savepoint location will be used. If no default savepoint location is configured,
+ * this will throw an exception. Whether a default savepoint location is configured can be
+ * checked via {@link #hasDefaultSavepointLocation()}.
+ *
+ * @param checkpointId The ID (logical timestamp) of the savepoint's checkpoint.
+ * @param externalLocationPointer Optionally, a pointer to the location where the savepoint should
+ * be stored. May be null.
+ *
+ * @return A storage location for the data and metadata of the savepoint.
+ *
+ * @throws IOException Thrown if the storage location cannot be initialized due to an I/O exception.
+ */
+ CheckpointStorageLocation initializeLocationForSavepoint(
+ long checkpointId,
+ @Nullable String externalLocationPointer) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java
new file mode 100644
index 0000000..fbc4805
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+
+import java.io.IOException;
+
+/**
+ * A storage location for one particular checkpoint. This location is typically
+ * created and initialized via {@link CheckpointStorage#initializeLocationForCheckpoint(long)} or
+ * {@link CheckpointStorage#initializeLocationForSavepoint(long, String)}.
+ */
+public interface CheckpointStorageLocation {
+
+ /**
+ * Creates the output stream to persist the checkpoint metadata to.
+ *
+ * @return The output stream to persist the checkpoint metadata to.
+ * @throws IOException Thrown, if the stream cannot be opened due to an I/O error.
+ */
+ CheckpointStateOutputStream createMetadataOutputStream() throws IOException;
+
+ /**
+ * Finalizes the checkpoint, marking the location as a finished checkpoint.
+ * This method returns the external checkpoint pointer that can be used to resolve
+ * the checkpoint upon recovery.
+ *
+ * @return The external pointer to the checkpoint at this location.
+ * @throws IOException Thrown, if finalizing / marking as finished fails due to an I/O error.
+ */
+ String markCheckpointAsFinished() throws IOException;
+
+ /**
+ * Disposes the checkpoint location in case the checkpoint has failed.
+ */
+ void disposeOnFailure() throws IOException;
+
+ /**
+ * Gets the location encoded as a string pointer.
+ *
+ * <p>This pointer is used to send the target storage location via checkpoint RPC messages
+ * and checkpoint barriers, in a format avoiding backend-specific classes.
+ *
+ * <p>That string encodes the location typically in a backend-specific way.
+ * For example, file-based backends can encode paths here.
+ */
+ String getLocationAsPointer();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
index 73113ec..c94b0e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.state;
import org.apache.flink.core.fs.FSDataOutputStream;
@@ -42,6 +43,14 @@ public interface CheckpointStreamFactory {
/**
* A dedicated output stream that produces a {@link StreamStateHandle} when closed.
*
+ * <p><b>Important:</b> When closing this stream after the successful case, you must
+ * call {@link #closeAndGetHandle()} - only that method will actually retain the resource
+ * written to. The method has the semantics of "close on success".
+ * The {@link #close()} method is supposed to remove the target resource if
+ * called before {@link #closeAndGetHandle()}, hence having the semantics of
+ * "close on failure". That way, simple try-with-resources statements automatically
+ * clean up unsuccessful partial state resources in case the writing does not complete.
+ *
* <p>Note: This is an abstract class and not an interface because {@link OutputStream}
* is an abstract class.
*/
@@ -51,9 +60,27 @@ public interface CheckpointStreamFactory {
* Closes the stream and gets a state handle that can create an input stream
* producing the data written to this stream.
*
+ * <p>This closing must be called (also when the caller is not interested in the handle)
+ * to successfully close the stream and retain the produced resource. In contrast,
+ * the {@link #close()} method removes the target resource when called.
+ *
* @return A state handle that can create an input stream producing the data written to this stream.
* @throws IOException Thrown, if the stream cannot be closed.
*/
public abstract StreamStateHandle closeAndGetHandle() throws IOException;
+
+ /**
+ * This method should close the stream, if it has not been closed before.
+ * If this method actually closes the stream, it should delete/release the
+ * resource behind the stream, such as the file that the stream writes to.
+ *
+ * <p>The above implies that this method is intended to be the "unsuccessful close",
+ * such as when cancelling the stream writing, or when an exception occurs.
+ * Closing the stream for the successful case must go through {@link #closeAndGetHandle()}.
+ *
+ * @throws IOException Thrown, if the stream cannot be closed.
+ */
+ @Override
+ public abstract void close() throws IOException;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
index 7961b5e..3d3fda2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import javax.annotation.Nullable;
+
import java.io.IOException;
/**
@@ -73,8 +74,7 @@ import java.io.IOException;
* states stores that provide access to the persistent storage and hold the keyed- and operator
* state data structures. That way, the State Backend can be very lightweight (contain only
* configurations) which makes it easier to be serializable.
- *
- *
+ *
* <h2>Thread Safety</h2>
*
* State backend implementations have to be thread-safe. Multiple threads may be creating
@@ -84,13 +84,44 @@ import java.io.IOException;
public interface StateBackend extends java.io.Serializable {
// ------------------------------------------------------------------------
- // Persistent Bytes Storage
+ // Checkpoint storage - the durable persistence of checkpoint data
+ // ------------------------------------------------------------------------
+
+ /**
+ * Resolves the given pointer to a checkpoint/savepoint into a state handle from which the
+ * checkpoint metadata can be read. If the state backend cannot understand the format of
+ * the pointer (for example because it was created by a different state backend) this method
+ * should throw an {@code IOException}.
+ *
+ * @param pointer The pointer to resolve.
+ * @return The state handle from which one can read the checkpoint metadata.
+ *
+ * @throws IOException Thrown, if the state backend does not understand the pointer, or if
+ * the pointer could not be resolved due to an I/O error.
+ */
+ StreamStateHandle resolveCheckpoint(String pointer) throws IOException;
+
+ /**
+ * Creates a storage for checkpoints for the given job. The checkpoint storage is
+ * used to write checkpoint data and metadata.
+ *
+ * @param jobId The job to store checkpoint data for.
+ * @return A checkpoint storage for the given job.
+ *
+ * @throws IOException Thrown if the checkpoint storage cannot be initialized.
+ */
+ CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException;
+
+ // ------------------------------------------------------------------------
+ // Persistent bytes storage for checkpoint data
// ------------------------------------------------------------------------
/**
* Creates a {@link CheckpointStreamFactory} that can be used to create streams
* that should end up in a checkpoint.
*
+ * <p>NOTE: This method will probably go into the {@link CheckpointStorage} in the future.
+ *
* @param jobId The {@link JobID} of the job for which we are creating checkpoint streams.
* @param operatorIdentifier An identifier of the operator for which we create streams.
*/
@@ -104,6 +135,8 @@ public interface StateBackend extends java.io.Serializable {
* this will return the same factory as for regular checkpoints, but maybe
* slightly adjusted.
*
+ * <p>NOTE: This method will probably go into the {@link CheckpointStorage} in the future.
+ *
* @param jobId The {@link JobID} of the job for which we are creating checkpoint streams.
* @param operatorIdentifier An identifier of the operator for which we create streams.
* @param targetLocation An optional custom location for the savepoint stream.
@@ -124,21 +157,13 @@ public interface StateBackend extends java.io.Serializable {
/**
* Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b>
* and checkpointing it.
- *
+ *
* <p><i>Keyed State</i> is state where each value is bound to a key.
- *
- * @param env
- * @param jobID
- * @param operatorIdentifier
- * @param keySerializer
- * @param numberOfKeyGroups
- * @param keyGroupRange
- * @param kvStateRegistry
- *
+ *
* @param <K> The type of the keys by which the state is organized.
- *
+ *
* @return The Keyed State Backend for the given job, operator, and key group range.
- *
+ *
* @throws Exception This method may forward all exceptions that occur while instantiating the backend.
*/
<K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
@@ -149,18 +174,18 @@ public interface StateBackend extends java.io.Serializable {
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry) throws Exception;
-
+
/**
* Creates a new {@link OperatorStateBackend} that can be used for storing operator state.
- *
+ *
* <p>Operator state is state that is associated with parallel operator (or function) instances,
* rather than with keys.
- *
+ *
* @param env The runtime environment of the executing task.
* @param operatorIdentifier The identifier of the operator whose state should be stored.
- *
+ *
* @return The OperatorStateBackend for operator identified by the job and operator identifier.
- *
+ *
* @throws Exception This method may forward all exceptions that occur while instantiating the backend.
*/
OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception;
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java
index 6ec6f24..0b622c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java
@@ -25,11 +25,11 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.AbstractStateBackend;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.flink.runtime.state.StreamStateHandle;
import javax.annotation.Nullable;
+
+import java.io.IOException;
import java.net.URI;
/**
@@ -45,8 +45,8 @@ import java.net.URI;
*
* <h1>Checkpoint Layout</h1>
*
- * The state backend is configured with a base directory and persists the checkpoint data of specific
- * checkpoints in specific subdirectories. For example, if the base directory was set to
+ * <p>The state backend is configured with a base directory and persists the checkpoint data of specific
+ * checkpoints in specific subdirectories. For example, if the base directory was set to
* {@code hdfs://namenode:port/flink-checkpoints/}, the state backend will create a subdirectory with
* the job's ID that will contain the actual checkpoints:
* ({@code hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b})
@@ -56,17 +56,20 @@ import java.net.URI;
*
* <h1>Savepoint Layout</h1>
*
- * A savepoint that is set to be stored in path {@code hdfs://namenode:port/flink-savepoints/}, will create
+ * <p>A savepoint that is set to be stored in path {@code hdfs://namenode:port/flink-savepoints/}, will create
* a subdirectory {@code savepoint-jobId(0, 6)-randomDigits} in which it stores all savepoint data.
* The random digits are added as "entropy" to avoid directory collisions.
+ *
+ * <h1>Metadata File</h1>
+ *
+ * <p>A completed checkpoint writes its metadata into a file
+ * '{@value AbstractFsCheckpointStorage#METADATA_FILE_NAME}'.
*/
@PublicEvolving
public abstract class AbstractFileStateBackend extends AbstractStateBackend {
private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStateBackend.class);
-
// ------------------------------------------------------------------------
// State Backend Properties
// ------------------------------------------------------------------------
@@ -118,7 +121,7 @@ public abstract class AbstractFileStateBackend extends AbstractStateBackend {
*
* @param baseCheckpointPath The checkpoint base directory to use (or null).
* @param baseSavepointPath The default savepoint directory to use (or null).
- * @param configuration The configuration to read values from
+ * @param configuration The configuration to read values from.
*/
protected AbstractFileStateBackend(
@Nullable Path baseCheckpointPath,
@@ -154,10 +157,18 @@ public abstract class AbstractFileStateBackend extends AbstractStateBackend {
}
// ------------------------------------------------------------------------
+ // Initialization and metadata storage
+ // ------------------------------------------------------------------------
+
+ @Override
+ public StreamStateHandle resolveCheckpoint(String pointer) throws IOException {
+ return AbstractFsCheckpointStorage.resolveCheckpointPointer(pointer);
+ }
+
+ // ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
- //
/**
* Checks the validity of the path's scheme and path.
*
@@ -203,4 +214,4 @@ public abstract class AbstractFileStateBackend extends AbstractStateBackend {
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java
new file mode 100644
index 0000000..1254ee2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.FileUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of durable checkpoint storage to file systems.
+ */
+public abstract class AbstractFsCheckpointStorage implements CheckpointStorage {
+
+ // ------------------------------------------------------------------------
+ // Constants
+ // ------------------------------------------------------------------------
+
+ /** The prefix of the directory containing the data exclusive to a checkpoint. */
+ public static final String CHECKPOINT_DIR_PREFIX = "chk-";
+
+ /** The name of the directory for shared checkpoint state. */
+ public static final String CHECKPOINT_SHARED_STATE_DIR = "shared";
+
+ /** The name of the directory for state not owned/released by the master, but by the TaskManagers. */
+ public static final String CHECKPOINT_TASK_OWNED_STATE_DIR = "taskowned";
+
+ /** The name of the metadata files in checkpoints / savepoints. */
+ public static final String METADATA_FILE_NAME = "_metadata";
+
+ // ------------------------------------------------------------------------
+ // Fields and properties
+ // ------------------------------------------------------------------------
+
+ /** The jobId, written into the generated savepoint directories. */
+ private final JobID jobId;
+
+ /** The default location for savepoints. Null, if none is configured. */
+ @Nullable
+ private final Path defaultSavepointDirectory;
+
+ /**
+ * Creates a new checkpoint storage.
+ *
+ * @param jobId The ID of the job that writes the checkpoints.
+ * @param defaultSavepointDirectory The default location for savepoints, or null, if none is set.
+ */
+ protected AbstractFsCheckpointStorage(
+ JobID jobId,
+ @Nullable Path defaultSavepointDirectory) {
+
+ this.jobId = checkNotNull(jobId);
+ this.defaultSavepointDirectory = defaultSavepointDirectory;
+ }
+
+ /**
+ * Gets the default directory for savepoints. Returns null, if no default savepoint
+ * directory is configured.
+ */
+ @Nullable
+ public Path getDefaultSavepointDirectory() {
+ return defaultSavepointDirectory;
+ }
+
+ // ------------------------------------------------------------------------
+ // CheckpointStorage implementation
+ // ------------------------------------------------------------------------
+
+ @Override
+ public boolean hasDefaultSavepointLocation() {
+ return defaultSavepointDirectory != null;
+ }
+
+ @Override
+ public StreamStateHandle resolveCheckpoint(String checkpointPointer) throws IOException {
+ return resolveCheckpointPointer(checkpointPointer);
+ }
+
+ /**
+ * Creates a file system based storage location for a savepoint.
+ *
+ * <p>This method implements the logic that decides which location to use (given optional
+ * parameters for a configured location and a location passed for this specific savepoint)
+ * and how to name and initialize the savepoint directory.
+ *
+ * @param externalLocationPointer The target location pointer for the savepoint.
+ * Must be a valid URI. Null, if not supplied.
+ * @param checkpointId The checkpoint ID of the savepoint.
+ *
+ * @return The checkpoint storage location for the savepoint.
+ *
+ * @throws IOException Thrown if the target directory could not be created.
+ */
+ @Override
+ public FsCheckpointStorageLocation initializeLocationForSavepoint(
+ @SuppressWarnings("unused") long checkpointId,
+ @Nullable String externalLocationPointer) throws IOException {
+
+ // determine where to write the savepoint to
+
+ final Path savepointBasePath;
+ if (externalLocationPointer != null) {
+ savepointBasePath = new Path(externalLocationPointer);
+ }
+ else if (defaultSavepointDirectory != null) {
+ savepointBasePath = defaultSavepointDirectory;
+ }
+ else {
+ throw new IllegalArgumentException("No savepoint location given and no default location configured.");
+ }
+
+ // generate the savepoint directory
+
+ final FileSystem fs = savepointBasePath.getFileSystem();
+ final String prefix = "savepoint-" + jobId.toString().substring(0, 6) + '-';
+
+ Exception latestException = null;
+ for (int attempt = 0; attempt < 10; attempt++) {
+ final Path path = new Path(savepointBasePath, FileUtils.getRandomFilename(prefix));
+
+ try {
+ if (fs.mkdirs(path)) {
+ return new FsCheckpointStorageLocation(fs, path, path, path);
+ }
+ } catch (Exception e) {
+ latestException = e;
+ }
+ }
+
+ throw new IOException("Failed to create savepoint directory at " + savepointBasePath, latestException);
+ }
+
+ // ------------------------------------------------------------------------
+ // Creating and resolving paths
+ // ------------------------------------------------------------------------
+
+ /**
+ * Builds directory into which a specific job checkpoints, meaning the directory inside which
+ * it creates the checkpoint-specific subdirectories.
+ *
+ * <p>This method only succeeds if a base checkpoint directory has been set; otherwise
+ * the method fails with an exception.
+ *
+ * @param jobId The ID of the job
+ * @return The job's checkpoint directory, resolved against the base checkpoint path.
+ *
+ * @throws UnsupportedOperationException Thrown, if no base checkpoint directory has been set.
+ */
+ protected static Path getCheckpointDirectoryForJob(Path baseCheckpointPath, JobID jobId) {
+ return new Path(baseCheckpointPath, jobId.toString());
+ }
+
+ /**
+ * Creates the directory path for the data exclusive to a specific checkpoint.
+ *
+ * @param baseDirectory The base directory into which the job checkpoints.
+ * @param checkpointId The ID (logical timestamp) of the checkpoint.
+ */
+ protected static Path createCheckpointDirectory(Path baseDirectory, long checkpointId) {
+ return new Path(baseDirectory, CHECKPOINT_DIR_PREFIX + checkpointId);
+ }
+
+ /**
+ * Takes the given string (representing a pointer to a checkpoint) and resolves it to a state
+ * handle for the checkpoint's metadata file.
+ *
+ * @param checkpointPointer The pointer to resolve.
+ * @return A state handle to checkpoint/savepoint's metadata.
+ *
+ * @throws IOException Thrown, if the pointer cannot be resolved, the file system cannot be accessed, or
+ * the pointer points to a location that does not seem to be a checkpoint/savepoint.
+ */
+ protected static StreamStateHandle resolveCheckpointPointer(String checkpointPointer) throws IOException {
+ checkNotNull(checkpointPointer, "checkpointPointer");
+ checkArgument(!checkpointPointer.isEmpty(), "empty checkpoint pointer");
+
+ // check if the pointer is in fact a valid file path
+ final Path path;
+ try {
+ path = new Path(checkpointPointer);
+ }
+ catch (Exception e) {
+ throw new IOException("Checkpoint/savepoint path '" + checkpointPointer + "' is not a valid file URI. " +
+ "Either the pointer path is invalid, or the checkpoint was created by a different state backend.");
+ }
+
+ // check if the file system can be accessed
+ final FileSystem fs;
+ try {
+ fs = path.getFileSystem();
+ }
+ catch (IOException e) {
+ throw new IOException("Cannot access file system for checkpoint/savepoint path '" +
+ checkpointPointer + "'.", e);
+ }
+
+ final FileStatus status;
+ try {
+ status = fs.getFileStatus(path);
+ }
+ catch (FileNotFoundException e) {
+ throw new FileNotFoundException("Cannot find checkpoint or savepoint " +
+ "file/directory '" + checkpointPointer + "' on file system '" + fs.getUri().getScheme() + "'.");
+ }
+
+ // if we are here, the file / directory exists
+ final FileStatus metadataFileStatus;
+
+ // If this is a directory, we need to find the meta data file
+ if (status.isDir()) {
+ final Path metadataFilePath = new Path(path, METADATA_FILE_NAME);
+ try {
+ metadataFileStatus = fs.getFileStatus(metadataFilePath);
+ }
+ catch (FileNotFoundException e) {
+ throw new FileNotFoundException("Cannot find meta data file '" + METADATA_FILE_NAME +
+ "' in directory '" + path + "'. Please try to load the checkpoint/savepoint " +
+ "directly from the metadata file instead of the directory.");
+ }
+ }
+ else {
+ // this points to a file and we either do no name validation, or
+ // the name is actually correct, so we can return the path
+ metadataFileStatus = status;
+ }
+
+ return new FileStateHandle(metadataFileStatus.getPath(), metadataFileStatus.getLen());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FixFileFsStateOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FixFileFsStateOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FixFileFsStateOutputStream.java
new file mode 100644
index 0000000..bc1ca01
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FixFileFsStateOutputStream.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link CheckpointStateOutputStream} that writes into a specified file and
+ * returns a {@link FileStateHandle} upon closing.
+ *
+ * <p>Unlike the {@link org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream},
+ * this stream does not have a threshold below which it returns a memory byte stream handle,
+ * and does not create random files, but writes to a specified file.
+ *
+ * <p>The stream can be finished in two ways: {@link #closeAndGetHandle()} closes the stream and
+ * keeps the file, while {@link #close()} closes the stream and deletes the file.
+ */
+public final class FixFileFsStateOutputStream extends CheckpointStateOutputStream {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FixFileFsStateOutputStream.class);
+
+    // ------------------------------------------------------------------------
+
+    /** The stream to the target file; all writes are forwarded to it. */
+    private final FSDataOutputStream out;
+
+    /** The path of the file that this stream writes to. */
+    private final Path path;
+
+    /** The file system that holds the target file. */
+    private final FileSystem fileSystem;
+
+    /** Flag marking the stream as closed; set by both closing methods. */
+    private volatile boolean closed;
+
+    /**
+     * Creates the stream and opens the target file for writing, using
+     * {@link WriteMode#NO_OVERWRITE}.
+     *
+     * @param fileSystem The file system to create the file in.
+     * @param path The path of the file to write to.
+     *
+     * @throws IOException Thrown if the file system fails to create the file.
+     */
+    public FixFileFsStateOutputStream(FileSystem fileSystem, Path path) throws IOException {
+        this.fileSystem = checkNotNull(fileSystem);
+        this.path = checkNotNull(path);
+
+        this.out = fileSystem.create(path, WriteMode.NO_OVERWRITE);
+    }
+
+    // ------------------------------------------------------------------------
+    //  I/O
+    // ------------------------------------------------------------------------
+
+    @Override
+    public final void write(int b) throws IOException {
+        out.write(b);
+    }
+
+    @Override
+    public final void write(@Nonnull byte[] b, int off, int len) throws IOException {
+        out.write(b, off, len);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return out.getPos();
+    }
+
+    @Override
+    public void flush() throws IOException {
+        out.flush();
+    }
+
+    @Override
+    public void sync() throws IOException {
+        out.sync();
+    }
+
+    // ------------------------------------------------------------------------
+    //  Closing
+    // ------------------------------------------------------------------------
+
+    /**
+     * Checks whether this stream has been closed.
+     *
+     * @return True, if either {@link #close()} or {@link #closeAndGetHandle()} was called.
+     */
+    public boolean isClosed() {
+        return closed;
+    }
+
+    /**
+     * Closes the stream and discards the data by deleting the target file.
+     * Failures are logged and not propagated. Calling this method on an already
+     * closed stream has no effect.
+     */
+    @Override
+    public void close() {
+        if (!closed) {
+            closed = true;
+
+            // closing and deleting are attempted independently, so that a failure to
+            // close the stream does not leave the (partially written) file behind
+            try {
+                out.close();
+            }
+            catch (Throwable t) {
+                LOG.warn("Could not close the state stream for {}.", path, t);
+            }
+
+            try {
+                fileSystem.delete(path, false);
+            }
+            catch (Throwable t) {
+                LOG.warn("Could not delete the checkpoint stream file {}.", path, t);
+            }
+        }
+    }
+
+    /**
+     * Closes the stream and returns a handle to the written file.
+     *
+     * @return A {@link FileStateHandle} pointing to the file, carrying the size reported
+     *         by the stream position (zero, if the position could not be determined).
+     *
+     * @throws IOException Thrown if the stream was already closed, or if closing the stream
+     *                     failed. In the latter case, a deletion of the file is attempted.
+     */
+    @Override
+    public FileStateHandle closeAndGetHandle() throws IOException {
+        synchronized (this) {
+            if (!closed) {
+                try {
+                    // make a best effort attempt to figure out the size
+                    long size = 0;
+                    try {
+                        size = out.getPos();
+                    } catch (Exception ignored) {}
+
+                    // close and return
+                    out.close();
+
+                    return new FileStateHandle(path, size);
+                }
+                catch (Exception e) {
+                    // the file may be incomplete, so try to remove it before reporting the failure
+                    try {
+                        fileSystem.delete(path, false);
+                    }
+                    catch (Exception deleteException) {
+                        LOG.warn("Could not delete the checkpoint stream file {}.", path, deleteException);
+                    }
+
+                    throw new IOException("Could not flush and close the file system " +
+                            "output stream to " + path + " in order to obtain the " +
+                            "stream state handle", e);
+                }
+                finally {
+                    closed = true;
+                }
+            }
+            else {
+                throw new IOException("Stream has already been closed and discarded.");
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
new file mode 100644
index 0000000..b7be8fa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Checkpoint storage that durably persists checkpoints on a file system.
+ *
+ * <p>On construction, the storage creates a checkpoint directory for the job, together with
+ * one sub-directory for shared state and one for task-owned state.
+ */
+public class FsCheckpointStorage extends AbstractFsCheckpointStorage {
+
+    /** The file system that all directories of this storage live on. */
+    private final FileSystem fileSystem;
+
+    /** The directory holding the checkpoints of the job. */
+    private final Path checkpointsDirectory;
+
+    /** The directory for shared state, located below the checkpoints directory. */
+    private final Path sharedStateDirectory;
+
+    /** The directory for task-owned state, located below the checkpoints directory. */
+    private final Path taskOwnedStateDirectory;
+
+    /**
+     * Creates the storage and initializes its directories.
+     *
+     * @param checkpointBaseDirectory The base directory under which the job's checkpoint
+     *                                directory is created.
+     * @param defaultSavepointDirectory The default savepoint directory, may be null.
+     * @param jobId The ID of the job that the checkpoints belong to.
+     *
+     * @throws IOException Thrown if the file system cannot be obtained or a directory
+     *                     creation call fails with an I/O error.
+     */
+    public FsCheckpointStorage(
+            Path checkpointBaseDirectory,
+            @Nullable Path defaultSavepointDirectory,
+            JobID jobId) throws IOException {
+
+        super(jobId, defaultSavepointDirectory);
+
+        // resolve the file system and all paths first
+        final FileSystem fs = checkpointBaseDirectory.getFileSystem();
+        final Path jobCheckpointsDir = getCheckpointDirectoryForJob(checkpointBaseDirectory, jobId);
+        final Path sharedDir = new Path(jobCheckpointsDir, CHECKPOINT_SHARED_STATE_DIR);
+        final Path taskOwnedDir = new Path(jobCheckpointsDir, CHECKPOINT_TASK_OWNED_STATE_DIR);
+
+        this.fileSystem = fs;
+        this.checkpointsDirectory = jobCheckpointsDir;
+        this.sharedStateDirectory = sharedDir;
+        this.taskOwnedStateDirectory = taskOwnedDir;
+
+        // then set up the dedicated directories
+        fs.mkdirs(jobCheckpointsDir);
+        fs.mkdirs(sharedDir);
+        fs.mkdirs(taskOwnedDir);
+    }
+
+    // ------------------------------------------------------------------------
+    //  CheckpointStorage implementation
+    // ------------------------------------------------------------------------
+
+    /**
+     * This storage always reports support for highly available storage.
+     *
+     * @return Always true.
+     */
+    @Override
+    public boolean supportsHighlyAvailableStorage() {
+        return true;
+    }
+
+    /**
+     * Sets up the directory for the checkpoint with the given ID and returns the
+     * storage location describing it.
+     *
+     * @param checkpointId The ID of the checkpoint, must not be negative.
+     * @return The location for the checkpoint, including the shared and task-owned
+     *         state directories of this storage.
+     *
+     * @throws IOException Thrown if creating the checkpoint directory fails with an I/O error.
+     */
+    @Override
+    public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
+        checkArgument(checkpointId >= 0);
+
+        // the directory that is exclusive to this checkpoint
+        final Path exclusiveDir = createCheckpointDirectory(checkpointsDirectory, checkpointId);
+        fileSystem.mkdirs(exclusiveDir);
+
+        return new FsCheckpointStorageLocation(
+                fileSystem,
+                exclusiveDir,
+                sharedStateDirectory,
+                taskOwnedStateDirectory);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
new file mode 100644
index 0000000..829ab9a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The location of a single checkpoint on a file system: the checkpoint's own directory,
+ * the directories for shared and task-owned state, and the path of the metadata file.
+ */
+public class FsCheckpointStorageLocation implements CheckpointStorageLocation {
+
+    /** The file system that the checkpoint is stored on. */
+    private final FileSystem fileSystem;
+
+    /** The directory of this checkpoint. */
+    private final Path checkpointDirectory;
+
+    /** The directory for shared state. */
+    private final Path sharedStateDirectory;
+
+    /** The directory for task-owned state. */
+    private final Path taskOwnedStateDirectory;
+
+    /** The path of the metadata file, located in the checkpoint directory. */
+    private final Path metadataFilePath;
+
+    /** The checkpoint directory, qualified against the file system, as a string. */
+    private final String qualifiedCheckpointDirectory;
+
+    /**
+     * Creates a new location. All arguments must be non-null.
+     *
+     * @param fileSystem The file system that the checkpoint is stored on.
+     * @param checkpointDir The directory of this checkpoint.
+     * @param sharedStateDir The directory for shared state.
+     * @param taskOwnedStateDir The directory for task-owned state.
+     */
+    public FsCheckpointStorageLocation(
+            FileSystem fileSystem,
+            Path checkpointDir,
+            Path sharedStateDir,
+            Path taskOwnedStateDir) {
+
+        this.fileSystem = checkNotNull(fileSystem);
+        this.checkpointDirectory = checkNotNull(checkpointDir);
+        this.sharedStateDirectory = checkNotNull(sharedStateDir);
+        this.taskOwnedStateDirectory = checkNotNull(taskOwnedStateDir);
+
+        // derived values, computed once from the validated arguments
+        this.metadataFilePath =
+                new Path(this.checkpointDirectory, AbstractFsCheckpointStorage.METADATA_FILE_NAME);
+        this.qualifiedCheckpointDirectory =
+                this.checkpointDirectory.makeQualified(this.fileSystem).toString();
+    }
+
+    // ------------------------------------------------------------------------
+    //  Properties
+    // ------------------------------------------------------------------------
+
+    public Path getCheckpointDirectory() {
+        return checkpointDirectory;
+    }
+
+    public Path getSharedStateDirectory() {
+        return sharedStateDirectory;
+    }
+
+    public Path getTaskOwnedStateDirectory() {
+        return taskOwnedStateDirectory;
+    }
+
+    public Path getMetadataFilePath() {
+        return metadataFilePath;
+    }
+
+    // ------------------------------------------------------------------------
+    //  checkpoint metadata
+    // ------------------------------------------------------------------------
+
+    /**
+     * Opens a stream that writes to the metadata file of this checkpoint.
+     *
+     * @return A new {@link FixFileFsStateOutputStream} targeting the metadata file path.
+     * @throws IOException Thrown if the stream cannot be created.
+     */
+    @Override
+    public CheckpointStateOutputStream createMetadataOutputStream() throws IOException {
+        return new FixFileFsStateOutputStream(fileSystem, metadataFilePath);
+    }
+
+    /**
+     * Returns the qualified checkpoint directory as the pointer to the finished checkpoint.
+     * No file system operation is performed here.
+     */
+    @Override
+    public String markCheckpointAsFinished() throws IOException {
+        return qualifiedCheckpointDirectory;
+    }
+
+    /**
+     * Recursively deletes the checkpoint directory.
+     *
+     * @throws IOException Thrown if the deletion fails with an I/O error.
+     */
+    @Override
+    public void disposeOnFailure() throws IOException {
+        // after a failure nothing in the checkpoint directory is worth keeping,
+        // so the directory is removed as a whole
+        fileSystem.delete(checkpointDirectory, true);
+    }
+
+    @Override
+    public String getLocationAsPointer() {
+        return qualifiedCheckpointDirectory;
+    }
+
+    // ------------------------------------------------------------------------
+    //  Utilities
+    // ------------------------------------------------------------------------
+
+    @Override
+    public String toString() {
+        final StringBuilder description = new StringBuilder("FsCheckpointStorageLocation {");
+        description.append("metadataFilePath=").append(metadataFilePath);
+        description.append(", taskOwnedStateDirectory=").append(taskOwnedStateDirectory);
+        description.append(", sharedStateDirectory=").append(sharedStateDirectory);
+        description.append(", checkpointDirectory=").append(checkpointDirectory);
+        description.append('}');
+        return description.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 2fff45a..de49552 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
@@ -434,6 +435,12 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur
// ------------------------------------------------------------------------
@Override
+ public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
+     checkNotNull(jobId, "jobId");
+
+     // the storage is built from this backend's checkpoint path and savepoint path
+     final Path checkpointPath = getCheckpointPath();
+     final Path savepointPath = getSavepointPath();
+
+     return new FsCheckpointStorage(checkpointPath, savepointPath, jobId);
+ }
+
+ @Override
public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
// The factory is built from this backend's checkpoint path, the job ID, and the
// minimum file size threshold; the operatorIdentifier argument is not used here.
return new FsCheckpointStreamFactory(getCheckpointPath(), jobId, getMinFileSizeThreshold());
}
@@ -447,6 +454,10 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur
return new FsSavepointStreamFactory(new Path(targetLocation), jobId, getMinFileSizeThreshold());
}
+ // ------------------------------------------------------------------------
+ // state holding structures
+ // ------------------------------------------------------------------------
+
@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java
new file mode 100644
index 0000000..3fb2627
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The checkpoint storage used by the {@link MemoryStateBackend}.
+ *
+ * <p>When created with a checkpoint base directory, the storage writes checkpoint metadata
+ * to that directory (durable metadata). When created without one, checkpoints are not
+ * persisted durably.
+ */
+public class MemoryBackendCheckpointStorage extends AbstractFsCheckpointStorage {
+
+    /** The target directory for checkpoints (here metadata files only). Null, if not configured. */
+    @Nullable
+    private final Path checkpointsDirectory;
+
+    /** The file system to persist the checkpoints to. Null if this does not durably persist checkpoints. */
+    @Nullable
+    private final FileSystem fileSystem;
+
+    /**
+     * Creates a new MemoryBackendCheckpointStorage without a checkpoint directory and
+     * without a default savepoint location. Checkpoints are hence not persisted in a
+     * file system. Savepoints are still supported when an explicit savepoint location
+     * is passed to {@link #initializeLocationForSavepoint(long, String)}.
+     *
+     * @param jobId The ID of the job writing the checkpoints.
+     */
+    public MemoryBackendCheckpointStorage(JobID jobId) {
+        super(jobId, null);
+        this.checkpointsDirectory = null;
+        this.fileSystem = null;
+    }
+
+    /**
+     * Creates a new MemoryBackendCheckpointStorage.
+     *
+     * @param jobId The ID of the job writing the checkpoints.
+     * @param checkpointsBaseDirectory The directory to write checkpoints to. May be null,
+     *                                 in which case this storage does not support durable persistence.
+     * @param defaultSavepointLocation The default savepoint directory, or null, if none is set.
+     *
+     * @throws IOException Thrown if a checkpoint base directory is given and the
+     *                     checkpoint directory cannot be created within that directory.
+     */
+    public MemoryBackendCheckpointStorage(
+            JobID jobId,
+            @Nullable Path checkpointsBaseDirectory,
+            @Nullable Path defaultSavepointLocation) throws IOException {
+
+        super(jobId, defaultSavepointLocation);
+
+        if (checkpointsBaseDirectory != null) {
+            // durable setup: resolve the file system and create the job's checkpoint directory
+            final FileSystem fs = checkpointsBaseDirectory.getFileSystem();
+            final Path jobCheckpointsDir = getCheckpointDirectoryForJob(checkpointsBaseDirectory, jobId);
+
+            this.fileSystem = fs;
+            this.checkpointsDirectory = jobCheckpointsDir;
+
+            fs.mkdirs(jobCheckpointsDir);
+        }
+        else {
+            // non-durable setup
+            this.checkpointsDirectory = null;
+            this.fileSystem = null;
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  Checkpoint Storage
+    // ------------------------------------------------------------------------
+
+    /**
+     * Highly available storage is supported exactly when a checkpoint directory is configured.
+     */
+    @Override
+    public boolean supportsHighlyAvailableStorage() {
+        return checkpointsDirectory != null;
+    }
+
+    /**
+     * Initializes the location for the checkpoint with the given ID.
+     *
+     * @param checkpointId The ID of the checkpoint, must not be negative.
+     * @return A location with persistent metadata if a checkpoint directory is configured,
+     *         otherwise a location with non-persistent metadata.
+     *
+     * @throws IOException Thrown if creating the checkpoint directory fails with an I/O error.
+     */
+    @Override
+    public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
+        checkArgument(checkpointId >= 0);
+
+        if (checkpointsDirectory == null) {
+            // no durable metadata - typical in IDE or test setup case
+            return new NonPersistentMetadataCheckpointStorageLocation();
+        }
+
+        // configured for durable metadata, which requires the file system to be present
+        checkState(fileSystem != null);
+
+        // set up the directory that is exclusive to this checkpoint
+        final Path exclusiveDir = createCheckpointDirectory(checkpointsDirectory, checkpointId);
+        fileSystem.mkdirs(exclusiveDir);
+
+        return new PersistentMetadataCheckpointStorageLocation(fileSystem, exclusiveDir);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Utilities
+    // ------------------------------------------------------------------------
+
+    @Override
+    public String toString() {
+        final String checkpointsPart = checkpointsDirectory == null
+                ? "(not persistent)"
+                : checkpointsDirectory.toString();
+
+        final Object savepointPart = getDefaultSavepointDirectory() == null
+                ? "(none)"
+                : getDefaultSavepointDirectory();
+
+        return getClass().getName() + " - " + checkpointsPart +
+                ", default savepoint dir: " + savepointPart;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 2079a97..afcf9a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
@@ -278,14 +279,8 @@ public class MemoryStateBackend extends AbstractFileStateBackend implements Conf
// ------------------------------------------------------------------------
@Override
- public OperatorStateBackend createOperatorStateBackend(
- Environment env,
- String operatorIdentifier) throws Exception {
-
- return new DefaultOperatorStateBackend(
- env.getUserClassLoader(),
- env.getExecutionConfig(),
- isUsingAsynchronousSnapshots());
+ public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
+     // the storage is built from this backend's checkpoint path and savepoint path
+     final Path checkpointPath = getCheckpointPath();
+     final Path savepointPath = getSavepointPath();
+
+     return new MemoryBackendCheckpointStorage(jobId, checkpointPath, savepointPath);
}
@Override
@@ -304,10 +299,21 @@ public class MemoryStateBackend extends AbstractFileStateBackend implements Conf
}
// ------------------------------------------------------------------------
- // checkpoint state persistence
+ // state holding structures
// ------------------------------------------------------------------------
@Override
+ public OperatorStateBackend createOperatorStateBackend(
+ Environment env,
+ String operatorIdentifier) throws Exception {
+
+ return new DefaultOperatorStateBackend(
+ env.getUserClassLoader(),
+ env.getExecutionConfig(),
+ isUsingAsynchronousSnapshots());
+ }
+
+ @Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env, JobID jobID,
String operatorIdentifier,
http://git-wip-us.apache.org/repos/asf/flink/blob/edc6f100/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java
new file mode 100644
index 0000000..3baa319
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.memory;
+
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.MemoryCheckpointOutputStream;
+
+import java.io.IOException;
+
+/**
+ * The checkpoint storage location used by the {@link MemoryStateBackend} when no durable
+ * persistence for the checkpoint metadata has been configured.
+ */
+public class NonPersistentMetadataCheckpointStorageLocation implements CheckpointStorageLocation {
+
+    /** The external pointer returned for checkpoints that are not externally addressable. */
+    public static final String EXTERNAL_POINTER = "<checkpoint-not-externally-addressable>";
+
+    /** The maximum serialized state size for the checkpoint metadata. */
+    private static final int MAX_METADATA_STATE_SIZE = Integer.MAX_VALUE;
+
+    /**
+     * Creates a memory-backed stream for the metadata, bounded by
+     * {@link #MAX_METADATA_STATE_SIZE}.
+     */
+    @Override
+    public CheckpointStateOutputStream createMetadataOutputStream() throws IOException {
+        final CheckpointStateOutputStream metadataStream =
+                new MemoryCheckpointOutputStream(MAX_METADATA_STATE_SIZE);
+        return metadataStream;
+    }
+
+    /**
+     * Returns {@link #EXTERNAL_POINTER}, the marker for checkpoints that are not
+     * externally addressable.
+     */
+    @Override
+    public String markCheckpointAsFinished() {
+        return EXTERNAL_POINTER;
+    }
+
+    /**
+     * Does nothing.
+     */
+    @Override
+    public void disposeOnFailure() {
+        // intentionally empty
+    }
+
+    /**
+     * Returns the location pointer constant defined by
+     * {@link PersistentMetadataCheckpointStorageLocation}.
+     */
+    @Override
+    public String getLocationAsPointer() {
+        // NOTE(review): this reuses the constant of the persistent variant rather than
+        // EXTERNAL_POINTER - presumably intended, confirm against the persistent class
+        return PersistentMetadataCheckpointStorageLocation.LOCATION_POINTER;
+    }
+}