Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/28 18:27:07 UTC

[GitHub] [flink] rkhachatryan commented on a change in pull request #13912: [FLINK-19466][FLINK-19467][runtime / state backends] Add Flip-142 public interfaces and methods

rkhachatryan commented on a change in pull request #13912:
URL: https://github.com/apache/flink/pull/13912#discussion_r566247386



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
##########
@@ -24,19 +24,46 @@
 import java.io.IOException;
 
 /**
- * CheckpointStorage defines how checkpoint snapshots are persisted for fault tolerance. Various
- * implementations store their checkpoints in different fashions and have different requirements and
- * availability guarantees.
+ * CheckpointStorage defines how {@link StateBackend}'s checkpoint their state for fault tolerance

Review comment:
       checkpoint their state -> store their state?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
##########
@@ -24,19 +24,46 @@
 import java.io.IOException;
 
 /**
- * CheckpointStorage defines how checkpoint snapshots are persisted for fault tolerance. Various
- * implementations store their checkpoints in different fashions and have different requirements and
- * availability guarantees.
+ * CheckpointStorage defines how {@link StateBackend}'s checkpoint their state for fault tolerance
+ * in streaming applications. Various implementations store their checkpoints in different fashions
+ * and have different requirements and availability guarantees.
  *
- * <p>For example, JobManagerCheckpointStorage stores checkpoints in the memory of the JobManager.
- * It is lightweight and without additional dependencies but is not highly available and only
- * supports small state sizes. This checkpoint storage policy is convenient for local testing and
+ * <p>For example, {@link org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage
+ * JobManagerCheckpointStorage} stores checkpoints in the memory of the JobManager. It is
+ * lightweight and without additional dependencies but is not highly available and only supports

Review comment:
       IIUC, high availability can still be achieved with `JobManagerCheckpointStorage`. What can't be achieved is scalability.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/AbstractFileCheckpointStorage.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.storage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * A base class for all checkpoint storage instances that store their metadata (and data) in files.
+ *
+ * <p>This class takes the base checkpoint- and savepoint directory paths, but also accepts null for
+ * both of then, in which case creating externalized checkpoint is not possible, and it is not
+ * possible to create a savepoint with a default path. Null is accepted to enable implementations
+ * that only optionally support default savepoints.
+ *
+ * <h1>Checkpoint Layout</h1>
+ *
+ * <p>The checkpoint storage is configured with a base directory and persists the checkpoint data of
+ * specific checkpoints in specific subdirectories. For example, if the base directory was set to
+ * {@code hdfs://namenode:port/flink-checkpoints/}, the state backend will create a subdirectory
+ * with the job's ID that will contain the actual checkpoints: ({@code
+ * hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b})
+ *
+ * <p>Each checkpoint individually will store all its files in a subdirectory that includes the
+ * checkpoint number, such as {@code
+ * hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17/}.
+ *
+ * <h1>Savepoint Layout</h1>
+ *
+ * <p>A savepoint that is set to be stored in path {@code hdfs://namenode:port/flink-savepoints/},
+ * will create a subdirectory {@code savepoint-jobId(0, 6)-randomDigits} in which it stores all
+ * savepoint data. The random digits are added as "entropy" to avoid directory collisions.
+ *
+ * <h1>Metadata File</h1>
+ *
+ * <p>A completed checkpoint writes its metadata into a file '{@value
+ * AbstractFsCheckpointStorageAccess#METADATA_FILE_NAME}'.
+ */
+@PublicEvolving
+public abstract class AbstractFileCheckpointStorage implements CheckpointStorage {
+
+    private static final long serialVersionUID = 1L;
+
+    // ------------------------------------------------------------------------
+    //  Checkpoint Storage Properties
+    // ------------------------------------------------------------------------
+
+    /** The path where checkpoints will be stored, or null, if none has been configured. */
+    @Nullable private Path baseCheckpointPath;
+
+    /** The path where savepoints will be stored, or null, if none has been configured. */
+    @Nullable private Path baseSavepointPath;
+
+    @Override
+    public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException {
+        return AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(pointer);
+    }
+
+    /**
+     * Gets the directory where savepoints are stored by default (when no custom path is given to
+     * the savepoint trigger command).
+     *
+     * @return The default directory for savepoints, or null, if no default directory has been
+     *     configured.
+     */
+    @Nullable
+    public Path getSavepointPath() {
+        return baseSavepointPath;
+    }
+
+    /** @param baseSavepointPath The base directory for savepoints. */
+    public void setSavepointPath(String baseSavepointPath) {
+        setSavepointPath(new Path(baseSavepointPath));
+    }
+
+    /** @param baseSavepointPath The base directory for savepoints. */
+    public void setSavepointPath(@Nullable Path baseSavepointPath) {
+        this.baseSavepointPath = baseSavepointPath == null ? null : validatePath(baseSavepointPath);
+    }
+
+    /**
+     * Sets the given savepoint directory, or the values defined in the given configuration. If a
+     * checkpoint parameter is not null, that value takes precedence over the value in the
+     * configuration. If the configuration does not specify a value, it is possible that the
+     * savepoint directory in the savepoint storage will be null.
+     *
+     * @param baseSavepointPath The checkpoint base directory to use (or null).
+     * @param config The configuration to read values from.
+     */
+    public void setSavepointPath(Path baseSavepointPath, ReadableConfig config) {
+        this.baseSavepointPath =
+                parameterOrConfigured(
+                        baseSavepointPath, config, CheckpointingOptions.SAVEPOINT_DIRECTORY);
+    }
+
+    /**
+     * Gets the directory where savepoints are stored by default (when no custom path is given to
+     * the savepoint trigger command).
+     *
+     * @return The default directory for savepoints, or null, if no default directory has been
+     *     configured.
+     */
+    @Nullable
+    public Path getCheckpointPath() {
+        return baseCheckpointPath;
+    }
+
+    /** @param baseCheckpointPath The base directory for checkpoints. */
+    public void setCheckpointPath(String baseCheckpointPath) {
+        setCheckpointPath(new Path(baseCheckpointPath));
+    }
+
+    /** @param baseCheckpointPath The base directory for checkpoints. */
+    public void setCheckpointPath(@Nullable Path baseCheckpointPath) {
+        this.baseCheckpointPath =
+                baseCheckpointPath == null ? null : validatePath(baseCheckpointPath);
+    }
+
+    /**
+     * Sets the given checkpoint directory, or the values defined in the given configuration. If a
+     * checkpoint parameter is not null, that value takes precedence over the value in the
+     * configuration. If the configuration does not specify a value, it is possible that the
+     * checkpoint directory in the checkpoint storage will be null.
+     *
+     * @param baseCheckpointPath The checkpoint base directory to use (or null).
+     * @param config The configuration to read values from.
+     */
+    public void setCheckpointPath(Path baseCheckpointPath, ReadableConfig config) {
+        this.baseCheckpointPath =
+                parameterOrConfigured(
+                        baseCheckpointPath, config, CheckpointingOptions.CHECKPOINTS_DIRECTORY);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Utilities
+    // ------------------------------------------------------------------------
+
+    /**
+     * Checks the validity of the path's scheme and path.
+     *
+     * @param path The path to check.
+     * @return The URI as a Path.
+     * @throws IllegalArgumentException Thrown, if the URI misses scheme or path.
+     */
+    protected static Path validatePath(Path path) {
+        final URI uri = path.toUri();
+        final String scheme = uri.getScheme();
+        final String pathPart = uri.getPath();
+
+        // some validity checks
+        if (scheme == null) {
+            throw new IllegalArgumentException(
+                    "The scheme (hdfs://, file://, etc) is null. "
+                            + "Please specify the file system scheme explicitly in the URI.");
+        }
+        if (pathPart == null) {
+            throw new IllegalArgumentException(
+                    "The path to store the checkpoint data in is null. "
+                            + "Please specify a directory path for the checkpoint data.");
+        }
+        if (pathPart.length() == 0 || pathPart.equals("/")) {
+            throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
+        }
+
+        return path;
+    }
+
+    @Nullable
+    protected static Path parameterOrConfigured(

Review comment:
       How about moving this method to `ReadableConfig` (replacing the `Path` type with `T` and adding a `Function<String, T>` argument)?
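   A minimal sketch of what that could look like (the placement, class name, and signature here are my assumptions, not part of the PR):

```java
import java.util.function.Function;

import javax.annotation.Nullable;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;

/** Hypothetical generic variant of parameterOrConfigured, sketched for illustration. */
final class ConfigOrParameter {

    /** Returns the explicit parameter if present, otherwise the mapped config value (or null). */
    @Nullable
    static <T> T parameterOrConfigured(
            @Nullable T parameter,
            ReadableConfig config,
            ConfigOption<String> option,
            Function<String, T> mapper) {
        if (parameter != null) {
            // An explicitly passed parameter always takes precedence over the configuration.
            return parameter;
        }
        // Fall back to the configured value, mapped to the target type (e.g. Path::new).
        return config.getOptional(option).map(mapper).orElse(null);
    }
}
```
   The call sites here would then read e.g. `parameterOrConfigured(baseCheckpointPath, config, CheckpointingOptions.CHECKPOINTS_DIRECTORY, Path::new)`.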

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/AbstractFileCheckpointStorage.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.storage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * A base class for all checkpoint storage instances that store their metadata (and data) in files.
+ *
+ * <p>This class takes the base checkpoint- and savepoint directory paths, but also accepts null for
+ * both of then, in which case creating externalized checkpoint is not possible, and it is not
+ * possible to create a savepoint with a default path. Null is accepted to enable implementations
+ * that only optionally support default savepoints.
+ *
+ * <h1>Checkpoint Layout</h1>
+ *
+ * <p>The checkpoint storage is configured with a base directory and persists the checkpoint data of
+ * specific checkpoints in specific subdirectories. For example, if the base directory was set to
+ * {@code hdfs://namenode:port/flink-checkpoints/}, the state backend will create a subdirectory
+ * with the job's ID that will contain the actual checkpoints: ({@code
+ * hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b})
+ *
+ * <p>Each checkpoint individually will store all its files in a subdirectory that includes the
+ * checkpoint number, such as {@code
+ * hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17/}.
+ *
+ * <h1>Savepoint Layout</h1>
+ *
+ * <p>A savepoint that is set to be stored in path {@code hdfs://namenode:port/flink-savepoints/},
+ * will create a subdirectory {@code savepoint-jobId(0, 6)-randomDigits} in which it stores all
+ * savepoint data. The random digits are added as "entropy" to avoid directory collisions.
+ *
+ * <h1>Metadata File</h1>
+ *
+ * <p>A completed checkpoint writes its metadata into a file '{@value
+ * AbstractFsCheckpointStorageAccess#METADATA_FILE_NAME}'.
+ */
+@PublicEvolving
+public abstract class AbstractFileCheckpointStorage implements CheckpointStorage {
+
+    private static final long serialVersionUID = 1L;
+
+    // ------------------------------------------------------------------------
+    //  Checkpoint Storage Properties
+    // ------------------------------------------------------------------------
+
+    /** The path where checkpoints will be stored, or null, if none has been configured. */
+    @Nullable private Path baseCheckpointPath;
+
+    /** The path where savepoints will be stored, or null, if none has been configured. */
+    @Nullable private Path baseSavepointPath;
+
+    @Override
+    public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException {
+        return AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(pointer);
+    }
+
+    /**
+     * Gets the directory where savepoints are stored by default (when no custom path is given to
+     * the savepoint trigger command).
+     *
+     * @return The default directory for savepoints, or null, if no default directory has been
+     *     configured.
+     */
+    @Nullable
+    public Path getSavepointPath() {
+        return baseSavepointPath;
+    }
+
+    /** @param baseSavepointPath The base directory for savepoints. */
+    public void setSavepointPath(String baseSavepointPath) {
+        setSavepointPath(new Path(baseSavepointPath));
+    }
+
+    /** @param baseSavepointPath The base directory for savepoints. */
+    public void setSavepointPath(@Nullable Path baseSavepointPath) {
+        this.baseSavepointPath = baseSavepointPath == null ? null : validatePath(baseSavepointPath);
+    }
+
+    /**
+     * Sets the given savepoint directory, or the values defined in the given configuration. If a
+     * checkpoint parameter is not null, that value takes precedence over the value in the
+     * configuration. If the configuration does not specify a value, it is possible that the
+     * savepoint directory in the savepoint storage will be null.
+     *
+     * @param baseSavepointPath The checkpoint base directory to use (or null).
+     * @param config The configuration to read values from.
+     */
+    public void setSavepointPath(Path baseSavepointPath, ReadableConfig config) {
+        this.baseSavepointPath =
+                parameterOrConfigured(
+                        baseSavepointPath, config, CheckpointingOptions.SAVEPOINT_DIRECTORY);
+    }
+
+    /**
+     * Gets the directory where savepoints are stored by default (when no custom path is given to
+     * the savepoint trigger command).
+     *
+     * @return The default directory for savepoints, or null, if no default directory has been
+     *     configured.
+     */
+    @Nullable
+    public Path getCheckpointPath() {
+        return baseCheckpointPath;
+    }
+
+    /** @param baseCheckpointPath The base directory for checkpoints. */
+    public void setCheckpointPath(String baseCheckpointPath) {
+        setCheckpointPath(new Path(baseCheckpointPath));
+    }
+
+    /** @param baseCheckpointPath The base directory for checkpoints. */
+    public void setCheckpointPath(@Nullable Path baseCheckpointPath) {
+        this.baseCheckpointPath =
+                baseCheckpointPath == null ? null : validatePath(baseCheckpointPath);
+    }
+
+    /**
+     * Sets the given checkpoint directory, or the values defined in the given configuration. If a
+     * checkpoint parameter is not null, that value takes precedence over the value in the
+     * configuration. If the configuration does not specify a value, it is possible that the
+     * checkpoint directory in the checkpoint storage will be null.
+     *
+     * @param baseCheckpointPath The checkpoint base directory to use (or null).
+     * @param config The configuration to read values from.
+     */
+    public void setCheckpointPath(Path baseCheckpointPath, ReadableConfig config) {
+        this.baseCheckpointPath =
+                parameterOrConfigured(
+                        baseCheckpointPath, config, CheckpointingOptions.CHECKPOINTS_DIRECTORY);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Utilities
+    // ------------------------------------------------------------------------
+
+    /**
+     * Checks the validity of the path's scheme and path.
+     *
+     * @param path The path to check.
+     * @return The URI as a Path.
+     * @throws IllegalArgumentException Thrown, if the URI misses scheme or path.
+     */
+    protected static Path validatePath(Path path) {

Review comment:
       How about moving this method to the `Path` class itself?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
##########
@@ -24,19 +24,46 @@
 import java.io.IOException;
 
 /**
- * CheckpointStorage defines how checkpoint snapshots are persisted for fault tolerance. Various
- * implementations store their checkpoints in different fashions and have different requirements and
- * availability guarantees.
+ * CheckpointStorage defines how {@link StateBackend}'s checkpoint their state for fault tolerance
+ * in streaming applications. Various implementations store their checkpoints in different fashions
+ * and have different requirements and availability guarantees.
  *
- * <p>For example, JobManagerCheckpointStorage stores checkpoints in the memory of the JobManager.
- * It is lightweight and without additional dependencies but is not highly available and only
- * supports small state sizes. This checkpoint storage policy is convenient for local testing and
+ * <p>For example, {@link org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage
+ * JobManagerCheckpointStorage} stores checkpoints in the memory of the JobManager. It is
+ * lightweight and without additional dependencies but is not highly available and only supports
+ * small state sizes. This checkpoint storage policy is convenient for local testing and
  * development.
  *
- * <p>FileSystemCheckpointStorage stores checkpoints in a filesystem. For systems like HDFS, NFS
+ * <p>{@link org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage
+ * FileSystemCheckpointStorage} stores checkpoints in a filesystem. For systems like HDFS, NFS
  * Drives, S3, and GCS, this storage policy supports large state size, in the magnitude of many
  * terabytes while providing a highly available foundation for stateful applications. This
  * checkpoint storage policy is recommended for most production deployments.
+ *
+ * <h2>Raw Bytes Storage</h2>
+ *
+ * <p>The {@code CheckpointStorage} creates services for <i>raw bytes storage</i>.
+ *
+ * <p>The <i>raw bytes storage</i> (through the {@link CheckpointStreamFactory}) is the fundamental
+ * service that simply stores bytes in a fault tolerant fashion. This service is used by the
+ * JobManager to store checkpoint and recovery metadata and is typically also used by the keyed- and
+ * operator state backends to store checkpointed state.
+ *
+ * <h2>Serializability</h2>
+ *
+ * <p>State Backends need to be {@link java.io.Serializable serializable}, because they distributed
+ * across parallel processes (for distributed execution) together with the streaming application
+ * code.
+ *
+ * <p>Because of that, {@code CheckpointStorage} implementations are meant to be like
+ * <i>factories</i> that create the proper states stores that provide access to the persistent. That
+ * way, the Checkpoint Storage can be very lightweight (contain only configurations) which makes it
+ * easier to be serializable.
+ *
+ * <h2>Thread Safety</h2>
+ *
+ * <p>Checkpoint storage implementations have to be thread-safe. Multiple threads may be creating
+ * streams concurrently.
  */
 @PublicEvolving
 public interface CheckpointStorage extends java.io.Serializable {

Review comment:
       I think the `resolveCheckpoint` javadoc is outdated, as it
   1. mentions state backends, and
   2. implies some snapshot validation (while all it actually does is open a file or something like that).

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
##########
@@ -32,17 +32,15 @@
     public static final ConfigOption<String> STATE_BACKEND =
             ConfigOptions.key("state.backend")
                     .noDefaultValue()
-                    .withDescription("The state backend to be used to store and checkpoint state.");
+                    .withDescription("The state backend to be used to store state.");
 
     /** The checkpoint storage used to checkpoint state. */
     @Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS, position = 2)
-    @Documentation.ExcludeFromDocumentation(
-            "Hidden until FileSystemStorage and JobManagerStorage are implemented")
     public static final ConfigOption<String> CHECKPOINT_STORAGE =
             ConfigOptions.key("state.checkpoint-storage")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("The state backend to be used to checkpoint state.");
+                    .withDescription("The checkpoint storage to be used to checkpoint state.");

Review comment:
       I'd copy or move here (and to the javadoc) the part about how to configure this option from the
    `CheckpointStorageLoader.fromConfig` javadoc. It talks about shortcuts, class names, etc., which might be relevant here.
   
   There, should "state backends" be replaced with "implementations"?
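   For illustration, the kind of usage such a description could spell out (the shortcut value and factory class name below are assumptions based on `CheckpointStorageLoader`, not quoted from it):

```java
import org.apache.flink.configuration.Configuration;

/** Hypothetical example of setting state.checkpoint-storage programmatically. */
public class CheckpointStorageConfigExample {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Shortcut form (assumed value, per JOB_MANAGER_STORAGE_NAME):
        config.setString("state.checkpoint-storage", "jobmanager");
        // ...or a fully qualified factory class name (assumed form):
        config.setString(
                "state.checkpoint-storage",
                "org.apache.flink.runtime.state.storage.FileSystemCheckpointStorageFactory");
    }
}
```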

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/AbstractFileCheckpointStorage.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.storage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * A base class for all checkpoint storage instances that store their metadata (and data) in files.
+ *
+ * <p>This class takes the base checkpoint- and savepoint directory paths, but also accepts null for
+ * both of then, in which case creating externalized checkpoint is not possible, and it is not

Review comment:
       ```suggestion
    * both of them, in which case creating externalized checkpoint is not possible, and it is not
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
##########
@@ -162,6 +178,13 @@ public static CheckpointStorage load(
         Preconditions.checkNotNull(classLoader, "classLoader");
         Preconditions.checkNotNull(configuredStateBackend, "statebackend");
 
+        if (defaultSavepointDirectory != null) {
+            config.set(
+                    CheckpointingOptions.SAVEPOINT_DIRECTORY, defaultSavepointDirectory.toString());

Review comment:
       Could you add a brief motivation for this change to the commit message?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
##########
@@ -24,19 +24,46 @@
 import java.io.IOException;
 
 /**
- * CheckpointStorage defines how checkpoint snapshots are persisted for fault tolerance. Various
- * implementations store their checkpoints in different fashions and have different requirements and
- * availability guarantees.
+ * CheckpointStorage defines how {@link StateBackend}'s checkpoint their state for fault tolerance
+ * in streaming applications. Various implementations store their checkpoints in different fashions
+ * and have different requirements and availability guarantees.
  *
- * <p>For example, JobManagerCheckpointStorage stores checkpoints in the memory of the JobManager.
- * It is lightweight and without additional dependencies but is not highly available and only
- * supports small state sizes. This checkpoint storage policy is convenient for local testing and
+ * <p>For example, {@link org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage
+ * JobManagerCheckpointStorage} stores checkpoints in the memory of the JobManager. It is
+ * lightweight and without additional dependencies but is not highly available and only supports
+ * small state sizes. This checkpoint storage policy is convenient for local testing and
  * development.
  *
- * <p>FileSystemCheckpointStorage stores checkpoints in a filesystem. For systems like HDFS, NFS
+ * <p>{@link org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage
+ * FileSystemCheckpointStorage} stores checkpoints in a filesystem. For systems like HDFS, NFS
  * Drives, S3, and GCS, this storage policy supports large state size, in the magnitude of many
  * terabytes while providing a highly available foundation for stateful applications. This
  * checkpoint storage policy is recommended for most production deployments.
+ *
+ * <h2>Raw Bytes Storage</h2>
+ *
+ * <p>The {@code CheckpointStorage} creates services for <i>raw bytes storage</i>.
+ *
+ * <p>The <i>raw bytes storage</i> (through the {@link CheckpointStreamFactory}) is the fundamental
+ * service that simply stores bytes in a fault tolerant fashion. This service is used by the
+ * JobManager to store checkpoint and recovery metadata and is typically also used by the keyed- and
+ * operator state backends to store checkpointed state.
+ *
+ * <h2>Serializability</h2>
+ *
+ * <p>State Backends need to be {@link java.io.Serializable serializable}, because they distributed

Review comment:
       ```suggestion
    * <p>Implementations need to be {@link java.io.Serializable serializable}, because they are distributed
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
##########
@@ -84,12 +88,23 @@
 
         switch (storageName.toLowerCase()) {
             case JOB_MANAGER_STORAGE_NAME:
-                throw new UnsupportedOperationException(
-                        "JobManagerCheckpointStorage is not yet implemented");
+                if (logger != null) {

Review comment:
       What do you think about using the usual `private static final` logger named after this class (`CheckpointStorageLoader.class`)?
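   Something like the standard SLF4J pattern (just a sketch, assuming the usual `org.slf4j.Logger` / `org.slf4j.LoggerFactory` imports):

```java
// Class-level logger instead of passing a nullable logger parameter around.
private static final Logger LOG = LoggerFactory.getLogger(CheckpointStorageLoader.class);
```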

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/AbstractFileCheckpointStorage.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.storage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * A base class for all checkpoint storage instances that store their metadata (and data) in files.
+ *
+ * <p>This class takes the base checkpoint- and savepoint directory paths, but also accepts null for
+ * both of then, in which case creating externalized checkpoint is not possible, and it is not
+ * possible to create a savepoint with a default path. Null is accepted to enable implementations
+ * that only optionally support default savepoints.
+ *
+ * <h1>Checkpoint Layout</h1>
+ *
+ * <p>The checkpoint storage is configured with a base directory and persists the checkpoint data of
+ * specific checkpoints in specific subdirectories. For example, if the base directory was set to
+ * {@code hdfs://namenode:port/flink-checkpoints/}, the state backend will create a subdirectory
+ * with the job's ID that will contain the actual checkpoints: ({@code
+ * hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b})
+ *
+ * <p>Each checkpoint individually will store all its files in a subdirectory that includes the
+ * checkpoint number, such as {@code
+ * hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17/}.
+ *
+ * <h1>Savepoint Layout</h1>
+ *
+ * <p>A savepoint that is set to be stored in path {@code hdfs://namenode:port/flink-savepoints/},
+ * will create a subdirectory {@code savepoint-jobId(0, 6)-randomDigits} in which it stores all
+ * savepoint data. The random digits are added as "entropy" to avoid directory collisions.
+ *
+ * <h1>Metadata File</h1>
+ *
+ * <p>A completed checkpoint writes its metadata into a file '{@value
+ * AbstractFsCheckpointStorageAccess#METADATA_FILE_NAME}'.
+ */
+@PublicEvolving
+public abstract class AbstractFileCheckpointStorage implements CheckpointStorage {

Review comment:
       I made a comment suggesting to remove this class (and the hierarchy). If we still decide to keep it:
   1. I don't think it should be `@PublicEvolving`
   2. Nor public
   3. Rename `AbstractFileCheckpointStorage` to `AbstractFileSystemCheckpointStorage` for consistency?
   4. The javadoc still refers to state backends sometimes

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/JobManagerCheckpointStorageFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.storage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.state.CheckpointStorageFactory;
+
+/** A factory that creates an {@link FileSystemCheckpointStorage} from a configuration. */
+@PublicEvolving
+public class JobManagerCheckpointStorageFactory
+        implements CheckpointStorageFactory<JobManagerCheckpointStorage> {
+
+    @Override
+    public JobManagerCheckpointStorage createFromConfig(

Review comment:
       How about moving this method to `JobManagerCheckpointStorage` and making it static?
   (and removing this class)
   
   ditto: `FileSystemCheckpointStorageFactory`
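   Roughly this shape, as a fragment (a sketch only; it assumes `JobManagerCheckpointStorage` has a no-arg constructor and that its `configure(ReadableConfig, ClassLoader)` returns the reconfigured storage):

```java
// Hypothetical static factory on the storage class itself, which would make the
// separate JobManagerCheckpointStorageFactory unnecessary.
public static JobManagerCheckpointStorage createFromConfig(
        ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException {
    // Reuse the existing configure(...) logic so behaviour stays the same.
    return new JobManagerCheckpointStorage().configure(config, classLoader);
}
```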

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/JobManagerCheckpointStorage.java
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.storage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.ConfigurableCheckpointStorage;
+import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorageAccess;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The {@link CheckpointStorage} checkpoints state directly to the JobManager's memory (hence the
+ * name), but savepoints will be persisted to a file system.
+ *
+ * <p>This checkpoint storage is primarily for experimentation, quick local setups, or for streaming
+ * applications that have very small state: Because it requires checkpoints to go through the
+ * JobManager's memory, larger state will occupy larger portions of the JobManager's main memory,
+ * reducing operational stability. For any other setup, the {@link FileSystemCheckpointStorage}
+ * should be used. The {@code FileSystemCheckpointStorage} but checkpoints state directly to files
+ * rather than to the JobManager's memory, thus supporting larger state sizes and more highly
+ * available recovery.
+ *
+ * <h1>State Size Considerations</h1>
+ *
+ * <p>State checkpointing with this storage is subject to the following conditions:
+ *
+ * <ul>
+ *   <li>Each individual state must not exceed the configured maximum state size (see {@link
+ *       #getMaxStateSize()}.
+ *   <li>All state from one task (i.e., the sum of all operator states and keyed states from all
+ *       chained operators of the task) must not exceed what the RPC system supports, which is be
+ *       default < 10 MB. That limit can be configured up, but that is typically not advised.
+ *   <li>The sum of all states in the application times all retained checkpoints must comfortably
+ *       fit into the JobManager's JVM heap space.
+ * </ul>
+ *
+ * <h1>Persistence Guarantees</h1>
+ *
+ * <p>For the use cases where the state sizes can be handled by this checkpoint storage, the storage
+ * does guarantee persistence for savepoints, externalized checkpoints (if configured), and
+ * checkpoints (when high-availability is configured).
+ *
+ * <h1>Configuration</h1>
+ *
+ * <p>As for all checkpoint storage, this storage policy can either be configured within the
+ * application (by creating the storage with the respective constructor parameters and setting it on
+ * the execution environment) or by specifying it in the Flink configuration.
+ *
+ * <p>If the checkpoint storage was specified in the application, it may pick up additional
+ * configuration parameters from the Flink configuration. For example, if the storage if configured
+ * in the application without a default savepoint directory, it will pick up a default savepoint
+ * directory specified in the Flink configuration of the running job/cluster. That behavior is
+ * implemented via the {@link #configure(ReadableConfig, ClassLoader)} method.
+ */
+@PublicEvolving
+public class JobManagerCheckpointStorage extends AbstractFileCheckpointStorage
+        implements CheckpointStorage, ConfigurableCheckpointStorage {

Review comment:
       I don't think that inheritance is necessary here. After moving the two static methods from `AbstractFileCheckpointStorage`, only two fields and a call to the static `AbstractFsCheckpointStorageAccess.resolveCheckpointPointer` are inherited.
   
   Having the JM storage extend the FS storage is confusing to me, as the TM doesn't actually use the FS.
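   For illustration, the flattened shape might look roughly like this (a sketch that assumes `CheckpointStorage` exposes `resolveCheckpoint(String)` and `createCheckpointStorage(JobID)`; nothing here is taken verbatim from the PR):

```java
import java.io.IOException;

import javax.annotation.Nullable;

import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;

/** Hypothetical flattened JobManagerCheckpointStorage, without the file-based base class. */
public class FlattenedJobManagerCheckpointStorage implements CheckpointStorage {

    private static final long serialVersionUID = 1L;

    /** Default savepoint directory, or null if none was configured. */
    @Nullable private Path baseSavepointPath;

    @Override
    public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException {
        // Same behaviour as before, reached via the static helper instead of inheritance.
        return AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(pointer);
    }

    @Override
    public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {
        // Elided: the real class would create a MemoryBackendCheckpointStorageAccess here;
        // the point of this sketch is only the removed inheritance above.
        throw new UnsupportedOperationException("sketch only");
    }
}
```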

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/AbstractFileCheckpointStorage.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.storage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * A base class for all checkpoint storage instances that store their metadata (and data) in files.
+ *
+ * <p>This class takes the base checkpoint- and savepoint directory paths, but also accepts null for
+ * both of then, in which case creating externalized checkpoint is not possible, and it is not
+ * possible to create a savepoint with a default path. Null is accepted to enable implementations
+ * that only optionally support default savepoints.
+ *
+ * <h1>Checkpoint Layout</h1>
+ *
+ * <p>The checkpoint storage is configured with a base directory and persists the checkpoint data of
+ * specific checkpoints in specific subdirectories. For example, if the base directory was set to
+ * {@code hdfs://namenode:port/flink-checkpoints/}, the state backend will create a subdirectory
+ * with the job's ID that will contain the actual checkpoints: ({@code
+ * hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b})
+ *
+ * <p>Each checkpoint individually will store all its files in a subdirectory that includes the
+ * checkpoint number, such as {@code
+ * hdfs://namenode:port/flink-checkpoints/1b080b6e710aabbef8993ab18c6de98b/chk-17/}.
+ *
+ * <h1>Savepoint Layout</h1>
+ *
+ * <p>A savepoint that is set to be stored in path {@code hdfs://namenode:port/flink-savepoints/},
+ * will create a subdirectory {@code savepoint-jobId(0, 6)-randomDigits} in which it stores all
+ * savepoint data. The random digits are added as "entropy" to avoid directory collisions.
+ *
+ * <h1>Metadata File</h1>
+ *
+ * <p>A completed checkpoint writes its metadata into a file '{@value
+ * AbstractFsCheckpointStorageAccess#METADATA_FILE_NAME}'.
+ */
+@PublicEvolving
+public abstract class AbstractFileCheckpointStorage implements CheckpointStorage {
+
+    private static final long serialVersionUID = 1L;
+
+    // ------------------------------------------------------------------------
+    //  Checkpoint Storage Properties
+    // ------------------------------------------------------------------------
+
+    /** The path where checkpoints will be stored, or null, if none has been configured. */
+    @Nullable private Path baseCheckpointPath;
+
+    /** The path where savepoints will be stored, or null, if none has been configured. */
+    @Nullable private Path baseSavepointPath;
+
+    @Override
+    public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException {
+        return AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(pointer);
+    }
+
+    /**
+     * Gets the directory where savepoints are stored by default (when no custom path is given to
+     * the savepoint trigger command).
+     *
+     * @return The default directory for savepoints, or null, if no default directory has been
+     *     configured.
+     */
+    @Nullable
+    public Path getSavepointPath() {
+        return baseSavepointPath;
+    }
+
+    /** @param baseSavepointPath The base directory for savepoints. */
+    public void setSavepointPath(String baseSavepointPath) {
+        setSavepointPath(new Path(baseSavepointPath));
+    }
+
+    /** @param baseSavepointPath The base directory for savepoints. */
+    public void setSavepointPath(@Nullable Path baseSavepointPath) {
+        this.baseSavepointPath = baseSavepointPath == null ? null : validatePath(baseSavepointPath);
+    }
+
+    /**
+     * Sets the given savepoint directory, or the values defined in the given configuration. If a
+     * checkpoint parameter is not null, that value takes precedence over the value in the
+     * configuration. If the configuration does not specify a value, it is possible that the
+     * savepoint directory in the savepoint storage will be null.
+     *
+     * @param baseSavepointPath The checkpoint base directory to use (or null).
+     * @param config The configuration to read values from.
+     */
+    public void setSavepointPath(Path baseSavepointPath, ReadableConfig config) {
+        this.baseSavepointPath =
+                parameterOrConfigured(
+                        baseSavepointPath, config, CheckpointingOptions.SAVEPOINT_DIRECTORY);
+    }
+
+    /**
+     * Gets the directory where savepoints are stored by default (when no custom path is given to
+     * the savepoint trigger command).
+     *
+     * @return The default directory for savepoints, or null, if no default directory has been
+     *     configured.
+     */
+    @Nullable
+    public Path getCheckpointPath() {
+        return baseCheckpointPath;
+    }
+
+    /** @param baseCheckpointPath The base directory for checkpoints. */
+    public void setCheckpointPath(String baseCheckpointPath) {
+        setCheckpointPath(new Path(baseCheckpointPath));
+    }
+
+    /** @param baseCheckpointPath The base directory for checkpoints. */
+    public void setCheckpointPath(@Nullable Path baseCheckpointPath) {
+        this.baseCheckpointPath =
+                baseCheckpointPath == null ? null : validatePath(baseCheckpointPath);
+    }
+
+    /**
+     * Sets the given checkpoint directory, or the values defined in the given configuration. If a
+     * checkpoint parameter is not null, that value takes precedence over the value in the
+     * configuration. If the configuration does not specify a value, it is possible that the
+     * checkpoint directory in the checkpoint storage will be null.
+     *
+     * @param baseCheckpointPath The checkpoint base directory to use (or null).
+     * @param config The configuration to read values from.
+     */
+    public void setCheckpointPath(Path baseCheckpointPath, ReadableConfig config) {
+        this.baseCheckpointPath =
+                parameterOrConfigured(
+                        baseCheckpointPath, config, CheckpointingOptions.CHECKPOINTS_DIRECTORY);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Utilities
+    // ------------------------------------------------------------------------
+
+    /**
+     * Checks the validity of the path's scheme and path.
+     *
+     * @param path The path to check.
+     * @return The URI as a Path.
+     * @throws IllegalArgumentException Thrown, if the URI misses scheme or path.
+     */
+    protected static Path validatePath(Path path) {
+        final URI uri = path.toUri();
+        final String scheme = uri.getScheme();
+        final String pathPart = uri.getPath();
+
+        // some validity checks
+        if (scheme == null) {
+            throw new IllegalArgumentException(
+                    "The scheme (hdfs://, file://, etc) is null. "
+                            + "Please specify the file system scheme explicitly in the URI.");
+        }
+        if (pathPart == null) {
+            throw new IllegalArgumentException(
+                    "The path to store the checkpoint data in is null. "
+                            + "Please specify a directory path for the checkpoint data.");
+        }
+        if (pathPart.length() == 0 || pathPart.equals("/")) {
+            throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
+        }
+
+        return path;
+    }
+
+    @Nullable
+    protected static Path parameterOrConfigured(
+            @Nullable Path path, ReadableConfig config, ConfigOption<String> option) {
+        if (path != null) {
+            return path;
+        } else {
+            String configValue = config.get(option);
+            try {
+                return configValue == null ? null : new Path(configValue);

Review comment:
       nit: `return config.getOptional(option).map(Path::new).orElse(null);` ?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/storage/FileSystemCheckpointStorage.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.storage;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageAccess;
+import org.apache.flink.runtime.state.ConfigurableCheckpointStorage;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess;
+import org.apache.flink.util.MathUtils;
+
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link FileSystemCheckpointStorage} checkpoints state as files to a file system.
+ *
+ * <p>Each checkpoint individually will store all its files in a subdirectory that includes the
+ * checkpoint number, such as {@code hdfs://namenode:port/flink-checkpoints/chk-17/}.
+ *
+ * <h1>State Size Considerations</h1>
+ *
+ * <p>This checkpoint storage stores small state chunks directly with the metadata, to avoid
+ * creating many small files. The threshold for that is configurable. When increasing this
+ * threshold, the size of the checkpoint metadata increases. The checkpoint metadata of all retained
+ * completed checkpoints needs to fit into the JobManager's heap memory. This is typically not a
+ * problem, unless the threshold {@link #getMinFileSizeThreshold()} is increased significantly.
+ *
+ * <h1>Persistence Guarantees</h1>
+ *
+ * <p>Checkpoints from this checkpoint storage are as persistent and available as filesystem that is
+ * written to. If the file system is a persistent distributed file system, this checkpoint storage
+ * supports highly available setups. The backend additionally supports savepoints and externalized
+ * checkpoints.
+ *
+ * <h1>Configuration</h1>
+ *
+ * <p>As for all checkpoint storage policies, this backend can either be configured within the
+ * application (by creating the backend with the respective constructor parameters and setting it on
+ * the execution environment) or by specifying it in the Flink configuration.
+ *
+ * <p>If the checkpoint storage was specified in the application, it may pick up additional
+ * configuration parameters from the Flink configuration. For example, if the backend if configured
+ * in the application without a default savepoint directory, it will pick up a default savepoint
+ * directory specified in the Flink configuration of the running job/cluster. That behavior is
+ * implemented via the {@link #configure(ReadableConfig, ClassLoader)} method.
+ */
+@PublicEvolving
+public class FileSystemCheckpointStorage extends AbstractFileCheckpointStorage
+        implements CheckpointStorage, ConfigurableCheckpointStorage {
+
+    private static final long serialVersionUID = -8191916350224044011L;
+
+    /** Maximum size of state that is stored with the metadata, rather than in files (1 MiByte). */
+    private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * State below this size will be stored as part of the metadata, rather than in files. A value
+     * of '-1' means not yet configured, in which case the default will be used.
+     */
+    private final int fileStateThreshold;
+
+    /**
+     * The write buffer size for created checkpoint stream, this should not be less than file state
+     * threshold when we want state below that threshold stored as part of metadata not files. A
+     * value of '-1' means not yet configured, in which case the default will be used.
+     */
+    private final int writeBufferSize;
+
+    /**
+     * Creates a new checkpoint storage that stores its checkpoint data in the file system and
+     * location defined by the given URI.
+     *
+     * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or
+     * 'S3://') must be accessible via {@link FileSystem#get(URI)}.
+     *
+     * <p>For a Job targeting HDFS, this means that the URI must either specify the authority (host
+     * and port), or that the Hadoop configuration that describes that information must be in the
+     * classpath.
+     *
+     * @param checkpointDirectory The path to write checkpoint metadata to.
+     */
+    public FileSystemCheckpointStorage(String checkpointDirectory) {
+        this(new Path(checkpointDirectory));
+    }
+
+    /**
+     * Creates a new checkpoint storage that stores its checkpoint data in the file system and
+     * location defined by the given URI.
+     *
+     * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or
+     * 'S3://') must be accessible via {@link FileSystem#get(URI)}.
+     *
+     * <p>For a Job targeting HDFS, this means that the URI must either specify the authority (host
+     * and port), or that the Hadoop configuration that describes that information must be in the
+     * classpath.
+     *
+     * @param checkpointDirectory The path to write checkpoint metadata to.
+     */
+    public FileSystemCheckpointStorage(Path checkpointDirectory) {
+        this(checkpointDirectory, -1, -1);
+    }
+
+    /**
+     * Creates a new checkpoint storage that stores its checkpoint data in the file system and
+     * location defined by the given URI.
+     *
+     * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or
+     * 'S3://') must be accessible via {@link FileSystem#get(URI)}.
+     *
+     * <p>For a Job targeting HDFS, this means that the URI must either specify the authority (host
+     * and port), or that the Hadoop configuration that describes that information must be in the
+     * classpath.
+     *
+     * @param checkpointDirectory The path to write checkpoint metadata to.
+     */
+    public FileSystemCheckpointStorage(URI checkpointDirectory) {
+        this(new Path(checkpointDirectory));
+    }
+
+    /**
+     * Creates a new checkpoint storage that stores its checkpoint data in the file system and
+     * location defined by the given URI.
+     *
+     * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or
+     * 'S3://') must be accessible via {@link FileSystem#get(URI)}.
+     *
+     * <p>For a Job targeting HDFS, this means that the URI must either specify the authority (host
+     * and port), or that the Hadoop configuration that describes that information must be in the
+     * classpath.
+     *
+     * @param checkpointDirectory The path to write checkpoint metadata to.
+     * @param fileStateSizeThreshold State below this size will be stored as part of the metadata,
+     *     rather than in files. If -1, the value configured in the runtime configuration will be
+     *     used, or the default value (1KB) if nothing is configured.
+     */
+    public FileSystemCheckpointStorage(URI checkpointDirectory, int fileStateSizeThreshold) {
+        this(new Path(checkpointDirectory), fileStateSizeThreshold, -1);
+    }
+
+    /**
+     * Creates a new checkpoint storage that stores its checkpoint data in the file system and
+     * location defined by the given URI.
+     *
+     * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or
+     * 'S3://') must be accessible via {@link FileSystem#get(URI)}.
+     *
+     * <p>For a Job targeting HDFS, this means that the URI must either specify the authority (host
+     * and port), or that the Hadoop configuration that describes that information must be in the
+     * classpath.
+     *
+     * @param checkpointDirectory The path to write checkpoint metadata to.
+     * @param fileStateSizeThreshold State below this size will be stored as part of the metadata,
+     *     rather than in files. If -1, the value configured in the runtime configuration will be
+     *     used, or the default value (1KB) if nothing is configured.
+     * @param writeBufferSize Write buffer size used to serialize state. If -1, the value configured
+     *     in the runtime configuration will be used, or the default value (4KB) if nothing is
+     *     configured.
+     */
+    public FileSystemCheckpointStorage(
+            Path checkpointDirectory, int fileStateSizeThreshold, int writeBufferSize) {
+
+        checkNotNull(checkpointDirectory, "checkpoint directory is null");
+        checkArgument(
+                fileStateSizeThreshold >= -1 && fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD,
+                "The threshold for file state size must be in [-1, %s], where '-1' means to use "
+                        + "the value from the deployment's configuration.",
+                MAX_FILE_STATE_THRESHOLD);
+        checkArgument(
+                writeBufferSize >= -1,
+                "The write buffer size must be not less than '-1', where '-1' means to use "
+                        + "the value from the deployment's configuration.");
+
+        this.fileStateThreshold = fileStateSizeThreshold;
+        this.writeBufferSize = writeBufferSize;
+        setCheckpointPath(checkpointDirectory);
+    }
+
+    /**
+     * Private constructor that creates a re-configured copy of the checkpoint storage.
+     *
+     * @param original The checkpoint storage to re-configure
+     * @param configuration The configuration
+     */
+    private FileSystemCheckpointStorage(
+            FileSystemCheckpointStorage original,
+            ReadableConfig configuration,
+            ClassLoader classLoader) {

Review comment:
       `classLoader` is never used.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org