You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/11/08 13:14:30 UTC

[flink] branch master updated (a77747892b1 -> bb124f4ada4)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from a77747892b1 [FLINK-29878][hive] Fix flink-sql-connector hive error
     new e98e289991b [FLINK-29379][streaming] Migrate most of the CheckpointConfig fields to Configuration
     new a9099b31db8 [FLINK-29379][streaming] Migrate a couple of CheckpointConfig fields to hidden ConfigOption
     new bb124f4ada4 [FLINK-29379][streaming] Drop CheckpointConfig#failOnCheckpointingErrors and use getTolerableCheckpointFailureNumber() instead

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../api/environment/CheckpointConfig.java          | 301 +++++++++++----------
 .../environment/ExecutionCheckpointingOptions.java |  28 ++
 2 files changed, 183 insertions(+), 146 deletions(-)


[flink] 03/03: [FLINK-29379][streaming] Drop CheckpointConfig#failOnCheckpointingErrors and use getTolerableCheckpointFailureNumber() instead

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bb124f4ada402abd98de53e3827fc227997c4e3e
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Nov 4 13:06:24 2022 +0100

    [FLINK-29379][streaming] Drop CheckpointConfig#failOnCheckpointingErrors and use getTolerableCheckpointFailureNumber() instead
---
 .../flink/streaming/api/environment/CheckpointConfig.java   | 13 +------------
 1 file changed, 1 insertion(+), 12 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 2937f347faf..7e59e0874c9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -128,16 +128,6 @@ public class CheckpointConfig implements java.io.Serializable {
      */
     private final Configuration configuration;
 
-    /**
-     * Task would not fail if there is an error in their checkpointing.
-     *
-     * <p>{@link ExecutionCheckpointingOptions#TOLERABLE_FAILURE_NUMBER} would always overrule this
-     * deprecated field if they have conflicts.
-     *
-     * @deprecated Use {@link ExecutionCheckpointingOptions#TOLERABLE_FAILURE_NUMBER}.
-     */
-    @Deprecated private boolean failOnCheckpointingErrors = true;
-
     /**
      * The checkpoint storage for this application. This field is marked as transient because it may
      * contain user-code.
@@ -380,7 +370,7 @@ public class CheckpointConfig implements java.io.Serializable {
      */
     @Deprecated
     public boolean isFailOnCheckpointingErrors() {
-        return failOnCheckpointingErrors;
+        return getTolerableCheckpointFailureNumber() == 0;
     }
 
     /**
@@ -409,7 +399,6 @@ public class CheckpointConfig implements java.io.Serializable {
                     getTolerableCheckpointFailureNumber());
             return;
         }
-        this.failOnCheckpointingErrors = failOnCheckpointingErrors;
         if (failOnCheckpointingErrors) {
             setTolerableCheckpointFailureNumber(0);
         } else {


[flink] 02/03: [FLINK-29379][streaming] Migrate a couple of CheckpointConfig fields to hidden ConfigOption

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a9099b31db8c37b51b6a4d6873f72274396bff09
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Nov 4 13:04:16 2022 +0100

    [FLINK-29379][streaming] Migrate a couple of CheckpointConfig fields to hidden ConfigOption
---
 .../api/environment/CheckpointConfig.java          | 35 +++++++++++++---------
 .../environment/ExecutionCheckpointingOptions.java | 28 +++++++++++++++++
 2 files changed, 49 insertions(+), 14 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index 8f5200bd8f4..2937f347faf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -120,16 +120,14 @@ public class CheckpointConfig implements java.io.Serializable {
                     .defaultValue()
                     .intValue();
 
-    // ------------------------------------------------------------------------
+    // --------------------------------------------------------------------------------------------
 
+    /**
+     * In the long run, this field should be somehow merged with the {@link Configuration} from
+     * {@link StreamExecutionEnvironment}.
+     */
     private final Configuration configuration;
 
-    /** Flag to force checkpointing in iterative jobs. */
-    private boolean forceCheckpointing;
-
-    /** Flag to enable approximate local recovery. */
-    private boolean approximateLocalRecovery;
-
     /**
      * Task would not fail if there is an error in their checkpointing.
      *
@@ -143,8 +141,10 @@ public class CheckpointConfig implements java.io.Serializable {
     /**
      * The checkpoint storage for this application. This field is marked as transient because it may
      * contain user-code.
+     *
+     * @deprecated this should be moved somehow to {@link #configuration}.
      */
-    private transient CheckpointStorage storage;
+    @Deprecated private transient CheckpointStorage storage;
 
     /**
      * Creates a deep copy of the provided {@link CheckpointConfig}.
@@ -155,8 +155,6 @@ public class CheckpointConfig implements java.io.Serializable {
         checkNotNull(checkpointConfig);
 
         this.configuration = new Configuration(checkpointConfig.configuration);
-        this.approximateLocalRecovery = checkpointConfig.isApproximateLocalRecoveryEnabled();
-        this.forceCheckpointing = checkpointConfig.forceCheckpointing;
         this.storage = checkpointConfig.getCheckpointStorage();
     }
 
@@ -334,7 +332,7 @@ public class CheckpointConfig implements java.io.Serializable {
     @Deprecated
     @PublicEvolving
     public boolean isForceCheckpointing() {
-        return forceCheckpointing;
+        return configuration.get(ExecutionCheckpointingOptions.FORCE_CHECKPOINTING);
     }
 
     /**
@@ -347,7 +345,7 @@ public class CheckpointConfig implements java.io.Serializable {
     @Deprecated
     @PublicEvolving
     public void setForceCheckpointing(boolean forceCheckpointing) {
-        this.forceCheckpointing = forceCheckpointing;
+        configuration.set(ExecutionCheckpointingOptions.FORCE_CHECKPOINTING, forceCheckpointing);
     }
 
     /**
@@ -624,7 +622,7 @@ public class CheckpointConfig implements java.io.Serializable {
      */
     @Experimental
     public boolean isApproximateLocalRecoveryEnabled() {
-        return approximateLocalRecovery;
+        return configuration.get(ExecutionCheckpointingOptions.APPROXIMATE_LOCAL_RECOVERY);
     }
 
     /**
@@ -643,7 +641,7 @@ public class CheckpointConfig implements java.io.Serializable {
      */
     @Experimental
     public void enableApproximateLocalRecovery(boolean enabled) {
-        approximateLocalRecovery = enabled;
+        configuration.set(ExecutionCheckpointingOptions.APPROXIMATE_LOCAL_RECOVERY, enabled);
     }
 
     /**
@@ -861,4 +859,13 @@ public class CheckpointConfig implements java.io.Serializable {
                 .getOptional(CheckpointingOptions.CHECKPOINTS_DIRECTORY)
                 .ifPresent(this::setCheckpointStorage);
     }
+
+    /**
+     * @return A copy of internal {@link #configuration}. Note it is missing all options that are
+     *     stored as plain java fields in {@link CheckpointConfig}, for example {@link #storage}.
+     */
+    @Internal
+    public Configuration toConfiguration() {
+        return new Configuration(configuration);
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
index 18135e8267d..02ebbd44e82 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.streaming.api.environment;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
@@ -28,6 +30,7 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 
 import java.time.Duration;
 
+import static org.apache.flink.configuration.ConfigOptions.key;
 import static org.apache.flink.configuration.description.LinkElement.link;
 
 /**
@@ -252,4 +255,29 @@ public class ExecutionCheckpointingOptions {
                                                     "{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/dev/datastream/fault-tolerance/checkpointing/#checkpointing-with-parts-of-the-graph-finished-beta",
                                                     "the important considerations"))
                                     .build());
+
+    /**
+     * Access to this option is officially only supported via {@link
+     * CheckpointConfig#setForceCheckpointing(boolean)}, but there is no good reason behind this.
+     *
+     * @deprecated This will be removed once iterations properly participate in checkpointing.
+     */
+    @Internal @Deprecated @Documentation.ExcludeFromDocumentation
+    public static final ConfigOption<Boolean> FORCE_CHECKPOINTING =
+            key("execution.checkpointing.force")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Flag to force checkpointing in iterative jobs.");
+
+    /**
+     * Access to this option is officially only supported via {@link
+     * CheckpointConfig#enableApproximateLocalRecovery(boolean)}, but there is no good reason behind
+     * this.
+     */
+    @Internal @Documentation.ExcludeFromDocumentation
+    public static final ConfigOption<Boolean> APPROXIMATE_LOCAL_RECOVERY =
+            key("execution.checkpointing.approximate-local-recovery")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Flag to enable approximate local recovery.");
 }


[flink] 01/03: [FLINK-29379][streaming] Migrate most of the CheckpointConfig fields to Configuration

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e98e289991bbd82104c13dac1035ae2a60c75168
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Nov 3 18:04:53 2022 +0100

    [FLINK-29379][streaming] Migrate most of the CheckpointConfig fields to Configuration
---
 .../api/environment/CheckpointConfig.java          | 267 +++++++++++----------
 1 file changed, 140 insertions(+), 127 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index edcadc2c46f..8f5200bd8f4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -24,6 +24,7 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DescribedEnum;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.configuration.description.InlineElement;
@@ -42,7 +43,6 @@ import javax.annotation.Nullable;
 import java.net.URI;
 import java.time.Duration;
 
-import static java.util.Objects.requireNonNull;
 import static org.apache.flink.configuration.description.TextElement.text;
 import static org.apache.flink.runtime.checkpoint.CheckpointFailureManager.UNLIMITED_TOLERABLE_FAILURE_NUMBER;
 import static org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME;
@@ -65,81 +65,81 @@ public class CheckpointConfig implements java.io.Serializable {
 
     private static final Logger LOG = LoggerFactory.getLogger(CheckpointConfig.class);
 
-    /** The default checkpoint mode: exactly once. */
-    public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE;
+    @Deprecated
+    /**
+     * The default checkpoint mode: exactly once.
+     *
+     * @deprecated This field is no longer used. Please use {@link
+     *     ExecutionCheckpointingOptions#CHECKPOINTING_MODE} instead.
+     */
+    public static final CheckpointingMode DEFAULT_MODE =
+            ExecutionCheckpointingOptions.CHECKPOINTING_MODE.defaultValue();
 
-    /** The default timeout of a checkpoint attempt: 10 minutes. */
-    public static final long DEFAULT_TIMEOUT = 10 * 60 * 1000;
+    /**
+     * The default timeout of a checkpoint attempt: 10 minutes.
+     *
+     * @deprecated This field is no longer used. Please use {@link
+     *     ExecutionCheckpointingOptions#CHECKPOINTING_TIMEOUT} instead.
+     */
+    @Deprecated
+    public static final long DEFAULT_TIMEOUT =
+            ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.defaultValue().toMillis();
 
-    /** The default minimum pause to be made between checkpoints: none. */
-    public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS = 0;
+    /**
+     * The default minimum pause to be made between checkpoints: none.
+     *
+     * @deprecated This field is no longer used. Please use {@link
+     *     ExecutionCheckpointingOptions#MIN_PAUSE_BETWEEN_CHECKPOINTS} instead.
+     */
+    @Deprecated
+    public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS =
+            ExecutionCheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS.defaultValue().toMillis();
 
-    /** The default limit of concurrently happening checkpoints: one. */
-    public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1;
+    /**
+     * The default limit of concurrently happening checkpoints: one.
+     *
+     * @deprecated This field is no longer used. Please use {@link
+     *     ExecutionCheckpointingOptions#MAX_CONCURRENT_CHECKPOINTS} instead.
+     */
+    @Deprecated
+    public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS =
+            ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS.defaultValue();
 
-    public static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER = -1;
+    /** @deprecated This field is no longer used. */
+    @Deprecated public static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER = -1;
 
-    /** Default id of checkpoint for which in-flight data should be ignored on recovery. */
-    public static final int DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA = -1;
+    /**
+     * Default id of checkpoint for which in-flight data should be ignored on recovery.
+     *
+     * @deprecated This field is no longer used. Please use {@link
+     *     ExecutionCheckpointingOptions#CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA} instead.
+     */
+    @Deprecated
+    public static final int DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA =
+            ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA
+                    .defaultValue()
+                    .intValue();
 
     // ------------------------------------------------------------------------
 
-    /** Checkpointing mode (exactly-once vs. at-least-once). */
-    private CheckpointingMode checkpointingMode = DEFAULT_MODE;
-
-    /** Periodic checkpoint triggering interval. */
-    private long checkpointInterval = -1; // disabled
-
-    /** Maximum time checkpoint may take before being discarded. */
-    private long checkpointTimeout = DEFAULT_TIMEOUT;
-
-    /** Minimal pause between checkpointing attempts. */
-    private long minPauseBetweenCheckpoints = DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS;
-
-    /** Maximum number of checkpoint attempts in progress at the same time. */
-    private int maxConcurrentCheckpoints = DEFAULT_MAX_CONCURRENT_CHECKPOINTS;
+    private final Configuration configuration;
 
     /** Flag to force checkpointing in iterative jobs. */
     private boolean forceCheckpointing;
 
-    /** Flag to force checkpointing in iterative jobs. */
-    private boolean forceUnalignedCheckpoints;
-
-    /** Flag to enable unaligned checkpoints. */
-    private boolean unalignedCheckpointsEnabled;
-
-    /** Id of checkpoint for which in-flight data should be ignored on recovery. */
-    private long checkpointIdOfIgnoredInFlightData =
-            DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA;
-
-    /** The delay from the start of checkpoint after which AC switches to UC. */
-    private Duration alignedCheckpointTimeout =
-            ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT.defaultValue();
-
     /** Flag to enable approximate local recovery. */
     private boolean approximateLocalRecovery;
 
-    /** Cleanup behaviour for persistent checkpoints. */
-    private ExternalizedCheckpointCleanup externalizedCheckpointCleanup =
-            ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT.defaultValue();
-
     /**
      * Task would not fail if there is an error in their checkpointing.
      *
-     * <p>{@link #tolerableCheckpointFailureNumber} would always overrule this deprecated field if
-     * they have conflicts.
+     * <p>{@link ExecutionCheckpointingOptions#TOLERABLE_FAILURE_NUMBER} would always overrule this
+     * deprecated field if they have conflicts.
      *
-     * @deprecated Use {@link #tolerableCheckpointFailureNumber}.
+     * @deprecated Use {@link ExecutionCheckpointingOptions#TOLERABLE_FAILURE_NUMBER}.
      */
     @Deprecated private boolean failOnCheckpointingErrors = true;
 
-    /**
-     * Determines the threshold that we tolerance declined checkpoint failure number. The default
-     * value is -1 meaning undetermined and not set via {@link
-     * #setTolerableCheckpointFailureNumber(int)}.
-     */
-    private int tolerableCheckpointFailureNumber = UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER;
-
     /**
      * The checkpoint storage for this application. This field is marked as transient because it may
      * contain user-code.
@@ -154,30 +154,21 @@ public class CheckpointConfig implements java.io.Serializable {
     public CheckpointConfig(final CheckpointConfig checkpointConfig) {
         checkNotNull(checkpointConfig);
 
-        this.checkpointInterval = checkpointConfig.checkpointInterval;
-        this.checkpointingMode = checkpointConfig.checkpointingMode;
-        this.checkpointTimeout = checkpointConfig.checkpointTimeout;
-        this.maxConcurrentCheckpoints = checkpointConfig.maxConcurrentCheckpoints;
-        this.minPauseBetweenCheckpoints = checkpointConfig.minPauseBetweenCheckpoints;
-        this.tolerableCheckpointFailureNumber = checkpointConfig.tolerableCheckpointFailureNumber;
-        this.unalignedCheckpointsEnabled = checkpointConfig.isUnalignedCheckpointsEnabled();
-        this.alignedCheckpointTimeout = checkpointConfig.alignedCheckpointTimeout;
+        this.configuration = new Configuration(checkpointConfig.configuration);
         this.approximateLocalRecovery = checkpointConfig.isApproximateLocalRecoveryEnabled();
-        this.externalizedCheckpointCleanup = checkpointConfig.externalizedCheckpointCleanup;
         this.forceCheckpointing = checkpointConfig.forceCheckpointing;
-        this.forceUnalignedCheckpoints = checkpointConfig.forceUnalignedCheckpoints;
         this.storage = checkpointConfig.getCheckpointStorage();
-        this.checkpointIdOfIgnoredInFlightData =
-                checkpointConfig.getCheckpointIdOfIgnoredInFlightData();
     }
 
-    public CheckpointConfig() {}
+    public CheckpointConfig() {
+        configuration = new Configuration();
+    }
 
     // ------------------------------------------------------------------------
 
     /** Disables checkpointing. */
     public void disableCheckpointing() {
-        this.checkpointInterval = -1;
+        configuration.removeConfig(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL);
     }
 
     /**
@@ -186,7 +177,7 @@ public class CheckpointConfig implements java.io.Serializable {
      * @return True if checkpointing is enables, false otherwise.
      */
     public boolean isCheckpointingEnabled() {
-        return checkpointInterval > 0;
+        return getCheckpointInterval() > 0;
     }
 
     /**
@@ -195,7 +186,7 @@ public class CheckpointConfig implements java.io.Serializable {
      * @return The checkpointing mode.
      */
     public CheckpointingMode getCheckpointingMode() {
-        return checkpointingMode;
+        return configuration.get(ExecutionCheckpointingOptions.CHECKPOINTING_MODE);
     }
 
     /**
@@ -204,7 +195,7 @@ public class CheckpointConfig implements java.io.Serializable {
      * @param checkpointingMode The checkpointing mode.
      */
     public void setCheckpointingMode(CheckpointingMode checkpointingMode) {
-        this.checkpointingMode = requireNonNull(checkpointingMode);
+        configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, checkpointingMode);
     }
 
     /**
@@ -216,7 +207,10 @@ public class CheckpointConfig implements java.io.Serializable {
      * @return The checkpoint interval, in milliseconds.
      */
     public long getCheckpointInterval() {
-        return checkpointInterval;
+        return configuration
+                .getOptional(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)
+                .map(Duration::toMillis)
+                .orElse(-1L);
     }
 
     /**
@@ -235,7 +229,9 @@ public class CheckpointConfig implements java.io.Serializable {
                             "Checkpoint interval must be larger than or equal to %s ms",
                             MINIMAL_CHECKPOINT_TIME));
         }
-        this.checkpointInterval = checkpointInterval;
+        configuration.set(
+                ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
+                Duration.ofMillis(checkpointInterval));
     }
 
     /**
@@ -244,7 +240,7 @@ public class CheckpointConfig implements java.io.Serializable {
      * @return The checkpoint timeout, in milliseconds.
      */
     public long getCheckpointTimeout() {
-        return checkpointTimeout;
+        return configuration.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT).toMillis();
     }
 
     /**
@@ -259,7 +255,9 @@ public class CheckpointConfig implements java.io.Serializable {
                             "Checkpoint timeout must be larger than or equal to %s ms",
                             MINIMAL_CHECKPOINT_TIME));
         }
-        this.checkpointTimeout = checkpointTimeout;
+        configuration.set(
+                ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,
+                Duration.ofMillis(checkpointTimeout));
     }
 
     /**
@@ -271,7 +269,9 @@ public class CheckpointConfig implements java.io.Serializable {
      * @return The minimal pause before the next checkpoint is triggered.
      */
     public long getMinPauseBetweenCheckpoints() {
-        return minPauseBetweenCheckpoints;
+        return configuration
+                .get(ExecutionCheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS)
+                .toMillis();
     }
 
     /**
@@ -290,7 +290,9 @@ public class CheckpointConfig implements java.io.Serializable {
         if (minPauseBetweenCheckpoints < 0) {
             throw new IllegalArgumentException("Pause value must be zero or positive");
         }
-        this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+        configuration.set(
+                ExecutionCheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS,
+                Duration.ofMillis(minPauseBetweenCheckpoints));
     }
 
     /**
@@ -302,7 +304,7 @@ public class CheckpointConfig implements java.io.Serializable {
      * @return The maximum number of concurrent checkpoint attempts.
      */
     public int getMaxConcurrentCheckpoints() {
-        return maxConcurrentCheckpoints;
+        return configuration.get(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS);
     }
 
     /**
@@ -318,7 +320,8 @@ public class CheckpointConfig implements java.io.Serializable {
             throw new IllegalArgumentException(
                     "The maximum number of concurrent attempts must be at least one.");
         }
-        this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
+        configuration.set(
+                ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, maxConcurrentCheckpoints);
     }
 
     /**
@@ -354,7 +357,7 @@ public class CheckpointConfig implements java.io.Serializable {
      */
     @PublicEvolving
     public boolean isForceUnalignedCheckpoints() {
-        return forceUnalignedCheckpoints;
+        return configuration.get(ExecutionCheckpointingOptions.FORCE_UNALIGNED);
     }
 
     /**
@@ -365,7 +368,7 @@ public class CheckpointConfig implements java.io.Serializable {
      */
     @PublicEvolving
     public void setForceUnalignedCheckpoints(boolean forceUnalignedCheckpoints) {
-        this.forceUnalignedCheckpoints = forceUnalignedCheckpoints;
+        configuration.set(ExecutionCheckpointingOptions.FORCE_UNALIGNED, forceUnalignedCheckpoints);
     }
 
     /**
@@ -397,19 +400,22 @@ public class CheckpointConfig implements java.io.Serializable {
      */
     @Deprecated
     public void setFailOnCheckpointingErrors(boolean failOnCheckpointingErrors) {
-        if (tolerableCheckpointFailureNumber != UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER) {
+        if (configuration
+                .getOptional(ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER)
+                .isPresent()) {
             LOG.warn(
-                    "Since tolerableCheckpointFailureNumber has been configured as {}, deprecated #setFailOnCheckpointingErrors(boolean) "
-                            + "method would not take any effect and please use #setTolerableCheckpointFailureNumber(int) method to "
+                    "Since ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER has been configured as {}, deprecated "
+                            + "#setFailOnCheckpointingErrors(boolean) method would not take any effect and please use "
+                            + "#setTolerableCheckpointFailureNumber(int) method to "
                             + "determine your expected behaviour when checkpoint errors on task side.",
-                    tolerableCheckpointFailureNumber);
+                    getTolerableCheckpointFailureNumber());
             return;
         }
         this.failOnCheckpointingErrors = failOnCheckpointingErrors;
         if (failOnCheckpointingErrors) {
-            this.tolerableCheckpointFailureNumber = 0;
+            setTolerableCheckpointFailureNumber(0);
         } else {
-            this.tolerableCheckpointFailureNumber = UNLIMITED_TOLERABLE_FAILURE_NUMBER;
+            setTolerableCheckpointFailureNumber(UNLIMITED_TOLERABLE_FAILURE_NUMBER);
         }
     }
 
