You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/26 05:56:32 UTC

[GitHub] [flink] curcur commented on a change in pull request #13797: [FLINK-19465][runtime / statebackends] Add CheckpointStorage interface and wire through runtime

curcur commented on a change in pull request #13797:
URL: https://github.com/apache/flink/pull/13797#discussion_r563782279



##########
File path: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
##########
@@ -34,6 +34,16 @@
                     .noDefaultValue()
                     .withDescription("The state backend to be used to store and checkpoint state.");
 
+    /** The checkpoint storage used to checkpoint state. */
+    @Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS, position = 2)
+    @Documentation.ExcludeFromDocumentation(
+            "Hidden until FileSystemStorage and JobManagerStorage are implemented")
+    public static final ConfigOption<String> CHECKPOINT_STORAGE =
+            ConfigOptions.key("state.checkpoint-storage")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The state backend to be used to checkpoint state.");

Review comment:
       "The state backend to be used to checkpoint state."  ==>
   
   "The checkpoint storage to be used to store checkpoints"

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
##########
@@ -0,0 +1,67 @@
+/*

Review comment:
       Maybe add a "see also" reference of `CheckpointStorage` in the Java doc of `StateBackend`? The `StateBackend` Java Doc is the same as before which mentions "durable persistent storage".

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
##########
@@ -231,15 +233,15 @@ private static void throwNonRestoredStateException(
     // ------------------------------------------------------------------------
 
     public static void disposeSavepoint(
-            String pointer, StateBackend stateBackend, ClassLoader classLoader)
+            String pointer, CheckpointStorage checkpointStorage, ClassLoader classLoader)
             throws IOException, FlinkException {
 
         checkNotNull(pointer, "location");
-        checkNotNull(stateBackend, "stateBackend");
+        checkNotNull(checkpointStorage, "stateBackend");

Review comment:
       "stateBackend" => "checkpoint storage"

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
##########
@@ -58,28 +60,35 @@
      *     the factory class was not found or the factory could not be instantiated
      * @throws IllegalConfigurationException May be thrown by the CheckpointStorageFactory when
      *     creating / configuring the checkpoint storage in the factory
-     * @throws IOException May be thrown by the CheckpointStorageFactory when instantiating the
-     *     checkpoint storage
      */
