Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/07 12:48:22 UTC

[GitHub] [flink] twalthr commented on a diff in pull request #21245: [FLINK-29379][streaming] Migrate CheckpointConfig to use Configuration field and ConfigOption

twalthr commented on code in PR #21245:
URL: https://github.com/apache/flink/pull/21245#discussion_r1015379814


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -154,30 +143,19 @@ public class CheckpointConfig implements java.io.Serializable {
     public CheckpointConfig(final CheckpointConfig checkpointConfig) {
         checkNotNull(checkpointConfig);
 
-        this.checkpointInterval = checkpointConfig.checkpointInterval;
-        this.checkpointingMode = checkpointConfig.checkpointingMode;
-        this.checkpointTimeout = checkpointConfig.checkpointTimeout;
-        this.maxConcurrentCheckpoints = checkpointConfig.maxConcurrentCheckpoints;
-        this.minPauseBetweenCheckpoints = checkpointConfig.minPauseBetweenCheckpoints;
-        this.tolerableCheckpointFailureNumber = checkpointConfig.tolerableCheckpointFailureNumber;
-        this.unalignedCheckpointsEnabled = checkpointConfig.isUnalignedCheckpointsEnabled();
-        this.alignedCheckpointTimeout = checkpointConfig.alignedCheckpointTimeout;
-        this.approximateLocalRecovery = checkpointConfig.isApproximateLocalRecoveryEnabled();
-        this.externalizedCheckpointCleanup = checkpointConfig.externalizedCheckpointCleanup;
-        this.forceCheckpointing = checkpointConfig.forceCheckpointing;
-        this.forceUnalignedCheckpoints = checkpointConfig.forceUnalignedCheckpoints;
+        this.configuration = new Configuration(checkpointConfig.configuration);
         this.storage = checkpointConfig.getCheckpointStorage();
-        this.checkpointIdOfIgnoredInFlightData =
-                checkpointConfig.getCheckpointIdOfIgnoredInFlightData();
     }
 
-    public CheckpointConfig() {}
+    public CheckpointConfig() {
+        configuration = new Configuration();
+    }
 
     // ------------------------------------------------------------------------
 
     /** Disables checkpointing. */
     public void disableCheckpointing() {
-        this.checkpointInterval = -1;
+        configuration.removeConfig(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL);

Review Comment:
   It would be better to use an explicit set operation here. Currently, disabling only removes the key, so it would turn into a "fallback to cluster config" if we merge configurations in the future.
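
   A minimal sketch of the concern, written as a plain-Java model rather than against Flink's `Configuration` class. The class `DisableFallbackSketch`, its methods, and the `DISABLED` sentinel are hypothetical names introduced for illustration, and the "job config layered over cluster config" lookup is an assumption about how a future merge might behave.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of a job config layered over a cluster config; not Flink's Configuration. */
class DisableFallbackSketch {
    static final String INTERVAL_KEY = "execution.checkpointing.interval";

    /** Hypothetical sentinel meaning "explicitly disabled". */
    static final long DISABLED = -1L;

    /** Job-level value wins; a missing job-level key falls back to the cluster value. */
    static Long effectiveInterval(Map<String, Long> job, Map<String, Long> cluster) {
        return job.containsKey(INTERVAL_KEY) ? job.get(INTERVAL_KEY) : cluster.get(INTERVAL_KEY);
    }

    static boolean isEnabled(Map<String, Long> job, Map<String, Long> cluster) {
        Long interval = effectiveInterval(job, cluster);
        return interval != null && interval > 0;
    }

    /** Mirrors the removal approach in the diff: the job-level key disappears. */
    static void disableByRemoval(Map<String, Long> job) {
        job.remove(INTERVAL_KEY);
    }

    /** Mirrors the suggested set operation: the job-level key stays and carries "disabled". */
    static void disableBySet(Map<String, Long> job) {
        job.put(INTERVAL_KEY, DISABLED);
    }

    public static void main(String[] args) {
        Map<String, Long> cluster = new HashMap<>();
        cluster.put(INTERVAL_KEY, 60_000L);
        Map<String, Long> job = new HashMap<>();
        job.put(INTERVAL_KEY, 5_000L);

        disableByRemoval(job);
        System.out.println("after removal: enabled=" + isEnabled(job, cluster));

        disableBySet(job);
        System.out.println("after explicit set: enabled=" + isEnabled(job, cluster));
    }
}
```

   In this model, removal lets the cluster-level interval show through, while the explicit set keeps the job-level decision in place.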



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -66,85 +68,72 @@ public class CheckpointConfig implements java.io.Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(CheckpointConfig.class);
 
     /** The default checkpoint mode: exactly once. */
-    public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE;
+    public static final CheckpointingMode DEFAULT_MODE =
+            ExecutionCheckpointingOptions.CHECKPOINTING_MODE.defaultValue();
 
     /** The default timeout of a checkpoint attempt: 10 minutes. */
-    public static final long DEFAULT_TIMEOUT = 10 * 60 * 1000;
+    @Deprecated
+    public static final long DEFAULT_TIMEOUT =
+            ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.defaultValue().toMillis();
 
     /** The default minimum pause to be made between checkpoints: none. */
-    public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS = 0;
+    @Deprecated
+    public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS =
+            ExecutionCheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS.defaultValue().toMillis();
 
     /** The default limit of concurrently happening checkpoints: one. */
-    public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1;
+    @Deprecated
+    public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS =
+            ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS.defaultValue();
 
-    public static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER = -1;
+    @Deprecated public static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER = -1;
 
     /** Default id of checkpoint for which in-flight data should be ignored on recovery. */
-    public static final int DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA = -1;
-
-    // ------------------------------------------------------------------------
-
-    /** Checkpointing mode (exactly-once vs. at-least-once). */
-    private CheckpointingMode checkpointingMode = DEFAULT_MODE;
-
-    /** Periodic checkpoint triggering interval. */
-    private long checkpointInterval = -1; // disabled
-
-    /** Maximum time checkpoint may take before being discarded. */
-    private long checkpointTimeout = DEFAULT_TIMEOUT;
-
-    /** Minimal pause between checkpointing attempts. */
-    private long minPauseBetweenCheckpoints = DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS;
-
-    /** Maximum number of checkpoint attempts in progress at the same time. */
-    private int maxConcurrentCheckpoints = DEFAULT_MAX_CONCURRENT_CHECKPOINTS;
-
-    /** Flag to force checkpointing in iterative jobs. */
-    private boolean forceCheckpointing;
-
-    /** Flag to force checkpointing in iterative jobs. */
-    private boolean forceUnalignedCheckpoints;
-
-    /** Flag to enable unaligned checkpoints. */
-    private boolean unalignedCheckpointsEnabled;
-
-    /** Id of checkpoint for which in-flight data should be ignored on recovery. */
-    private long checkpointIdOfIgnoredInFlightData =
-            DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA;
-
-    /** The delay from the start of checkpoint after which AC switches to UC. */
-    private Duration alignedCheckpointTimeout =
-            ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT.defaultValue();
-
-    /** Flag to enable approximate local recovery. */
-    private boolean approximateLocalRecovery;
-
-    /** Cleanup behaviour for persistent checkpoints. */
-    private ExternalizedCheckpointCleanup externalizedCheckpointCleanup =
-            ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT.defaultValue();
+    @Deprecated
+    public static final int DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA =
+            ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA
+                    .defaultValue()
+                    .intValue();
 
     /**
-     * Task would not fail if there is an error in their checkpointing.
-     *
-     * <p>{@link #tolerableCheckpointFailureNumber} would always overrule this deprecated field if
-     * they have conflicts.
+     * Internal {@link ConfigOption}s, that are not exposed and it's not possible to configure them
+     * via config files. We are defining them here, so that we can store them in the {@link
+     * #configuration}.
      *
-     * @deprecated Use {@link #tolerableCheckpointFailureNumber}.
+     * <p>If you decide to expose any of those {@link ConfigOption}s, please double-check if the
+     * key, type and descriptions are sensible, as the initial values are arbitrary.
      */
-    @Deprecated private boolean failOnCheckpointingErrors = true;
+    // --------------------------------------------------------------------------------------------
+
+    /** @deprecated This will be removed once iterations properly participate in checkpointing. */
+    @Deprecated
+    private static final ConfigOption<Boolean> FORCE_CHECKPOINTING =
+            key("hidden.force.checkpointing")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Flag to force checkpointing in iterative jobs");
+
+    private static final ConfigOption<Boolean> APPROXIMATE_LOCAL_RECOVERY =

Review Comment:
   We should avoid hidden config options. Give them a proper key, move them into the `...Options` class they belong to, and use annotations to hide them from the docs generation. Otherwise it is very difficult to find them across the code base. We recently did the same in the planner, which also had internal options.
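
   A rough illustration of that pattern in plain Java, without any Flink dependency. Everything here is hypothetical: the `@ExcludeFromDocs` annotation stands in for whatever annotation the real docs generator honors, `Option` stands in for `ConfigOption`, and the keys are made up. The point is only that an internal option can live next to the public ones and still be skipped when docs are generated.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

class HiddenOptionsSketch {

    /** Hypothetical marker: the option exists but must not appear in generated docs. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface ExcludeFromDocs {
        String value() default "";
    }

    /** Hypothetical stand-in for a typed config option. */
    static final class Option {
        final String key;
        final Object defaultValue;

        Option(String key, Object defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    /** All options, public and internal, are declared in one discoverable place. */
    static final class CheckpointingOptionsSketch {
        static final Option VISIBLE = new Option("example.checkpointing.visible-option", 1);

        @ExcludeFromDocs("internal, subject to removal")
        static final Option INTERNAL = new Option("example.checkpointing.internal-option", false);
    }

    /** Toy docs generator: collects keys of static Option fields that are not excluded. */
    static List<String> documentedKeys(Class<?> optionsClass) {
        List<String> keys = new ArrayList<>();
        for (Field field : optionsClass.getDeclaredFields()) {
            boolean isOption =
                    Modifier.isStatic(field.getModifiers()) && field.getType() == Option.class;
            if (!isOption || field.isAnnotationPresent(ExcludeFromDocs.class)) {
                continue;
            }
            try {
                field.setAccessible(true);
                keys.add(((Option) field.get(null)).key);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(documentedKeys(CheckpointingOptionsSketch.class));
    }
}
```

   With this layout a code search for the options class finds every option, and the annotation is the single place that records why one is hidden.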



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -186,7 +164,7 @@ public void disableCheckpointing() {
      * @return True if checkpointing is enables, false otherwise.
      */
     public boolean isCheckpointingEnabled() {
-        return checkpointInterval > 0;
+        return configuration.contains(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL);

Review Comment:
   How about we introduce a new config option `checkpoint.enabled`? This would fix this issue properly.
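
   To make the difference concrete, here is a small plain-Java model, not Flink code. The class `EnabledFlagSketch` and its methods are hypothetical, the key `checkpoint.enabled` is only the name proposed above, and defaulting the flag to `false` is an assumption. It contrasts a presence check on the interval with a dedicated boolean flag.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model contrasting a presence check with an explicit enabled flag. */
class EnabledFlagSketch {
    static final String INTERVAL_KEY = "execution.checkpointing.interval";

    /** Key name taken from the review suggestion; treated here as hypothetical. */
    static final String ENABLED_KEY = "checkpoint.enabled";

    /** Mirrors the presence check in the diff: any stored interval counts as "enabled". */
    static boolean enabledByPresence(Map<String, String> conf) {
        return conf.containsKey(INTERVAL_KEY);
    }

    /** Explicit flag: enabled only if the flag says so (assumed default: false). */
    static boolean enabledByFlag(Map<String, String> conf) {
        return Boolean.parseBoolean(conf.getOrDefault(ENABLED_KEY, "false"));
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // An interval of -1 used to mean "disabled" under the old `checkpointInterval > 0` check.
        conf.put(INTERVAL_KEY, "-1");

        System.out.println("presence check: " + enabledByPresence(conf));
        System.out.println("explicit flag:  " + enabledByFlag(conf));
    }
}
```

   In this model the presence check reports checkpointing as enabled as soon as any interval is stored, including a non-positive one, whereas the flag gives one unambiguous answer that survives merging with other configuration layers.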