@@ -417,15 +423,14 @@ public class CheckpointConfig implements java.io.Serializable {
      * Get the defined number of consecutive checkpoint failures that will be tolerated, before the
      * whole job is failed over.
      *
-     * <p>If the {@link #tolerableCheckpointFailureNumber} has not been configured, this method
-     * would return 0 which means the checkpoint failure manager would not tolerate any declined
-     * checkpoint failure.
+     * <p>If the {@link ExecutionCheckpointingOptions#TOLERABLE_FAILURE_NUMBER} has not been
+     * configured, this method would return 0 which means the checkpoint failure manager would not
+     * tolerate any declined checkpoint failure.
      */
     public int getTolerableCheckpointFailureNumber() {
-        if (tolerableCheckpointFailureNumber == UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER) {
-            return 0;
-        }
-        return tolerableCheckpointFailureNumber;
+        return configuration
+                .getOptional(ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER)
+                .orElse(0);
     }
 
     /**
@@ -438,7 +443,9 @@ public class CheckpointConfig implements java.io.Serializable {
             throw new IllegalArgumentException(
                     "The tolerable failure checkpoint number must be non-negative.");
         }
-        this.tolerableCheckpointFailureNumber = tolerableCheckpointFailureNumber;
+        configuration.set(
+                ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER,
+                tolerableCheckpointFailureNumber);
     }
 
     /**
@@ -458,13 +465,13 @@ public class CheckpointConfig implements java.io.Serializable {
      * (terminating with job status {@link JobStatus#CANCELED}).
      *
      * <p>The target directory for externalized checkpoints is configured via {@link
-     * org.apache.flink.configuration.CheckpointingOptions#CHECKPOINTS_DIRECTORY}.
+     * CheckpointingOptions#CHECKPOINTS_DIRECTORY}.
      *
      * @param cleanupMode Externalized checkpoint clean-up behaviour.
      */
     @PublicEvolving
     public void setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup cleanupMode) {
-        this.externalizedCheckpointCleanup = checkNotNull(cleanupMode);
+        configuration.set(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT, cleanupMode);
     }
 
     /**
@@ -484,7 +491,7 @@ public class CheckpointConfig implements java.io.Serializable {
      * (terminating with job status {@link JobStatus#CANCELED}).
      *
      * <p>The target directory for externalized checkpoints is configured via {@link
-     * org.apache.flink.configuration.CheckpointingOptions#CHECKPOINTS_DIRECTORY}.
+     * CheckpointingOptions#CHECKPOINTS_DIRECTORY}.
      *
      * @param cleanupMode Externalized checkpoint clean-up behaviour.
      * @deprecated use {@link #setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup)}
@@ -493,7 +500,7 @@ public class CheckpointConfig implements java.io.Serializable {
     @PublicEvolving
     @Deprecated
     public void enableExternalizedCheckpoints(ExternalizedCheckpointCleanup cleanupMode) {
-        this.externalizedCheckpointCleanup = checkNotNull(cleanupMode);
+        setExternalizedCheckpointCleanup(cleanupMode);
     }
 
     /**
@@ -503,7 +510,7 @@ public class CheckpointConfig implements java.io.Serializable {
      */
     @PublicEvolving
     public boolean isExternalizedCheckpointsEnabled() {
-        return externalizedCheckpointCleanup
+        return getExternalizedCheckpointCleanup()
                 != ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS;
     }
 
@@ -515,14 +522,14 @@ public class CheckpointConfig implements java.io.Serializable {
      * becomes independent of the current throughput as checkpoint barriers are effectively not
      * embedded into the stream of data anymore.
      *
-     * <p>Unaligned checkpoints can only be enabled if {@link #checkpointingMode} is {@link
-     * CheckpointingMode#EXACTLY_ONCE}.
+     * <p>Unaligned checkpoints can only be enabled if {@link
+     * ExecutionCheckpointingOptions#CHECKPOINTING_MODE} is {@link CheckpointingMode#EXACTLY_ONCE}.
      *
      * @param enabled Flag to indicate whether unaligned are enabled.
      */
     @PublicEvolving
     public void enableUnalignedCheckpoints(boolean enabled) {
-        unalignedCheckpointsEnabled = enabled;
+        configuration.set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, enabled);
     }
 
     /**
@@ -533,8 +540,8 @@ public class CheckpointConfig implements java.io.Serializable {
      * becomes independent of the current throughput as checkpoint barriers are effectively not
      * embedded into the stream of data anymore.
      *
-     * <p>Unaligned checkpoints can only be enabled if {@link #checkpointingMode} is {@link
-     * CheckpointingMode#EXACTLY_ONCE}.
+     * <p>Unaligned checkpoints can only be enabled if {@link
+     * ExecutionCheckpointingOptions#CHECKPOINTING_MODE} is {@link CheckpointingMode#EXACTLY_ONCE}.
      */
     @PublicEvolving
     public void enableUnalignedCheckpoints() {
@@ -548,26 +555,26 @@ public class CheckpointConfig implements java.io.Serializable {
      */
     @PublicEvolving
     public boolean isUnalignedCheckpointsEnabled() {
-        return unalignedCheckpointsEnabled;
+        return configuration.get(ExecutionCheckpointingOptions.ENABLE_UNALIGNED);
     }
 
     /**
-     * Only relevant if {@link #unalignedCheckpointsEnabled} is enabled.
+     * Only relevant if {@link #isUnalignedCheckpointsEnabled} is enabled.
      *
-     * <p>If {@link #alignedCheckpointTimeout} has value equal to <code>0</code>, checkpoints will
-     * always start unaligned.
+     * <p>If {@link ExecutionCheckpointingOptions#ALIGNED_CHECKPOINT_TIMEOUT} has value equal to
+     * <code>0</code>, checkpoints will always start unaligned.
      *
-     * <p>If {@link #alignedCheckpointTimeout} has value greater then <code>0</code>, checkpoints
-     * will start aligned. If during checkpointing, checkpoint start delay exceeds this {@link
-     * #alignedCheckpointTimeout}, alignment will timeout and checkpoint will start working as
-     * unaligned checkpoint.
+     * <p>If {@link ExecutionCheckpointingOptions#ALIGNED_CHECKPOINT_TIMEOUT} has value greater than
+     * <code>0</code>, checkpoints will start aligned. If during checkpointing, checkpoint start
+     * delay exceeds this {@link ExecutionCheckpointingOptions#ALIGNED_CHECKPOINT_TIMEOUT},
+     * alignment will timeout and checkpoint will start working as unaligned checkpoint.
      *
      * @deprecated Use {@link #setAlignedCheckpointTimeout(Duration)} instead.
      */
     @Deprecated
     @PublicEvolving
     public void setAlignmentTimeout(Duration alignmentTimeout) {
-        this.alignedCheckpointTimeout = alignmentTimeout;
+        setAlignedCheckpointTimeout(alignmentTimeout);
     }
 
     /**
@@ -578,7 +585,7 @@ public class CheckpointConfig implements java.io.Serializable {
     @Deprecated
     @PublicEvolving
     public Duration getAlignmentTimeout() {
-        return alignedCheckpointTimeout;
+        return getAlignedCheckpointTimeout();
     }
 
     /**
@@ -588,23 +595,26 @@ public class CheckpointConfig implements java.io.Serializable {
      */
     @PublicEvolving
     public Duration getAlignedCheckpointTimeout() {
-        return alignedCheckpointTimeout;
+        return configuration.get(ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT);
     }
 
     /**
-     * Only relevant if {@link #unalignedCheckpointsEnabled} is enabled.
+     * Only relevant if {@link ExecutionCheckpointingOptions#ENABLE_UNALIGNED} is enabled.
+     *
+     * <p>The behaviour depends on the value of {@link
+     * ExecutionCheckpointingOptions#ALIGNED_CHECKPOINT_TIMEOUT}:
      *
-     * <p>If {@link #alignedCheckpointTimeout} has value equal to <code>0</code>, checkpoints will
-     * always start unaligned.
+     * <p>If it is equal to <code>0</code>, checkpoints will always start unaligned.
      *
-     * <p>If {@link #alignedCheckpointTimeout} has value greater then <code>0</code>, checkpoints
-     * will start aligned. If during checkpointing, checkpoint start delay exceeds this {@link
-     * #alignedCheckpointTimeout}, alignment will timeout and checkpoint will start working as
-     * unaligned checkpoint.
+     * <p>If {@link ExecutionCheckpointingOptions#ALIGNED_CHECKPOINT_TIMEOUT} has value greater than
+     * <code>0</code>, checkpoints will start aligned. If during checkpointing, checkpoint start
+     * delay exceeds this {@link ExecutionCheckpointingOptions#ALIGNED_CHECKPOINT_TIMEOUT},
+     * alignment will timeout and checkpoint will start working as unaligned checkpoint.
      */
     @PublicEvolving
     public void setAlignedCheckpointTimeout(Duration alignedCheckpointTimeout) {
-        this.alignedCheckpointTimeout = alignedCheckpointTimeout;
+        configuration.set(
+                ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, alignedCheckpointTimeout);
     }
 
     /**
@@ -644,7 +654,7 @@ public class CheckpointConfig implements java.io.Serializable {
      */
     @PublicEvolving
     public ExternalizedCheckpointCleanup getExternalizedCheckpointCleanup() {
-        return externalizedCheckpointCleanup;
+        return configuration.get(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT);
     }
 
     /**
@@ -732,7 +742,9 @@ public class CheckpointConfig implements java.io.Serializable {
      */
     @PublicEvolving
     public void setCheckpointIdOfIgnoredInFlightData(long checkpointIdOfIgnoredInFlightData) {
-        this.checkpointIdOfIgnoredInFlightData = checkpointIdOfIgnoredInFlightData;
+        configuration.set(
+                ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA,
+                checkpointIdOfIgnoredInFlightData);
     }
 
     /**
@@ -741,7 +753,8 @@ public class CheckpointConfig implements java.io.Serializable {
      */
     @PublicEvolving
     public long getCheckpointIdOfIgnoredInFlightData() {
-        return checkpointIdOfIgnoredInFlightData;
+        return configuration.get(
+                ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA);
     }
 
     /** Cleanup behaviour for externalized checkpoints when the job is cancelled. */