-    public static CheckpointStorage loadCheckpointStorageFromConfig(
+    public static Optional<CheckpointStorage> fromConfig(
             ReadableConfig config, ClassLoader classLoader, @Nullable Logger logger)
-            throws IllegalStateException, DynamicCodeLoadingException, IOException {
+            throws IllegalStateException, DynamicCodeLoadingException {
 
         Preconditions.checkNotNull(config, "config");
         Preconditions.checkNotNull(classLoader, "classLoader");
 
         final String storageName = config.get(CheckpointingOptions.CHECKPOINT_STORAGE);
         if (storageName == null) {
-            return null;
+            if (logger != null) {
+                logger.warn(
+                        "The configuration {} has not be set in the current"
+                                + " sessions flink-conf.yaml. Falling back to a default CheckpointStorage"
+                                + " type. Users are strongly encouraged explicitly set this configuration"

Review comment:
       "Users are strongly encouraged explicitly"  =>
   Users are strongly encouraged to explicitly

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.DynamicCodeLoadingException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** This class contains utility methods to load checkpoint storage from configurations. */
+public class CheckpointStorageLoader {
+
+    public static final String JOB_MANAGER_STORAGE_NAME = "jobmanager";
+
+    public static final String FILE_SYSTEM_STORAGE_NAME = "filesystem";
+
+    /**
+     * Loads the checkpoint storage from the configuration, from the parameter
+     * 'state.checkpoint-storage', as defined in {@link CheckpointingOptions#CHECKPOINT_STORAGE}.
+     *
+     * <p>The state backends can be specified either via their shortcut name, or via the class name
+     * of a {@link CheckpointStorageFactory}. If a CheckpointStorageFactory class name is specified,
+     * the factory is instantiated (via its zero-argument constructor) and its {@link
+     * CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)} method is called.
+     *
+     * <p>Recognized shortcut names are '{@value #JOB_MANAGER_STORAGE_NAME}', and '{@value
+     * #FILE_SYSTEM_STORAGE_NAME}'.
+     *
+     * @param config The configuration to load the checkpoint storage from
+     * @param classLoader The class loader that should be used to load the checkpoint storage
+     * @param logger Optionally, a logger to log actions to (may be null)
+     * @return The instantiated checkpoint storage.
+     * @throws DynamicCodeLoadingException Thrown if a checkpoint storage factory is configured and
+     *     the factory class was not found or the factory could not be instantiated
+     * @throws IllegalConfigurationException May be thrown by the CheckpointStorageFactory when
+     *     creating / configuring the checkpoint storage in the factory
+     * @throws IOException May be thrown by the CheckpointStorageFactory when instantiating the
+     *     checkpoint storage
+     */
+    public static CheckpointStorage loadCheckpointStorageFromConfig(
+            ReadableConfig config, ClassLoader classLoader, @Nullable Logger logger)
+            throws IllegalStateException, DynamicCodeLoadingException, IOException {
+
+        Preconditions.checkNotNull(config, "config");
+        Preconditions.checkNotNull(classLoader, "classLoader");
+
+        final String storageName = config.get(CheckpointingOptions.CHECKPOINT_STORAGE);
+        if (storageName == null) {
+            return null;
+        }
+
+        switch (storageName.toLowerCase()) {
+            case JOB_MANAGER_STORAGE_NAME:
+                throw new IllegalStateException(
+                        "JobManagerCheckpointStorage is not yet implemented");
+
+            case FILE_SYSTEM_STORAGE_NAME:
+                throw new IllegalStateException(
+                        "FileSystemCheckpointStorage is not yet implemented");
+
+            default:
+                if (logger != null) {
+                    logger.info("Loading state backend via factory {}", storageName);
+                }
+
+                CheckpointStorageFactory<?> factory;
+                try {
+                    @SuppressWarnings("rawtypes")
+                    Class<? extends CheckpointStorageFactory> clazz =
+                            Class.forName(storageName, false, classLoader)
+                                    .asSubclass(CheckpointStorageFactory.class);
+
+                    factory = clazz.newInstance();
+                } catch (ClassNotFoundException e) {
+                    throw new DynamicCodeLoadingException(
+                            "Cannot find configured state backend factory class: " + storageName,

Review comment:
       "state backend" => "checkpoint storage"

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.DynamicCodeLoadingException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** This class contains utility methods to load checkpoint storage from configurations. */
+public class CheckpointStorageLoader {
+
+    public static final String JOB_MANAGER_STORAGE_NAME = "jobmanager";
+
+    public static final String FILE_SYSTEM_STORAGE_NAME = "filesystem";
+
+    /**
+     * Loads the checkpoint storage from the configuration, from the parameter
+     * 'state.checkpoint-storage', as defined in {@link CheckpointingOptions#CHECKPOINT_STORAGE}.
+     *
+     * <p>The state backends can be specified either via their shortcut name, or via the class name
+     * of a {@link CheckpointStorageFactory}. If a CheckpointStorageFactory class name is specified,
+     * the factory is instantiated (via its zero-argument constructor) and its {@link
+     * CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader)} method is called.
+     *
+     * <p>Recognized shortcut names are '{@value #JOB_MANAGER_STORAGE_NAME}', and '{@value
+     * #FILE_SYSTEM_STORAGE_NAME}'.
+     *
+     * @param config The configuration to load the checkpoint storage from
+     * @param classLoader The class loader that should be used to load the checkpoint storage
+     * @param logger Optionally, a logger to log actions to (may be null)
+     * @return The instantiated checkpoint storage.
+     * @throws DynamicCodeLoadingException Thrown if a checkpoint storage factory is configured and
+     *     the factory class was not found or the factory could not be instantiated
+     * @throws IllegalConfigurationException May be thrown by the CheckpointStorageFactory when
+     *     creating / configuring the checkpoint storage in the factory
+     * @throws IOException May be thrown by the CheckpointStorageFactory when instantiating the
+     *     checkpoint storage
+     */
+    public static CheckpointStorage loadCheckpointStorageFromConfig(
+            ReadableConfig config, ClassLoader classLoader, @Nullable Logger logger)
+            throws IllegalStateException, DynamicCodeLoadingException, IOException {
+
+        Preconditions.checkNotNull(config, "config");
+        Preconditions.checkNotNull(classLoader, "classLoader");
+
+        final String storageName = config.get(CheckpointingOptions.CHECKPOINT_STORAGE);
+        if (storageName == null) {
+            return null;
+        }
+
+        switch (storageName.toLowerCase()) {
+            case JOB_MANAGER_STORAGE_NAME:
+                throw new IllegalStateException(
+                        "JobManagerCheckpointStorage is not yet implemented");
+
+            case FILE_SYSTEM_STORAGE_NAME:
+                throw new IllegalStateException(
+                        "FileSystemCheckpointStorage is not yet implemented");
+
+            default:
+                if (logger != null) {
+                    logger.info("Loading state backend via factory {}", storageName);

Review comment:
       "state backend" => "checkpoint storage"

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLoader.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.DynamicCodeLoadingException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** This class contains utility methods to load checkpoint storage from configurations. */
+public class CheckpointStorageLoader {
+
+    public static final String JOB_MANAGER_STORAGE_NAME = "jobmanager";
+
+    public static final String FILE_SYSTEM_STORAGE_NAME = "filesystem";
+
+    /**
+     * Loads the checkpoint storage from the configuration, from the parameter
+     * 'state.checkpoint-storage', as defined in {@link CheckpointingOptions#CHECKPOINT_STORAGE}.
+     *
+     * <p>The state backends can be specified either via their shortcut name, or via the class name

Review comment:
       "The state backends can be specified either via their shortcut name" =>
   
   "The checkpoint storage can be specified either via its shortcut name"




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org