Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/06 20:24:38 UTC

[GitHub] [flink] pnowojski opened a new pull request, #21245: [FLINK-29379][streaming] Migrate CheckpointConfig to use Configuration field and ConfigOption

pnowojski opened a new pull request, #21245:
URL: https://github.com/apache/flink/pull/21245

   
   ## What is the purpose of the change
   
   This PR refactors `CheckpointConfig` to store its settings in a `Configuration` field accessed via `ConfigOption`s.
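To illustrate the general shape of this refactoring, here is a minimal plain-Java sketch that assumes nothing from Flink. `SketchKey`, `SketchSettings` and `SketchCheckpointConfig` are hypothetical stand-ins for `ConfigOption`, `Configuration` and `CheckpointConfig`, and the key strings are only illustrative. It shows two things: getters fall back to the option's default, and the copy constructor copies one store instead of every individual field.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical typed key with a default value, standing in for Flink's ConfigOption. */
final class SketchKey<T> {
    final String name;
    final T defaultValue;

    SketchKey(String name, T defaultValue) {
        this.name = name;
        this.defaultValue = defaultValue;
    }
}

/** Hypothetical map-backed store, standing in for Flink's Configuration. */
final class SketchSettings {
    private final Map<String, Object> values;

    SketchSettings() {
        this.values = new HashMap<>();
    }

    /** Copy constructor: copies the entries, so later changes do not leak between copies. */
    SketchSettings(SketchSettings other) {
        this.values = new HashMap<>(other.values);
    }

    <T> void set(SketchKey<T> key, T value) {
        values.put(key.name, value);
    }

    @SuppressWarnings("unchecked")
    <T> T get(SketchKey<T> key) {
        // Fall back to the key's default when nothing was set explicitly.
        return values.containsKey(key.name) ? (T) values.get(key.name) : key.defaultValue;
    }
}

/** CheckpointConfig-like class whose only state is the settings store. */
final class SketchCheckpointConfig {
    static final SketchKey<Duration> TIMEOUT =
            new SketchKey<>("execution.checkpointing.timeout", Duration.ofMinutes(10));
    static final SketchKey<Integer> MAX_CONCURRENT =
            new SketchKey<>("execution.checkpointing.max-concurrent-checkpoints", 1);

    private final SketchSettings settings;

    SketchCheckpointConfig() {
        this.settings = new SketchSettings();
    }

    /** A single copy of the store replaces a field-by-field copy. */
    SketchCheckpointConfig(SketchCheckpointConfig other) {
        this.settings = new SketchSettings(other.settings);
    }

    long getCheckpointTimeout() {
        return settings.get(TIMEOUT).toMillis();
    }

    void setCheckpointTimeout(long millis) {
        settings.set(TIMEOUT, Duration.ofMillis(millis));
    }

    int getMaxConcurrentCheckpoints() {
        return settings.get(MAX_CONCURRENT);
    }
}
```

With this layout, adding a new setting means adding one key rather than a field plus a line in the copy constructor. That is the class of bug the field-by-field copy was prone to.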
   
   ## Brief change log
   
   Please check individual commits.
   
   ## Verifying this change
   
   This is only a refactor that's covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #21245: [FLINK-29379][streaming] Migrate CheckpointConfig to use Configuration field and ConfigOption

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #21245:
URL: https://github.com/apache/flink/pull/21245#discussion_r1015406636


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -186,7 +164,7 @@ public void disableCheckpointing() {
      * @return True if checkpointing is enables, false otherwise.
      */
     public boolean isCheckpointingEnabled() {
-        return checkpointInterval > 0;
+        return configuration.contains(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL);

Review Comment:
   Yes, and I'm -1 for adding an extra `ConfigOption` that would only confuse users and allow for inconsistencies:
   - what if a user sets only `checkpoint.enabled` but forgets to set the interval?
   - what if a user does the opposite?
   
   If anything, I would be OK with a fancier single property that accepts `manual`, `disabled` or an `int` interval, but that would be quite a bit of work that IMO is not worth it. `Long.MAX_VALUE` for those two power users who want to trigger checkpoints manually is perfectly fine with me.
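Below is a rough sketch of the single-property alternative mentioned above. It assumes a hypothetical value format of `disabled`, `manual` or a positive interval in milliseconds; no such Flink option exists, and `CheckpointTriggerSetting` is illustrative only. Because one value encodes both "enabled" and "interval", the two cannot contradict each other.

```java
import java.util.Locale;

/** Hypothetical single-property encoding: "disabled", "manual" or a positive interval in ms. */
final class CheckpointTriggerSetting {
    enum Kind { DISABLED, MANUAL, PERIODIC }

    final Kind kind;
    final long intervalMillis; // only meaningful for PERIODIC, -1 otherwise

    private CheckpointTriggerSetting(Kind kind, long intervalMillis) {
        this.kind = kind;
        this.intervalMillis = intervalMillis;
    }

    static CheckpointTriggerSetting parse(String raw) {
        String value = raw.trim().toLowerCase(Locale.ROOT);
        if (value.equals("disabled")) {
            return new CheckpointTriggerSetting(Kind.DISABLED, -1L);
        }
        if (value.equals("manual")) {
            return new CheckpointTriggerSetting(Kind.MANUAL, -1L);
        }
        long interval;
        try {
            interval = Long.parseLong(value);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Expected 'disabled', 'manual' or an interval in ms, got: " + raw, e);
        }
        if (interval <= 0) {
            throw new IllegalArgumentException("Interval must be positive, got: " + raw);
        }
        return new CheckpointTriggerSetting(Kind.PERIODIC, interval);
    }

    /** Both MANUAL and PERIODIC count as "checkpointing enabled". */
    boolean isCheckpointingEnabled() {
        return kind != Kind.DISABLED;
    }
}
```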





[GitHub] [flink] zentol commented on a diff in pull request #21245: [FLINK-29379][streaming] Migrate CheckpointConfig to use Configuration field and ConfigOption

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #21245:
URL: https://github.com/apache/flink/pull/21245#discussion_r1015351612


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -588,23 +581,26 @@ public Duration getAlignmentTimeout() {
      */
     @PublicEvolving
     public Duration getAlignedCheckpointTimeout() {
-        return alignedCheckpointTimeout;
+        return configuration.get(ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT);
     }
 
     /**
-     * Only relevant if {@link #unalignedCheckpointsEnabled} is enabled.
+     * Only relevant if {@link #isUnalignedCheckpointsEnabled} is enabled.

Review Comment:
   ```suggestion
        * Only relevant if {@link ExecutionCheckpointingOptions#ENABLE_UNALIGNED} is enabled.
   ```



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -186,7 +164,7 @@ public void disableCheckpointing() {
      * @return True if checkpointing is enables, false otherwise.
      */
     public boolean isCheckpointingEnabled() {
-        return checkpointInterval > 0;
+        return configuration.contains(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL);

Review Comment:
   Note: in the long term we may want to decouple enabling checkpointing from the interval, in case a user wants to fully control checkpoints from the outside. (You currently _can_ do this by setting the interval to `Long.MAX_VALUE`, but it's a bit weird.)
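The following toy model shows the `Long.MAX_VALUE` workaround. It assumes a simple periodic scheduler; `PeriodicTriggerModel` is hypothetical, not Flink's checkpoint coordinator. With that interval, checkpointing still counts as enabled under the old `checkpointInterval > 0` check. However, the overflow-safe next-trigger time saturates at `Long.MAX_VALUE`, so periodic triggers never fire and checkpoints happen only when triggered from the outside.

```java
/** Toy model of interval-based triggering; assumes a positive interval when enabled. */
final class PeriodicTriggerModel {
    /** Mirrors the old check from the diff: enabled means a positive interval. */
    static boolean isCheckpointingEnabled(long intervalMillis) {
        return intervalMillis > 0;
    }

    /** Next trigger time, saturating at Long.MAX_VALUE instead of overflowing. */
    static long nextTriggerTime(long lastTriggerMillis, long intervalMillis) {
        try {
            return Math.addExact(lastTriggerMillis, intervalMillis);
        } catch (ArithmeticException overflow) {
            return Long.MAX_VALUE;
        }
    }

    static boolean shouldTrigger(long nowMillis, long lastTriggerMillis, long intervalMillis) {
        return isCheckpointingEnabled(intervalMillis)
                && nowMillis >= nextTriggerTime(lastTriggerMillis, intervalMillis);
    }
}
```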



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -66,85 +68,72 @@ public class CheckpointConfig implements java.io.Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(CheckpointConfig.class);
 
     /** The default checkpoint mode: exactly once. */
-    public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE;
+    public static final CheckpointingMode DEFAULT_MODE =
+            ExecutionCheckpointingOptions.CHECKPOINTING_MODE.defaultValue();
 
     /** The default timeout of a checkpoint attempt: 10 minutes. */
-    public static final long DEFAULT_TIMEOUT = 10 * 60 * 1000;
+    @Deprecated
+    public static final long DEFAULT_TIMEOUT =
+            ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.defaultValue().toMillis();
 
     /** The default minimum pause to be made between checkpoints: none. */
-    public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS = 0;
+    @Deprecated
+    public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS =

Review Comment:
   Add a deprecation note pointing users to the config options instead.





[GitHub] [flink] pnowojski commented on a diff in pull request #21245: [FLINK-29379][streaming] Migrate CheckpointConfig to use Configuration field and ConfigOption

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #21245:
URL: https://github.com/apache/flink/pull/21245#discussion_r1016303017


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -154,30 +143,19 @@ public class CheckpointConfig implements java.io.Serializable {
     public CheckpointConfig(final CheckpointConfig checkpointConfig) {
         checkNotNull(checkpointConfig);
 
-        this.checkpointInterval = checkpointConfig.checkpointInterval;
-        this.checkpointingMode = checkpointConfig.checkpointingMode;
-        this.checkpointTimeout = checkpointConfig.checkpointTimeout;
-        this.maxConcurrentCheckpoints = checkpointConfig.maxConcurrentCheckpoints;
-        this.minPauseBetweenCheckpoints = checkpointConfig.minPauseBetweenCheckpoints;
-        this.tolerableCheckpointFailureNumber = checkpointConfig.tolerableCheckpointFailureNumber;
-        this.unalignedCheckpointsEnabled = checkpointConfig.isUnalignedCheckpointsEnabled();
-        this.alignedCheckpointTimeout = checkpointConfig.alignedCheckpointTimeout;
-        this.approximateLocalRecovery = checkpointConfig.isApproximateLocalRecoveryEnabled();
-        this.externalizedCheckpointCleanup = checkpointConfig.externalizedCheckpointCleanup;
-        this.forceCheckpointing = checkpointConfig.forceCheckpointing;
-        this.forceUnalignedCheckpoints = checkpointConfig.forceUnalignedCheckpoints;
+        this.configuration = new Configuration(checkpointConfig.configuration);
         this.storage = checkpointConfig.getCheckpointStorage();
-        this.checkpointIdOfIgnoredInFlightData =
-                checkpointConfig.getCheckpointIdOfIgnoredInFlightData();
     }
 
-    public CheckpointConfig() {}
+    public CheckpointConfig() {
+        configuration = new Configuration();
+    }
 
     // ------------------------------------------------------------------------
 
     /** Disables checkpointing. */
     public void disableCheckpointing() {
-        this.checkpointInterval = -1;
+        configuration.removeConfig(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL);

Review Comment:
   In the offline discussion we decided to keep it as it was (using `removeConfig`) and deal with it later if it causes a problem:
   - @twalthr didn't want to change the value returned by `StreamExecutionEnvironment` and `CheckpointConfig` to `0`.
   - I didn't want `StreamExecutionEnvironment` and `CheckpointConfig` to return `-1` while the `ConfigOption` returns `0`.
   - Adding support for negative values to the `ConfigOption` would be problematic, as it would have to be done in such a way that all already existing `ConfigOption`s still fail on/reject negative values.
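Here is a minimal sketch of the compromise described above. It uses a plain map in place of `Configuration`; `IntervalCompatModel` and its positive-interval check are hypothetical. The stored option never holds a negative value, while the public getter still reports `-1` for "disabled".

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model: absence of the key means "disabled"; the getter keeps the -1 contract. */
final class IntervalCompatModel {
    private static final String INTERVAL_KEY = "execution.checkpointing.interval";
    private final Map<String, Duration> config = new HashMap<>();

    void enableCheckpointing(long intervalMillis) {
        // Illustrative validation: only positive intervals are ever stored.
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("Interval must be positive: " + intervalMillis);
        }
        config.put(INTERVAL_KEY, Duration.ofMillis(intervalMillis));
    }

    /** Disabling removes the key instead of storing a negative duration. */
    void disableCheckpointing() {
        config.remove(INTERVAL_KEY);
    }

    boolean isCheckpointingEnabled() {
        return config.containsKey(INTERVAL_KEY);
    }

    /** Public getter keeps the legacy contract: -1 means "disabled". */
    long getCheckpointInterval() {
        Duration interval = config.get(INTERVAL_KEY);
        return interval == null ? -1L : interval.toMillis();
    }
}
```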





[GitHub] [flink] pnowojski commented on a diff in pull request #21245: [FLINK-29379][streaming] Migrate CheckpointConfig to use Configuration field and ConfigOption

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #21245:
URL: https://github.com/apache/flink/pull/21245#discussion_r1015395032


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -66,85 +68,72 @@ public class CheckpointConfig implements java.io.Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(CheckpointConfig.class);
 
     /** The default checkpoint mode: exactly once. */
-    public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE;
+    public static final CheckpointingMode DEFAULT_MODE =
+            ExecutionCheckpointingOptions.CHECKPOINTING_MODE.defaultValue();
 
     /** The default timeout of a checkpoint attempt: 10 minutes. */
-    public static final long DEFAULT_TIMEOUT = 10 * 60 * 1000;
+    @Deprecated
+    public static final long DEFAULT_TIMEOUT =
+            ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.defaultValue().toMillis();
 
     /** The default minimum pause to be made between checkpoints: none. */
-    public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS = 0;
+    @Deprecated
+    public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS =

Review Comment:
   I'm not trying to deprecate the config options, only the redundant `public static` `DEFAULT_FOO_BAR` constants: they are no longer used and merely duplicate the `ConfigOption` defaults, yet they are part of the `@Public` API.
   
   I will explain that in the Javadocs.
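One way the deprecated constants could look is sketched below. It uses hypothetical `DefaultedOption` and `CheckpointingOptionsSketch` classes in place of Flink's `ConfigOption` and `ExecutionCheckpointingOptions`. Deriving each constant from the option's default keeps the two from drifting apart, and the `@deprecated` Javadoc tag carries the explanation.

```java
import java.time.Duration;

/** Hypothetical typed option with a default value; stands in for Flink's ConfigOption. */
final class DefaultedOption<T> {
    private final String key;
    private final T defaultValue;

    DefaultedOption(String key, T defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }

    String key() {
        return key;
    }

    T defaultValue() {
        return defaultValue;
    }
}

/** Hypothetical holder of the "real" options. */
final class CheckpointingOptionsSketch {
    static final DefaultedOption<Duration> CHECKPOINTING_TIMEOUT =
            new DefaultedOption<>("execution.checkpointing.timeout", Duration.ofMinutes(10));
    static final DefaultedOption<Integer> MAX_CONCURRENT_CHECKPOINTS =
            new DefaultedOption<>("execution.checkpointing.max-concurrent-checkpoints", 1);
}

final class LegacyConstantsSketch {
    /**
     * The default timeout of a checkpoint attempt: 10 minutes.
     *
     * @deprecated Only kept for API compatibility. Use the default value of {@link
     *     CheckpointingOptionsSketch#CHECKPOINTING_TIMEOUT} instead.
     */
    @Deprecated
    public static final long DEFAULT_TIMEOUT =
            CheckpointingOptionsSketch.CHECKPOINTING_TIMEOUT.defaultValue().toMillis();

    /**
     * The default limit of concurrently happening checkpoints: one.
     *
     * @deprecated Only kept for API compatibility. Use the default value of {@link
     *     CheckpointingOptionsSketch#MAX_CONCURRENT_CHECKPOINTS} instead.
     */
    @Deprecated
    public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS =
            CheckpointingOptionsSketch.MAX_CONCURRENT_CHECKPOINTS.defaultValue();
}
```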





[GitHub] [flink] pnowojski commented on a diff in pull request #21245: [FLINK-29379][streaming] Migrate CheckpointConfig to use Configuration field and ConfigOption

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #21245:
URL: https://github.com/apache/flink/pull/21245#discussion_r1015493682


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -154,30 +143,19 @@ public class CheckpointConfig implements java.io.Serializable {
     public CheckpointConfig(final CheckpointConfig checkpointConfig) {
         checkNotNull(checkpointConfig);
 
-        this.checkpointInterval = checkpointConfig.checkpointInterval;
-        this.checkpointingMode = checkpointConfig.checkpointingMode;
-        this.checkpointTimeout = checkpointConfig.checkpointTimeout;
-        this.maxConcurrentCheckpoints = checkpointConfig.maxConcurrentCheckpoints;
-        this.minPauseBetweenCheckpoints = checkpointConfig.minPauseBetweenCheckpoints;
-        this.tolerableCheckpointFailureNumber = checkpointConfig.tolerableCheckpointFailureNumber;
-        this.unalignedCheckpointsEnabled = checkpointConfig.isUnalignedCheckpointsEnabled();
-        this.alignedCheckpointTimeout = checkpointConfig.alignedCheckpointTimeout;
-        this.approximateLocalRecovery = checkpointConfig.isApproximateLocalRecoveryEnabled();
-        this.externalizedCheckpointCleanup = checkpointConfig.externalizedCheckpointCleanup;
-        this.forceCheckpointing = checkpointConfig.forceCheckpointing;
-        this.forceUnalignedCheckpoints = checkpointConfig.forceUnalignedCheckpoints;
+        this.configuration = new Configuration(checkpointConfig.configuration);
         this.storage = checkpointConfig.getCheckpointStorage();
-        this.checkpointIdOfIgnoredInFlightData =
-                checkpointConfig.getCheckpointIdOfIgnoredInFlightData();
     }
 
-    public CheckpointConfig() {}
+    public CheckpointConfig() {
+        configuration = new Configuration();
+    }
 
     // ------------------------------------------------------------------------
 
     /** Disables checkpointing. */
     public void disableCheckpointing() {
-        this.checkpointInterval = -1;
+        configuration.removeConfig(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL);

Review Comment:
   I've replaced that with setting to `Duration.ofMillis(-1)` (please check the fixup commit)





[GitHub] [flink] twalthr commented on a diff in pull request #21245: [FLINK-29379][streaming] Migrate CheckpointConfig to use Configuration field and ConfigOption

Posted by GitBox <gi...@apache.org>.
twalthr commented on code in PR #21245:
URL: https://github.com/apache/flink/pull/21245#discussion_r1015451112


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -66,85 +68,72 @@ public class CheckpointConfig implements java.io.Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(CheckpointConfig.class);
 
     /** The default checkpoint mode: exactly once. */
-    public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE;
+    public static final CheckpointingMode DEFAULT_MODE =
+            ExecutionCheckpointingOptions.CHECKPOINTING_MODE.defaultValue();
 
     /** The default timeout of a checkpoint attempt: 10 minutes. */
-    public static final long DEFAULT_TIMEOUT = 10 * 60 * 1000;
+    @Deprecated
+    public static final long DEFAULT_TIMEOUT =
+            ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.defaultValue().toMillis();
 
     /** The default minimum pause to be made between checkpoints: none. */
-    public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS = 0;
+    @Deprecated
+    public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS =
+            ExecutionCheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS.defaultValue().toMillis();
 
     /** The default limit of concurrently happening checkpoints: one. */
-    public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1;
+    @Deprecated
+    public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS =
+            ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS.defaultValue();
 
-    public static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER = -1;
+    @Deprecated public static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER = -1;
 
     /** Default id of checkpoint for which in-flight data should be ignored on recovery. */
-    public static final int DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA = -1;
-
-    // ------------------------------------------------------------------------
-
-    /** Checkpointing mode (exactly-once vs. at-least-once). */
-    private CheckpointingMode checkpointingMode = DEFAULT_MODE;
-
-    /** Periodic checkpoint triggering interval. */
-    private long checkpointInterval = -1; // disabled
-
-    /** Maximum time checkpoint may take before being discarded. */
-    private long checkpointTimeout = DEFAULT_TIMEOUT;
-
-    /** Minimal pause between checkpointing attempts. */
-    private long minPauseBetweenCheckpoints = DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS;
-
-    /** Maximum number of checkpoint attempts in progress at the same time. */
-    private int maxConcurrentCheckpoints = DEFAULT_MAX_CONCURRENT_CHECKPOINTS;
-
-    /** Flag to force checkpointing in iterative jobs. */
-    private boolean forceCheckpointing;
-
-    /** Flag to force checkpointing in iterative jobs. */
-    private boolean forceUnalignedCheckpoints;
-
-    /** Flag to enable unaligned checkpoints. */
-    private boolean unalignedCheckpointsEnabled;
-
-    /** Id of checkpoint for which in-flight data should be ignored on recovery. */
-    private long checkpointIdOfIgnoredInFlightData =
-            DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA;
-
-    /** The delay from the start of checkpoint after which AC switches to UC. */
-    private Duration alignedCheckpointTimeout =
-            ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT.defaultValue();
-
-    /** Flag to enable approximate local recovery. */
-    private boolean approximateLocalRecovery;
-
-    /** Cleanup behaviour for persistent checkpoints. */
-    private ExternalizedCheckpointCleanup externalizedCheckpointCleanup =
-            ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT.defaultValue();
+    @Deprecated
+    public static final int DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA =
+            ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA
+                    .defaultValue()
+                    .intValue();
 
     /**
-     * Task would not fail if there is an error in their checkpointing.
-     *
-     * <p>{@link #tolerableCheckpointFailureNumber} would always overrule this deprecated field if
-     * they have conflicts.
+     * Internal {@link ConfigOption}s, that are not exposed and it's not possible to configure them
+     * via config files. We are defining them here, so that we can store them in the {@link
+     * #configuration}.
      *
-     * @deprecated Use {@link #tolerableCheckpointFailureNumber}.
+     * <p>If you decide to expose any of those {@link ConfigOption}s, please double-check if the
+     * key, type and descriptions are sensible, as the initial values are arbitrary.
      */
-    @Deprecated private boolean failOnCheckpointingErrors = true;
+    // --------------------------------------------------------------------------------------------
+
+    /** @deprecated This will be removed once iterations properly participate in checkpointing. */
+    @Deprecated
+    private static final ConfigOption<Boolean> FORCE_CHECKPOINTING =
+            key("hidden.force.checkpointing")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Flag to force checkpointing in iterative jobs");
+
+    private static final ConfigOption<Boolean> APPROXIMATE_LOCAL_RECOVERY =

Review Comment:
   This belongs in `ExecutionCheckpointingOptions`, following the naming scheme used there:
   `execution.checkpointing.approximate-local-recovery`
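Below is a small sketch of the naming convention. It relies on a hypothetical helper, `OptionKeyNaming`, which is not part of Flink. The helper prefixes a kebab-case name with `execution.checkpointing.` and rejects camelCase names.

```java
/** Hypothetical helper enforcing the "execution.checkpointing.<kebab-case>" key scheme. */
final class OptionKeyNaming {
    static final String PREFIX = "execution.checkpointing.";

    /** Builds a key such as "execution.checkpointing.approximate-local-recovery". */
    static String checkpointingKey(String kebabName) {
        // Lowercase words separated by single hyphens, e.g. "approximate-local-recovery".
        if (!kebabName.matches("[a-z0-9]+(-[a-z0-9]+)*")) {
            throw new IllegalArgumentException("Not a kebab-case name: " + kebabName);
        }
        return PREFIX + kebabName;
    }
}
```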





[GitHub] [flink] pnowojski merged pull request #21245: [FLINK-29379][streaming] Migrate CheckpointConfig to use Configuration field and ConfigOption

Posted by GitBox <gi...@apache.org>.
pnowojski merged PR #21245:
URL: https://github.com/apache/flink/pull/21245




[GitHub] [flink] twalthr commented on a diff in pull request #21245: [FLINK-29379][streaming] Migrate CheckpointConfig to use Configuration field and ConfigOption

Posted by GitBox <gi...@apache.org>.
twalthr commented on code in PR #21245:
URL: https://github.com/apache/flink/pull/21245#discussion_r1015611874


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -154,30 +143,19 @@ public class CheckpointConfig implements java.io.Serializable {
     public CheckpointConfig(final CheckpointConfig checkpointConfig) {
         checkNotNull(checkpointConfig);
 
-        this.checkpointInterval = checkpointConfig.checkpointInterval;
-        this.checkpointingMode = checkpointConfig.checkpointingMode;
-        this.checkpointTimeout = checkpointConfig.checkpointTimeout;
-        this.maxConcurrentCheckpoints = checkpointConfig.maxConcurrentCheckpoints;
-        this.minPauseBetweenCheckpoints = checkpointConfig.minPauseBetweenCheckpoints;
-        this.tolerableCheckpointFailureNumber = checkpointConfig.tolerableCheckpointFailureNumber;
-        this.unalignedCheckpointsEnabled = checkpointConfig.isUnalignedCheckpointsEnabled();
-        this.alignedCheckpointTimeout = checkpointConfig.alignedCheckpointTimeout;
-        this.approximateLocalRecovery = checkpointConfig.isApproximateLocalRecoveryEnabled();
-        this.externalizedCheckpointCleanup = checkpointConfig.externalizedCheckpointCleanup;
-        this.forceCheckpointing = checkpointConfig.forceCheckpointing;
-        this.forceUnalignedCheckpoints = checkpointConfig.forceUnalignedCheckpoints;
+        this.configuration = new Configuration(checkpointConfig.configuration);
         this.storage = checkpointConfig.getCheckpointStorage();
-        this.checkpointIdOfIgnoredInFlightData =
-                checkpointConfig.getCheckpointIdOfIgnoredInFlightData();
     }
 
-    public CheckpointConfig() {}
+    public CheckpointConfig() {
+        configuration = new Configuration();
+    }
 
     // ------------------------------------------------------------------------
 
     /** Disables checkpointing. */
     public void disableCheckpointing() {
-        this.checkpointInterval = -1;
+        configuration.removeConfig(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL);

Review Comment:
   I'm not sure `Duration.ofMillis(-1)` is supported by the entire stack, e.g. `org.apache.flink.util.TimeUtils#parseDuration`.
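To show why a negative duration can be a problem, here is a simplified digit-first parser. It is an assumption loosely modelled on how such parsers commonly work, not Flink's `TimeUtils` code. `SimpleDurationParser` is hypothetical and supports only a few units. The takeaway is that a value serialized to a string starting with `-` cannot be read back.

```java
import java.time.Duration;
import java.util.Locale;

/** Hypothetical parser: leading digits, then an optional unit (ms, s, min, h). */
final class SimpleDurationParser {
    static Duration parse(String text) {
        String trimmed = text.trim().toLowerCase(Locale.ROOT);
        int pos = 0;
        while (pos < trimmed.length() && Character.isDigit(trimmed.charAt(pos))) {
            pos++;
        }
        if (pos == 0) {
            // A leading '-' is not a digit, so "-1 ms" is rejected here.
            throw new NumberFormatException("Text does not start with a number: " + text);
        }
        long value = Long.parseLong(trimmed.substring(0, pos));
        String unit = trimmed.substring(pos).trim();
        switch (unit) {
            case "":
            case "ms":
                return Duration.ofMillis(value);
            case "s":
                return Duration.ofSeconds(value);
            case "min":
                return Duration.ofMinutes(value);
            case "h":
                return Duration.ofHours(value);
            default:
                throw new IllegalArgumentException("Unknown unit: " + unit);
        }
    }

    /** Formats as "<millis> ms"; a negative duration yields a leading '-'. */
    static String format(Duration duration) {
        return duration.toMillis() + " ms";
    }
}
```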





[GitHub] [flink] twalthr commented on a diff in pull request #21245: [FLINK-29379][streaming] Migrate CheckpointConfig to use Configuration field and ConfigOption

Posted by GitBox <gi...@apache.org>.
twalthr commented on code in PR #21245:
URL: https://github.com/apache/flink/pull/21245#discussion_r1015453438


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -154,30 +143,19 @@ public class CheckpointConfig implements java.io.Serializable {
     public CheckpointConfig(final CheckpointConfig checkpointConfig) {
         checkNotNull(checkpointConfig);
 
-        this.checkpointInterval = checkpointConfig.checkpointInterval;
-        this.checkpointingMode = checkpointConfig.checkpointingMode;
-        this.checkpointTimeout = checkpointConfig.checkpointTimeout;
-        this.maxConcurrentCheckpoints = checkpointConfig.maxConcurrentCheckpoints;
-        this.minPauseBetweenCheckpoints = checkpointConfig.minPauseBetweenCheckpoints;
-        this.tolerableCheckpointFailureNumber = checkpointConfig.tolerableCheckpointFailureNumber;
-        this.unalignedCheckpointsEnabled = checkpointConfig.isUnalignedCheckpointsEnabled();
-        this.alignedCheckpointTimeout = checkpointConfig.alignedCheckpointTimeout;
-        this.approximateLocalRecovery = checkpointConfig.isApproximateLocalRecoveryEnabled();
-        this.externalizedCheckpointCleanup = checkpointConfig.externalizedCheckpointCleanup;
-        this.forceCheckpointing = checkpointConfig.forceCheckpointing;
-        this.forceUnalignedCheckpoints = checkpointConfig.forceUnalignedCheckpoints;
+        this.configuration = new Configuration(checkpointConfig.configuration);
         this.storage = checkpointConfig.getCheckpointStorage();
-        this.checkpointIdOfIgnoredInFlightData =
-                checkpointConfig.getCheckpointIdOfIgnoredInFlightData();
     }
 
-    public CheckpointConfig() {}
+    public CheckpointConfig() {
+        configuration = new Configuration();
+    }
 
     // ------------------------------------------------------------------------
 
     /** Disables checkpointing. */
     public void disableCheckpointing() {
-        this.checkpointInterval = -1;
+        configuration.removeConfig(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL);

Review Comment:
   Even if `CheckpointConfig` is only a view, we will likely back it with a `WritableConfig`, which has no `removeConfig` contract, at least for now. Every deletion is currently modelled as a set operation.
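Here is a sketch of what disabling looks like when the backing store offers only a set operation. `SetOnlyConfig`, `MapBackedConfig` and `CheckpointToggle` are hypothetical and only loosely mirror the idea behind `WritableConfig`. The `Duration.ofMillis(-1)` sentinel follows the fixup commit mentioned earlier in this thread.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical set-only view: there is deliberately no remove operation. */
interface SetOnlyConfig {
    SetOnlyConfig set(String key, Object value);
}

final class MapBackedConfig implements SetOnlyConfig {
    final Map<String, Object> values = new HashMap<>();

    @Override
    public SetOnlyConfig set(String key, Object value) {
        values.put(key, value);
        return this;
    }
}

final class CheckpointToggle {
    static final String INTERVAL_KEY = "execution.checkpointing.interval";
    /** Hypothetical sentinel meaning "explicitly disabled". */
    static final Duration DISABLED = Duration.ofMillis(-1);

    private final SetOnlyConfig config;

    CheckpointToggle(SetOnlyConfig config) {
        this.config = config;
    }

    void enableCheckpointing(Duration interval) {
        config.set(INTERVAL_KEY, interval);
    }

    /** Without a remove operation, disabling has to be expressed as a set. */
    void disableCheckpointing() {
        config.set(INTERVAL_KEY, DISABLED);
    }
}
```

If negative durations turn out not to survive serialization, the same shape works with `Duration.ZERO` as the sentinel.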





[GitHub] [flink] twalthr commented on a diff in pull request #21245: [FLINK-29379][streaming] Migrate CheckpointConfig to use Configuration field and ConfigOption

Posted by GitBox <gi...@apache.org>.
twalthr commented on code in PR #21245:
URL: https://github.com/apache/flink/pull/21245#discussion_r1015615215


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -154,30 +143,19 @@ public class CheckpointConfig implements java.io.Serializable {
     public CheckpointConfig(final CheckpointConfig checkpointConfig) {
         checkNotNull(checkpointConfig);
 
-        this.checkpointInterval = checkpointConfig.checkpointInterval;
-        this.checkpointingMode = checkpointConfig.checkpointingMode;
-        this.checkpointTimeout = checkpointConfig.checkpointTimeout;
-        this.maxConcurrentCheckpoints = checkpointConfig.maxConcurrentCheckpoints;
-        this.minPauseBetweenCheckpoints = checkpointConfig.minPauseBetweenCheckpoints;
-        this.tolerableCheckpointFailureNumber = checkpointConfig.tolerableCheckpointFailureNumber;
-        this.unalignedCheckpointsEnabled = checkpointConfig.isUnalignedCheckpointsEnabled();
-        this.alignedCheckpointTimeout = checkpointConfig.alignedCheckpointTimeout;
-        this.approximateLocalRecovery = checkpointConfig.isApproximateLocalRecoveryEnabled();
-        this.externalizedCheckpointCleanup = checkpointConfig.externalizedCheckpointCleanup;
-        this.forceCheckpointing = checkpointConfig.forceCheckpointing;
-        this.forceUnalignedCheckpoints = checkpointConfig.forceUnalignedCheckpoints;
+        this.configuration = new Configuration(checkpointConfig.configuration);
         this.storage = checkpointConfig.getCheckpointStorage();
-        this.checkpointIdOfIgnoredInFlightData =
-                checkpointConfig.getCheckpointIdOfIgnoredInFlightData();
     }
 
-    public CheckpointConfig() {}
+    public CheckpointConfig() {
+        configuration = new Configuration();
+    }
 
     // ------------------------------------------------------------------------
 
     /** Disables checkpointing. */
     public void disableCheckpointing() {
-        this.checkpointInterval = -1;
+        configuration.removeConfig(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL);

Review Comment:
   I guess 0 would be the better option?





[GitHub] [flink] twalthr commented on a diff in pull request #21245: [FLINK-29379][streaming] Migrate CheckpointConfig to use Configuration field and ConfigOption

Posted by GitBox <gi...@apache.org>.
twalthr commented on code in PR #21245:
URL: https://github.com/apache/flink/pull/21245#discussion_r1015379814


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -154,30 +143,19 @@ public class CheckpointConfig implements java.io.Serializable {
     public CheckpointConfig(final CheckpointConfig checkpointConfig) {
         checkNotNull(checkpointConfig);
 
-        this.checkpointInterval = checkpointConfig.checkpointInterval;
-        this.checkpointingMode = checkpointConfig.checkpointingMode;
-        this.checkpointTimeout = checkpointConfig.checkpointTimeout;
-        this.maxConcurrentCheckpoints = checkpointConfig.maxConcurrentCheckpoints;
-        this.minPauseBetweenCheckpoints = checkpointConfig.minPauseBetweenCheckpoints;
-        this.tolerableCheckpointFailureNumber = checkpointConfig.tolerableCheckpointFailureNumber;
-        this.unalignedCheckpointsEnabled = checkpointConfig.isUnalignedCheckpointsEnabled();
-        this.alignedCheckpointTimeout = checkpointConfig.alignedCheckpointTimeout;
-        this.approximateLocalRecovery = checkpointConfig.isApproximateLocalRecoveryEnabled();
-        this.externalizedCheckpointCleanup = checkpointConfig.externalizedCheckpointCleanup;
-        this.forceCheckpointing = checkpointConfig.forceCheckpointing;
-        this.forceUnalignedCheckpoints = checkpointConfig.forceUnalignedCheckpoints;
+        this.configuration = new Configuration(checkpointConfig.configuration);
         this.storage = checkpointConfig.getCheckpointStorage();
-        this.checkpointIdOfIgnoredInFlightData =
-                checkpointConfig.getCheckpointIdOfIgnoredInFlightData();
     }
 
-    public CheckpointConfig() {}
+    public CheckpointConfig() {
+        configuration = new Configuration();
+    }
 
     // ------------------------------------------------------------------------
 
     /** Disables checkpointing. */
     public void disableCheckpointing() {
-        this.checkpointInterval = -1;
+        configuration.removeConfig(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL);

Review Comment:
   It would be better to use a set operation here. Suppose we merge configurations in the future. Then disabling checkpointing by removing the key would effectively mean "fall back to the cluster config".
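The sketch below illustrates the concern. It models the job-level and cluster-level settings as plain string maps in which job entries win on merge. The key string and the `"0"` sentinel are assumptions made for the illustration, and the class is hypothetical, not Flink's `Configuration`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model: a job-level config merged over a cluster-level config. */
class MergeSketch {
    // Assumed key string, used here only as a plain map key.
    static final String INTERVAL_KEY = "execution.checkpointing.interval";

    /** Job-level entries win; any key the job lacks falls back to the cluster value. */
    static Map<String, String> merge(Map<String, String> cluster, Map<String, String> job) {
        Map<String, String> merged = new HashMap<>(cluster);
        merged.putAll(job);
        return merged;
    }

    /** "Disable" by removing the key, like removeConfig in the diff above. */
    static Map<String, String> disableByRemove(Map<String, String> job) {
        Map<String, String> copy = new HashMap<>(job);
        copy.remove(INTERVAL_KEY);
        return copy;
    }

    /** "Disable" by setting an explicit value (hypothetical sentinel "0"). */
    static Map<String, String> disableBySet(Map<String, String> job) {
        Map<String, String> copy = new HashMap<>(job);
        copy.put(INTERVAL_KEY, "0");
        return copy;
    }
}
```

Suppose the key is removed. After a merge, the cluster's interval reappears and silently re-enables checkpointing. With an explicit value, the job's decision survives the merge.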



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -66,85 +68,72 @@ public class CheckpointConfig implements java.io.Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(CheckpointConfig.class);
 
     /** The default checkpoint mode: exactly once. */
-    public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE;
+    public static final CheckpointingMode DEFAULT_MODE =
+            ExecutionCheckpointingOptions.CHECKPOINTING_MODE.defaultValue();
 
     /** The default timeout of a checkpoint attempt: 10 minutes. */
-    public static final long DEFAULT_TIMEOUT = 10 * 60 * 1000;
+    @Deprecated
+    public static final long DEFAULT_TIMEOUT =
+            ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.defaultValue().toMillis();
 
     /** The default minimum pause to be made between checkpoints: none. */
-    public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS = 0;
+    @Deprecated
+    public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS =
+            ExecutionCheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS.defaultValue().toMillis();
 
     /** The default limit of concurrently happening checkpoints: one. */
-    public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1;
+    @Deprecated
+    public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS =
+            ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS.defaultValue();
 
-    public static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER = -1;
+    @Deprecated public static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER = -1;
 
     /** Default id of checkpoint for which in-flight data should be ignored on recovery. */
-    public static final int DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA = -1;
-
-    // ------------------------------------------------------------------------
-
-    /** Checkpointing mode (exactly-once vs. at-least-once). */
-    private CheckpointingMode checkpointingMode = DEFAULT_MODE;
-
-    /** Periodic checkpoint triggering interval. */
-    private long checkpointInterval = -1; // disabled
-
-    /** Maximum time checkpoint may take before being discarded. */
-    private long checkpointTimeout = DEFAULT_TIMEOUT;
-
-    /** Minimal pause between checkpointing attempts. */
-    private long minPauseBetweenCheckpoints = DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS;
-
-    /** Maximum number of checkpoint attempts in progress at the same time. */
-    private int maxConcurrentCheckpoints = DEFAULT_MAX_CONCURRENT_CHECKPOINTS;
-
-    /** Flag to force checkpointing in iterative jobs. */
-    private boolean forceCheckpointing;
-
-    /** Flag to force checkpointing in iterative jobs. */
-    private boolean forceUnalignedCheckpoints;
-
-    /** Flag to enable unaligned checkpoints. */
-    private boolean unalignedCheckpointsEnabled;
-
-    /** Id of checkpoint for which in-flight data should be ignored on recovery. */
-    private long checkpointIdOfIgnoredInFlightData =
-            DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA;
-
-    /** The delay from the start of checkpoint after which AC switches to UC. */
-    private Duration alignedCheckpointTimeout =
-            ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT.defaultValue();
-
-    /** Flag to enable approximate local recovery. */
-    private boolean approximateLocalRecovery;
-
-    /** Cleanup behaviour for persistent checkpoints. */
-    private ExternalizedCheckpointCleanup externalizedCheckpointCleanup =
-            ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT.defaultValue();
+    @Deprecated
+    public static final int DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA =
+            ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA
+                    .defaultValue()
+                    .intValue();
 
     /**
-     * Task would not fail if there is an error in their checkpointing.
-     *
-     * <p>{@link #tolerableCheckpointFailureNumber} would always overrule this deprecated field if
-     * they have conflicts.
+     * Internal {@link ConfigOption}s, that are not exposed and it's not possible to configure them
+     * via config files. We are defining them here, so that we can store them in the {@link
+     * #configuration}.
      *
-     * @deprecated Use {@link #tolerableCheckpointFailureNumber}.
+     * <p>If you decide to expose any of those {@link ConfigOption}s, please double-check if the
+     * key, type and descriptions are sensible, as the initial values are arbitrary.
      */
-    @Deprecated private boolean failOnCheckpointingErrors = true;
+    // --------------------------------------------------------------------------------------------
+
+    /** @deprecated This will be removed once iterations properly participate in checkpointing. */
+    @Deprecated
+    private static final ConfigOption<Boolean> FORCE_CHECKPOINTING =
+            key("hidden.force.checkpointing")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Flag to force checkpointing in iterative jobs");
+
+    private static final ConfigOption<Boolean> APPROXIMATE_LOCAL_RECOVERY =

Review Comment:
   We should avoid hidden config options. Give them a proper key, move them into the `...Options` class they belong to, and use annotations to hide them from the docs generation. Otherwise they are very difficult to find across the code base. We recently did the same in the planner, which also had internal options.
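The idea is that every key lives in one options class and a marker annotation excludes some of them from generated docs. Here is a minimal, self-contained sketch of that pattern. It assumes a reflection-based generator. `ExcludeFromDocs`, `SketchCheckpointingOptions` and `DocsGeneratorSketch` are hypothetical stand-ins; in Flink itself, `@Documentation.ExcludeFromDocumentation` plays the marker role.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a "hide from generated docs" marker. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface ExcludeFromDocs {}

/** Hypothetical options holder: public and internal keys live side by side. */
class SketchCheckpointingOptions {
    public static final String CHECKPOINTING_TIMEOUT = "execution.checkpointing.timeout";

    @ExcludeFromDocs
    public static final String FORCE_CHECKPOINTING = "execution.checkpointing.force";
}

/** Hypothetical docs generator: collects static String keys, skipping annotated ones. */
class DocsGeneratorSketch {
    static List<String> documentedKeys(Class<?> optionsClass) {
        List<String> keys = new ArrayList<>();
        for (Field field : optionsClass.getDeclaredFields()) {
            boolean isStringConstant =
                    Modifier.isStatic(field.getModifiers()) && field.getType() == String.class;
            if (isStringConstant && !field.isAnnotationPresent(ExcludeFromDocs.class)) {
                try {
                    keys.add((String) field.get(null));
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return keys;
    }
}
```

The internal key stays greppable in the same class as the public ones and is simply filtered out when docs are produced.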



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -186,7 +164,7 @@ public void disableCheckpointing() {
      * @return True if checkpointing is enables, false otherwise.
      */
     public boolean isCheckpointingEnabled() {
-        return checkpointInterval > 0;
+        return configuration.contains(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL);

Review Comment:
   How about we introduce a new config option `checkpoint.enabled`? This would fix this issue properly.
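To make the difference concrete, here is a sketch of the two ways of deriving "enabled" over a plain string map. The first is presence of the interval key, as in the diff. The second is an explicit boolean, as proposed. `checkpoint.enabled` is only the key suggested in this comment, not an existing option. The interval key string is an assumption.

```java
import java.util.Map;

/** Hypothetical comparison of two ways to derive "checkpointing enabled". */
class EnabledFlagSketch {
    static final String INTERVAL_KEY = "execution.checkpointing.interval"; // assumed key
    static final String ENABLED_KEY = "checkpoint.enabled"; // proposed in review, hypothetical

    /** The diff's approach: enabled iff an interval is present at all. */
    static boolean enabledByPresence(Map<String, String> conf) {
        return conf.containsKey(INTERVAL_KEY);
    }

    /** The proposal: an explicit flag, independent of whether an interval is set. */
    static boolean enabledByFlag(Map<String, String> conf) {
        return Boolean.parseBoolean(conf.getOrDefault(ENABLED_KEY, "false"));
    }
}
```

With an explicit flag, a job can keep its interval configured and still turn checkpointing off, so nothing has to be removed from the configuration.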



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #21245: [FLINK-29379][streaming] Migrate CheckpointConfig to use Configuration field and ConfigOption

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #21245:
URL: https://github.com/apache/flink/pull/21245#discussion_r1016303017


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -154,30 +143,19 @@ public class CheckpointConfig implements java.io.Serializable {
     public CheckpointConfig(final CheckpointConfig checkpointConfig) {
         checkNotNull(checkpointConfig);
 
-        this.checkpointInterval = checkpointConfig.checkpointInterval;
-        this.checkpointingMode = checkpointConfig.checkpointingMode;
-        this.checkpointTimeout = checkpointConfig.checkpointTimeout;
-        this.maxConcurrentCheckpoints = checkpointConfig.maxConcurrentCheckpoints;
-        this.minPauseBetweenCheckpoints = checkpointConfig.minPauseBetweenCheckpoints;
-        this.tolerableCheckpointFailureNumber = checkpointConfig.tolerableCheckpointFailureNumber;
-        this.unalignedCheckpointsEnabled = checkpointConfig.isUnalignedCheckpointsEnabled();
-        this.alignedCheckpointTimeout = checkpointConfig.alignedCheckpointTimeout;
-        this.approximateLocalRecovery = checkpointConfig.isApproximateLocalRecoveryEnabled();
-        this.externalizedCheckpointCleanup = checkpointConfig.externalizedCheckpointCleanup;
-        this.forceCheckpointing = checkpointConfig.forceCheckpointing;
-        this.forceUnalignedCheckpoints = checkpointConfig.forceUnalignedCheckpoints;
+        this.configuration = new Configuration(checkpointConfig.configuration);
         this.storage = checkpointConfig.getCheckpointStorage();
-        this.checkpointIdOfIgnoredInFlightData =
-                checkpointConfig.getCheckpointIdOfIgnoredInFlightData();
     }
 
-    public CheckpointConfig() {}
+    public CheckpointConfig() {
+        configuration = new Configuration();
+    }
 
     // ------------------------------------------------------------------------
 
     /** Disables checkpointing. */
     public void disableCheckpointing() {
-        this.checkpointInterval = -1;
+        configuration.removeConfig(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL);

Review Comment:
   In the offline discussion we decided to leave it as it is (with the `removeConfig` call) and deal with it later if it causes a problem.
   
   - @twalthr didn't want to change the value returned from `StreamExecutionEnvironment` and `CheckpointConfig` to `0`.
   - I didn't want `StreamExecutionEnvironment` and `CheckpointConfig` to return `-1` while the `ConfigOption` returns `0`.
   - Adding support for negative values in the `ConfigOption` would be problematic. It would have to be done in such a way that all already existing `ConfigOption`s still fail on (reject) negative values.
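Here is a minimal sketch of how the agreed compromise can keep the legacy `-1` contract without ever storing a negative value in the option. An `Optional<Duration>` stands in for the `Configuration` entry: empty means "not set", i.e. disabled. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical sketch: a legacy long getter on top of an optional, non-negative Duration. */
class LegacyIntervalSketch {
    // Stands in for the configured entry; empty means "not set", i.e. disabled.
    private Optional<Duration> interval = Optional.empty();

    void setInterval(Duration value) {
        // Mirrors the constraint that option values stay non-negative.
        if (value.isNegative()) {
            throw new IllegalArgumentException("interval must not be negative: " + value);
        }
        interval = Optional.of(value);
    }

    /** Disabling removes the entry instead of storing a -1 sentinel. */
    void disable() {
        interval = Optional.empty();
    }

    /** Legacy API: still reports -1 when disabled, although -1 is never stored. */
    long getCheckpointInterval() {
        return interval.map(Duration::toMillis).orElse(-1L);
    }
}
```

The `-1` lives only at the API boundary, so the option itself never has to accept negative values.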





[GitHub] [flink] twalthr commented on a diff in pull request #21245: [FLINK-29379][streaming] Migrate CheckpointConfig to use Configuration field and ConfigOption

Posted by GitBox <gi...@apache.org>.
twalthr commented on code in PR #21245:
URL: https://github.com/apache/flink/pull/21245#discussion_r1015614089


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -154,30 +143,19 @@ public class CheckpointConfig implements java.io.Serializable {
     public CheckpointConfig(final CheckpointConfig checkpointConfig) {
         checkNotNull(checkpointConfig);
 
-        this.checkpointInterval = checkpointConfig.checkpointInterval;
-        this.checkpointingMode = checkpointConfig.checkpointingMode;
-        this.checkpointTimeout = checkpointConfig.checkpointTimeout;
-        this.maxConcurrentCheckpoints = checkpointConfig.maxConcurrentCheckpoints;
-        this.minPauseBetweenCheckpoints = checkpointConfig.minPauseBetweenCheckpoints;
-        this.tolerableCheckpointFailureNumber = checkpointConfig.tolerableCheckpointFailureNumber;
-        this.unalignedCheckpointsEnabled = checkpointConfig.isUnalignedCheckpointsEnabled();
-        this.alignedCheckpointTimeout = checkpointConfig.alignedCheckpointTimeout;
-        this.approximateLocalRecovery = checkpointConfig.isApproximateLocalRecoveryEnabled();
-        this.externalizedCheckpointCleanup = checkpointConfig.externalizedCheckpointCleanup;
-        this.forceCheckpointing = checkpointConfig.forceCheckpointing;
-        this.forceUnalignedCheckpoints = checkpointConfig.forceUnalignedCheckpoints;
+        this.configuration = new Configuration(checkpointConfig.configuration);
         this.storage = checkpointConfig.getCheckpointStorage();
-        this.checkpointIdOfIgnoredInFlightData =
-                checkpointConfig.getCheckpointIdOfIgnoredInFlightData();
     }
 
-    public CheckpointConfig() {}
+    public CheckpointConfig() {
+        configuration = new Configuration();
+    }
 
     // ------------------------------------------------------------------------
 
     /** Disables checkpointing. */
     public void disableCheckpointing() {
-        this.checkpointInterval = -1;
+        configuration.removeConfig(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL);

Review Comment:
   Also, could `org.apache.flink.util.TimeUtils#formatWithHighestUnit` lead to issues when `ExecutionConfig.toConfiguration().toMap()` is called?





[GitHub] [flink] flinkbot commented on pull request #21245: [FLINK-29379][streaming] Migrate CheckpointConfig to use Configuration field and ConfigOption

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21245:
URL: https://github.com/apache/flink/pull/21245#issuecomment-1304887277

   ## CI report:
   
   * c050e2fa2a97fbbe07f3ac2be997ddec1bc2535a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] twalthr commented on a diff in pull request #21245: [FLINK-29379][streaming] Migrate CheckpointConfig to use Configuration field and ConfigOption

Posted by GitBox <gi...@apache.org>.
twalthr commented on code in PR #21245:
URL: https://github.com/apache/flink/pull/21245#discussion_r1015607397


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
##########
@@ -252,4 +255,29 @@ public class ExecutionCheckpointingOptions {
                                                     "{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/dev/datastream/fault-tolerance/checkpointing/#checkpointing-with-parts-of-the-graph-finished-beta",
                                                     "the important considerations"))
                                     .build());
+
+    /**
+     * Access to this option is officially only supported via {@link
+     * CheckpointConfig#setForceCheckpointing(boolean)}, but there is no good reason behind this.
+     *
+     * @deprecated This will be removed once iterations properly participate in checkpointing.
+     */
+    @Internal @Deprecated @Documentation.ExcludeFromDocumentation
+    public static final ConfigOption<Boolean> FORCE_CHECKPOINTING =
+            key("execution.checkpointing.force")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Flag to force checkpointing in iterative jobs.");
+
+    /**
+     * Access to this option is officially only supported via {@link
+     * CheckpointConfig#enableApproximateLocalRecovery(boolean)}, but there is no good reason behind
+     * this.
+     */
+    @Internal @Documentation.ExcludeFromDocumentation
+    public static final ConfigOption<Boolean> APPROXIMATE_LOCAL_RECOVERY =
+            key("execution.checkpointing..approximate.local.recovery")

Review Comment:
   As mentioned in the other PR, this key does not adhere to the naming conventions and has a typo (`..`).
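A lightweight lint could catch the `..` typo mechanically. The sketch below assumes a convention of lowercase dot-separated segments, with hyphens allowed inside a segment. That rule is an assumption made for the illustration, not Flink's official naming guideline. The check flags malformed keys such as empty segments, but it cannot decide which of two well-formed spellings is the preferred one.

```java
import java.util.regex.Pattern;

/** Hypothetical lint for config keys; the accepted shape is an assumed convention. */
class ConfigKeyLintSketch {
    // Segments of lowercase letters/digits (optionally hyphen-joined), separated by single dots.
    private static final Pattern KEY =
            Pattern.compile("[a-z0-9]+(-[a-z0-9]+)*(\\.[a-z0-9]+(-[a-z0-9]+)*)*");

    static boolean isWellFormed(String key) {
        return KEY.matcher(key).matches();
    }
}
```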





[GitHub] [flink] zentol commented on a diff in pull request #21245: [FLINK-29379][streaming] Migrate CheckpointConfig to use Configuration field and ConfigOption

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #21245:
URL: https://github.com/apache/flink/pull/21245#discussion_r1015386009


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -186,7 +164,7 @@ public void disableCheckpointing() {
      * @return True if checkpointing is enables, false otherwise.
      */
     public boolean isCheckpointingEnabled() {
-        return checkpointInterval > 0;
+        return configuration.contains(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL);

Review Comment:
   IIRC we'd also have to touch other components that expect the interval to be > 0 if checkpointing is enabled, like the checkpoint coordinator maybe?
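Suppose "enabled" were decoupled from the interval. Consumers that today assume an enabled job has a positive interval would then need an explicit check. Below is a minimal sketch of such a guard. The names are hypothetical, and the sketch does not reflect how the checkpoint coordinator actually reads its settings.

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical guard for consumers that assume "enabled" implies "interval > 0". */
final class IntervalGuardSketch {
    private IntervalGuardSketch() {}

    /**
     * Returns the interval in milliseconds. Intended to be called only when checkpointing
     * is enabled; fails loudly if that invariant does not hold.
     */
    static long requirePositiveInterval(boolean enabled, Optional<Duration> interval) {
        if (!enabled) {
            throw new IllegalStateException("checkpointing is disabled");
        }
        long millis = interval.map(Duration::toMillis).orElse(0L);
        if (millis <= 0) {
            throw new IllegalArgumentException(
                    "enabled checkpointing needs an interval > 0, got " + millis + " ms");
        }
        return millis;
    }
}
```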





[GitHub] [flink] pnowojski commented on a diff in pull request #21245: [FLINK-29379][streaming] Migrate CheckpointConfig to use Configuration field and ConfigOption

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #21245:
URL: https://github.com/apache/flink/pull/21245#discussion_r1015408117


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -66,85 +68,72 @@ public class CheckpointConfig implements java.io.Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(CheckpointConfig.class);
 
     /** The default checkpoint mode: exactly once. */
-    public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE;
+    public static final CheckpointingMode DEFAULT_MODE =
+            ExecutionCheckpointingOptions.CHECKPOINTING_MODE.defaultValue();
 
     /** The default timeout of a checkpoint attempt: 10 minutes. */
-    public static final long DEFAULT_TIMEOUT = 10 * 60 * 1000;
+    @Deprecated
+    public static final long DEFAULT_TIMEOUT =
+            ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.defaultValue().toMillis();
 
     /** The default minimum pause to be made between checkpoints: none. */
-    public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS = 0;
+    @Deprecated
+    public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS =
+            ExecutionCheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS.defaultValue().toMillis();
 
     /** The default limit of concurrently happening checkpoints: one. */
-    public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1;
+    @Deprecated
+    public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS =
+            ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS.defaultValue();
 
-    public static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER = -1;
+    @Deprecated public static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER = -1;
 
     /** Default id of checkpoint for which in-flight data should be ignored on recovery. */
-    public static final int DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA = -1;
-
-    // ------------------------------------------------------------------------
-
-    /** Checkpointing mode (exactly-once vs. at-least-once). */
-    private CheckpointingMode checkpointingMode = DEFAULT_MODE;
-
-    /** Periodic checkpoint triggering interval. */
-    private long checkpointInterval = -1; // disabled
-
-    /** Maximum time checkpoint may take before being discarded. */
-    private long checkpointTimeout = DEFAULT_TIMEOUT;
-
-    /** Minimal pause between checkpointing attempts. */
-    private long minPauseBetweenCheckpoints = DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS;
-
-    /** Maximum number of checkpoint attempts in progress at the same time. */
-    private int maxConcurrentCheckpoints = DEFAULT_MAX_CONCURRENT_CHECKPOINTS;
-
-    /** Flag to force checkpointing in iterative jobs. */
-    private boolean forceCheckpointing;
-
-    /** Flag to force checkpointing in iterative jobs. */
-    private boolean forceUnalignedCheckpoints;
-
-    /** Flag to enable unaligned checkpoints. */
-    private boolean unalignedCheckpointsEnabled;
-
-    /** Id of checkpoint for which in-flight data should be ignored on recovery. */
-    private long checkpointIdOfIgnoredInFlightData =
-            DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA;
-
-    /** The delay from the start of checkpoint after which AC switches to UC. */
-    private Duration alignedCheckpointTimeout =
-            ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT.defaultValue();
-
-    /** Flag to enable approximate local recovery. */
-    private boolean approximateLocalRecovery;
-
-    /** Cleanup behaviour for persistent checkpoints. */
-    private ExternalizedCheckpointCleanup externalizedCheckpointCleanup =
-            ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT.defaultValue();
+    @Deprecated
+    public static final int DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA =
+            ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA
+                    .defaultValue()
+                    .intValue();
 
     /**
-     * Task would not fail if there is an error in their checkpointing.
-     *
-     * <p>{@link #tolerableCheckpointFailureNumber} would always overrule this deprecated field if
-     * they have conflicts.
+     * Internal {@link ConfigOption}s, that are not exposed and it's not possible to configure them
+     * via config files. We are defining them here, so that we can store them in the {@link
+     * #configuration}.
      *
-     * @deprecated Use {@link #tolerableCheckpointFailureNumber}.
+     * <p>If you decide to expose any of those {@link ConfigOption}s, please double-check if the
+     * key, type and descriptions are sensible, as the initial values are arbitrary.
      */
-    @Deprecated private boolean failOnCheckpointingErrors = true;
+    // --------------------------------------------------------------------------------------------
+
+    /** @deprecated This will be removed once iterations properly participate in checkpointing. */
+    @Deprecated
+    private static final ConfigOption<Boolean> FORCE_CHECKPOINTING =
+            key("hidden.force.checkpointing")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Flag to force checkpointing in iterative jobs");
+
+    private static final ConfigOption<Boolean> APPROXIMATE_LOCAL_RECOVERY =

Review Comment:
   Where would you propose to place them?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -66,85 +68,72 @@ public class CheckpointConfig implements java.io.Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(CheckpointConfig.class);
 
     /** The default checkpoint mode: exactly once. */
-    public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE;
+    public static final CheckpointingMode DEFAULT_MODE =
+            ExecutionCheckpointingOptions.CHECKPOINTING_MODE.defaultValue();
 
     /** The default timeout of a checkpoint attempt: 10 minutes. */
-    public static final long DEFAULT_TIMEOUT = 10 * 60 * 1000;
+    @Deprecated
+    public static final long DEFAULT_TIMEOUT =
+            ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.defaultValue().toMillis();
 
     /** The default minimum pause to be made between checkpoints: none. */
-    public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS = 0;
+    @Deprecated
+    public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS =
+            ExecutionCheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS.defaultValue().toMillis();
 
     /** The default limit of concurrently happening checkpoints: one. */
-    public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1;
+    @Deprecated
+    public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS =
+            ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS.defaultValue();
 
-    public static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER = -1;
+    @Deprecated public static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER = -1;
 
     /** Default id of checkpoint for which in-flight data should be ignored on recovery. */
-    public static final int DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA = -1;
-
-    // ------------------------------------------------------------------------
-
-    /** Checkpointing mode (exactly-once vs. at-least-once). */
-    private CheckpointingMode checkpointingMode = DEFAULT_MODE;
-
-    /** Periodic checkpoint triggering interval. */
-    private long checkpointInterval = -1; // disabled
-
-    /** Maximum time checkpoint may take before being discarded. */
-    private long checkpointTimeout = DEFAULT_TIMEOUT;
-
-    /** Minimal pause between checkpointing attempts. */
-    private long minPauseBetweenCheckpoints = DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS;
-
-    /** Maximum number of checkpoint attempts in progress at the same time. */
-    private int maxConcurrentCheckpoints = DEFAULT_MAX_CONCURRENT_CHECKPOINTS;
-
-    /** Flag to force checkpointing in iterative jobs. */
-    private boolean forceCheckpointing;
-
-    /** Flag to force checkpointing in iterative jobs. */
-    private boolean forceUnalignedCheckpoints;
-
-    /** Flag to enable unaligned checkpoints. */
-    private boolean unalignedCheckpointsEnabled;
-
-    /** Id of checkpoint for which in-flight data should be ignored on recovery. */
-    private long checkpointIdOfIgnoredInFlightData =
-            DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA;
-
-    /** The delay from the start of checkpoint after which AC switches to UC. */
-    private Duration alignedCheckpointTimeout =
-            ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT.defaultValue();
-
-    /** Flag to enable approximate local recovery. */
-    private boolean approximateLocalRecovery;
-
-    /** Cleanup behaviour for persistent checkpoints. */
-    private ExternalizedCheckpointCleanup externalizedCheckpointCleanup =
-            ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT.defaultValue();
+    @Deprecated
+    public static final int DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA =
+            ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA
+                    .defaultValue()
+                    .intValue();
 
     /**
-     * Task would not fail if there is an error in their checkpointing.
-     *
-     * <p>{@link #tolerableCheckpointFailureNumber} would always overrule this deprecated field if
-     * they have conflicts.
+     * Internal {@link ConfigOption}s, that are not exposed and it's not possible to configure them
+     * via config files. We are defining them here, so that we can store them in the {@link
+     * #configuration}.
      *
-     * @deprecated Use {@link #tolerableCheckpointFailureNumber}.
+     * <p>If you decide to expose any of those {@link ConfigOption}s, please double-check if the
+     * key, type and descriptions are sensible, as the initial values are arbitrary.
      */
-    @Deprecated private boolean failOnCheckpointingErrors = true;
+    // --------------------------------------------------------------------------------------------
+
+    /** @deprecated This will be removed once iterations properly participate in checkpointing. */
+    @Deprecated
+    private static final ConfigOption<Boolean> FORCE_CHECKPOINTING =
+            key("hidden.force.checkpointing")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Flag to force checkpointing in iterative jobs");
+
+    private static final ConfigOption<Boolean> APPROXIMATE_LOCAL_RECOVERY =

Review Comment:
   Where would you propose to place them and how to name them?





[GitHub] [flink] pnowojski commented on a diff in pull request #21245: [FLINK-29379][streaming] Migrate CheckpointConfig to use Configuration field and ConfigOption

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #21245:
URL: https://github.com/apache/flink/pull/21245#discussion_r1015409917


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -154,30 +143,19 @@ public class CheckpointConfig implements java.io.Serializable {
     public CheckpointConfig(final CheckpointConfig checkpointConfig) {
         checkNotNull(checkpointConfig);
 
-        this.checkpointInterval = checkpointConfig.checkpointInterval;
-        this.checkpointingMode = checkpointConfig.checkpointingMode;
-        this.checkpointTimeout = checkpointConfig.checkpointTimeout;
-        this.maxConcurrentCheckpoints = checkpointConfig.maxConcurrentCheckpoints;
-        this.minPauseBetweenCheckpoints = checkpointConfig.minPauseBetweenCheckpoints;
-        this.tolerableCheckpointFailureNumber = checkpointConfig.tolerableCheckpointFailureNumber;
-        this.unalignedCheckpointsEnabled = checkpointConfig.isUnalignedCheckpointsEnabled();
-        this.alignedCheckpointTimeout = checkpointConfig.alignedCheckpointTimeout;
-        this.approximateLocalRecovery = checkpointConfig.isApproximateLocalRecoveryEnabled();
-        this.externalizedCheckpointCleanup = checkpointConfig.externalizedCheckpointCleanup;
-        this.forceCheckpointing = checkpointConfig.forceCheckpointing;
-        this.forceUnalignedCheckpoints = checkpointConfig.forceUnalignedCheckpoints;
+        this.configuration = new Configuration(checkpointConfig.configuration);
         this.storage = checkpointConfig.getCheckpointStorage();
-        this.checkpointIdOfIgnoredInFlightData =
-                checkpointConfig.getCheckpointIdOfIgnoredInFlightData();
     }
 
-    public CheckpointConfig() {}
+    public CheckpointConfig() {
+        configuration = new Configuration();
+    }
 
     // ------------------------------------------------------------------------
 
     /** Disables checkpointing. */
     public void disableCheckpointing() {
-        this.checkpointInterval = -1;
+        configuration.removeConfig(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL);

Review Comment:
   ~agrrr, good catch.~
   
   edit: I've misunderstood the issue. Actually, I don't see a problem. If anything, the next step I could see is to make sure that `CheckpointConfig` and `ExecutionConfig` use the same `Configuration` instance as the environment. Then there would be no merging, and deleting would work as well.
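Here is a minimal sketch of why sharing one instance sidesteps the merge question. It models the environment's settings as a plain map. `ConfigViewSketch` is hypothetical, and the key string is an assumption. The copying variant corresponds to a defensive copy like the `new Configuration(...)` in the copy constructor above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical view over checkpoint settings that either shares or copies its backing map. */
class ConfigViewSketch {
    static final String INTERVAL_KEY = "execution.checkpointing.interval"; // assumed key

    private final Map<String, String> backing;

    private ConfigViewSketch(Map<String, String> backing) {
        this.backing = backing;
    }

    /** Shares the environment's map: every change is immediately visible to the environment. */
    static ConfigViewSketch sharing(Map<String, String> environment) {
        return new ConfigViewSketch(environment);
    }

    /** Copies the map: changes stay local and would have to be merged back later. */
    static ConfigViewSketch copying(Map<String, String> environment) {
        return new ConfigViewSketch(new HashMap<>(environment));
    }

    void disableCheckpointing() {
        backing.remove(INTERVAL_KEY);
    }
}
```

With a shared instance, a removal is the final word. With a copy, the removal only matters if the later merge treats a missing key as "disabled" rather than "not specified".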





[GitHub] [flink] pnowojski commented on a diff in pull request #21245: [FLINK-29379][streaming] Migrate CheckpointConfig to use Configuration field and ConfigOption

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #21245:
URL: https://github.com/apache/flink/pull/21245#discussion_r1015409917


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -154,30 +143,19 @@ public class CheckpointConfig implements java.io.Serializable {
     public CheckpointConfig(final CheckpointConfig checkpointConfig) {
         checkNotNull(checkpointConfig);
 
-        this.checkpointInterval = checkpointConfig.checkpointInterval;
-        this.checkpointingMode = checkpointConfig.checkpointingMode;
-        this.checkpointTimeout = checkpointConfig.checkpointTimeout;
-        this.maxConcurrentCheckpoints = checkpointConfig.maxConcurrentCheckpoints;
-        this.minPauseBetweenCheckpoints = checkpointConfig.minPauseBetweenCheckpoints;
-        this.tolerableCheckpointFailureNumber = checkpointConfig.tolerableCheckpointFailureNumber;
-        this.unalignedCheckpointsEnabled = checkpointConfig.isUnalignedCheckpointsEnabled();
-        this.alignedCheckpointTimeout = checkpointConfig.alignedCheckpointTimeout;
-        this.approximateLocalRecovery = checkpointConfig.isApproximateLocalRecoveryEnabled();
-        this.externalizedCheckpointCleanup = checkpointConfig.externalizedCheckpointCleanup;
-        this.forceCheckpointing = checkpointConfig.forceCheckpointing;
-        this.forceUnalignedCheckpoints = checkpointConfig.forceUnalignedCheckpoints;
+        this.configuration = new Configuration(checkpointConfig.configuration);
         this.storage = checkpointConfig.getCheckpointStorage();
-        this.checkpointIdOfIgnoredInFlightData =
-                checkpointConfig.getCheckpointIdOfIgnoredInFlightData();
     }
 
-    public CheckpointConfig() {}
+    public CheckpointConfig() {
+        configuration = new Configuration();
+    }
 
     // ------------------------------------------------------------------------
 
     /** Disables checkpointing. */
     public void disableCheckpointing() {
-        this.checkpointInterval = -1;
+        configuration.removeConfig(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL);

Review Comment:
   agrrr, good catch.



[GitHub] [flink] pnowojski commented on a diff in pull request #21245: [FLINK-29379][streaming] Migrate CheckpointConfig to use Configuration field and ConfigOption

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #21245:
URL: https://github.com/apache/flink/pull/21245#discussion_r1015406636


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -186,7 +164,7 @@ public void disableCheckpointing() {
      * @return True if checkpointing is enables, false otherwise.
      */
     public boolean isCheckpointingEnabled() {
-        return checkpointInterval > 0;
+        return configuration.contains(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL);

Review Comment:
   Yes, and I'm -1 on adding an extra `ConfigOption` that would only confuse users, break compatibility and allow for inconsistencies:
   - what if a user sets only `checkpoint.enabled` but, by mistake, doesn't set the interval?
   - what if a user does the opposite?
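   A small sketch of the two inconsistent states listed above, contrasted with deriving "enabled" from the interval's presence (as `isCheckpointingEnabled()` does in this diff). `checkpoint.enabled` is the hypothetical extra option under discussion and does not exist; plain `Map`s stand in for the configuration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class TwoOptionsSketch {

    static final String ENABLED_KEY = "checkpoint.enabled"; // hypothetical option
    static final String INTERVAL_KEY = "execution.checkpointing.interval";

    /** Two-option model: reports the combinations that contradict each other. */
    static Optional<String> validateTwoOptions(Map<String, String> conf) {
        boolean enabled = Boolean.parseBoolean(conf.getOrDefault(ENABLED_KEY, "false"));
        boolean hasInterval = conf.containsKey(INTERVAL_KEY);
        if (enabled && !hasInterval) {
            return Optional.of("checkpointing enabled, but no interval set");
        }
        if (!enabled && hasInterval) {
            return Optional.of("interval set, but checkpointing not enabled");
        }
        return Optional.empty();
    }

    /** Single-option model: "enabled" is derived, so no contradiction can exist. */
    static boolean isCheckpointingEnabled(Map<String, String> conf) {
        return conf.containsKey(INTERVAL_KEY);
    }

    public static void main(String[] args) {
        Map<String, String> onlyEnabled = new HashMap<>();
        onlyEnabled.put(ENABLED_KEY, "true"); // interval forgotten
        System.out.println(validateTwoOptions(onlyEnabled).isPresent()); // true

        Map<String, String> onlyInterval = new HashMap<>();
        onlyInterval.put(INTERVAL_KEY, "10 s"); // flag forgotten
        System.out.println(validateTwoOptions(onlyInterval).isPresent()); // true
        System.out.println(isCheckpointingEnabled(onlyInterval));         // true
    }
}
```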
   
   If anything, I would be OK with a fancier single property that accepts `manual`, `disabled` or an integer interval, but that would be quite a bit of work that's IMO not worth it. `Long.MAX_VALUE` for those two power users who want to trigger checkpoints manually is perfectly fine for me.
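   For comparison, a rough sketch of the single-property alternative mentioned in this comment, next to the `Long.MAX_VALUE` approach that is preferred instead. No such property exists; the accepted values, the millisecond unit and all names here are assumptions for illustration.

```java
import java.util.Locale;

/** Hypothetical parser for a single "disabled" | "manual" | <interval ms> property. */
public class CheckpointIntervalSketch {

    enum Kind { DISABLED, MANUAL, PERIODIC }

    static final class Mode {
        final Kind kind;
        final long intervalMillis; // only meaningful for PERIODIC

        Mode(Kind kind, long intervalMillis) {
            this.kind = kind;
            this.intervalMillis = intervalMillis;
        }
    }

    static Mode parse(String raw) {
        String value = raw.trim().toLowerCase(Locale.ROOT);
        switch (value) {
            case "disabled":
                return new Mode(Kind.DISABLED, -1L);
            case "manual":
                return new Mode(Kind.MANUAL, -1L);
            default:
                // NumberFormatException (an IllegalArgumentException) for non-numeric input.
                long millis = Long.parseLong(value);
                if (millis <= 0) {
                    throw new IllegalArgumentException("Interval must be positive: " + raw);
                }
                return new Mode(Kind.PERIODIC, millis);
        }
    }

    /**
     * The preferred approach: keep a plain interval; "manual only" users set it to
     * Long.MAX_VALUE, so periodic triggering never fires in practice while
     * checkpointing still counts as enabled.
     */
    static Mode manualViaMaxInterval() {
        return new Mode(Kind.PERIODIC, Long.MAX_VALUE);
    }

    public static void main(String[] args) {
        System.out.println(parse("disabled").kind);        // DISABLED
        System.out.println(parse("manual").kind);          // MANUAL
        System.out.println(parse("60000").intervalMillis); // 60000
    }
}
```

   The extra `Kind` enum and validation are exactly the "quite a bit of work" the comment weighs against a single sentinel value.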

