You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ai...@apache.org on 2021/12/10 11:00:34 UTC

[flink] branch master updated (dfb0bfc -> ef30fe5)

This is an automated email from the ASF dual-hosted git repository.

airblader pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from dfb0bfc  [FLINK-25241][hotfix] Remove resolved violations
     new f4db43a  [FLINK-24987][streaming-java] Add explicit enum value NO_EXTERNALIZED_CHECKPOINTS as default for externalized-checkpoint-retention
     new ef30fe5  [FLINK-24987][docs] Improve ExternalizedCheckpointCleanup documentation

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../execution_checkpointing_configuration.html     |  4 +-
 .../reader/CoordinatedSourceRescaleITCase.java     |  2 +-
 .../tests/DataStreamAllroundTestJobFactory.java    |  2 +-
 .../StickyAllocationAndLocalRecoveryTestJob.java   |  2 +-
 .../pyflink/datastream/checkpoint_config.py        | 57 ++++++++++++++--
 .../datastream/tests/test_check_point_config.py    |  3 +-
 .../api/environment/CheckpointConfig.java          | 76 ++++++++++++++++++----
 .../environment/ExecutionCheckpointingOptions.java |  6 +-
 .../CheckpointConfigFromConfigurationTest.java     |  2 +-
 .../test/checkpointing/RegionFailoverITCase.java   |  2 +-
 .../ResumeCheckpointManuallyITCase.java            |  2 +-
 .../UnalignedCheckpointCompatibilityITCase.java    |  2 +-
 .../UnalignedCheckpointStressITCase.java           |  2 +-
 .../checkpointing/UnalignedCheckpointTestBase.java |  2 +-
 14 files changed, 132 insertions(+), 32 deletions(-)

[flink] 02/02: [FLINK-24987][docs] Improve ExternalizedCheckpointCleanup documentation

Posted by ai...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

airblader pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ef30fe5b0d01ec901e7e666ee3c5bc0cb51e430d
Author: Nicolaus Weidner <ni...@ververica.com>
AuthorDate: Mon Nov 29 15:04:17 2021 +0100

    [FLINK-24987][docs] Improve ExternalizedCheckpointCleanup documentation
---
 .../execution_checkpointing_configuration.html     |  2 +-
 .../api/environment/CheckpointConfig.java          | 28 ++++++++++++++++++----
 2 files changed, 25 insertions(+), 5 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html b/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html
index 836b2e0..a9fa7ca 100644
--- a/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html
@@ -24,7 +24,7 @@
             <td><h5>execution.checkpointing.externalized-checkpoint-retention</h5></td>
             <td style="word-wrap: break-word;">NO_EXTERNALIZED_CHECKPOINTS</td>
             <td><p>Enum</p></td>
-            <td>Externalized checkpoints write their meta data out to persistent storage and are not automatically cleaned up when the owning job fails or is suspended (terminating with job status <code class="highlighter-rouge">JobStatus#FAILED</code> or <code class="highlighter-rouge">JobStatus#SUSPENDED</code>). In this case, you have to manually clean up the checkpoint state, both the meta data and actual program state.<br /><br />The mode defines how an externalized checkpoint shoul [...]
+            <td>Externalized checkpoints write their meta data out to persistent storage and are not automatically cleaned up when the owning job fails or is suspended (terminating with job status <code class="highlighter-rouge">JobStatus#FAILED</code> or <code class="highlighter-rouge">JobStatus#SUSPENDED</code>). In this case, you have to manually clean up the checkpoint state, both the meta data and actual program state.<br /><br />The mode defines how an externalized checkpoint shoul [...]
         </tr>
         <tr>
             <td><h5>execution.checkpointing.interval</h5></td>
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index b2a2d8a..8983baa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -19,11 +19,14 @@
 package org.apache.flink.streaming.api.environment;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.DescribedEnum;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.description.InlineElement;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.StateBackend;
@@ -40,6 +43,7 @@ import java.net.URI;
 import java.time.Duration;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.flink.configuration.description.TextElement.text;
 import static org.apache.flink.runtime.checkpoint.CheckpointFailureManager.UNLIMITED_TOLERABLE_FAILURE_NUMBER;
 import static org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -733,7 +737,7 @@ public class CheckpointConfig implements java.io.Serializable {
 
     /** Cleanup behaviour for externalized checkpoints when the job is cancelled. */
     @PublicEvolving
-    public enum ExternalizedCheckpointCleanup {
+    public enum ExternalizedCheckpointCleanup implements DescribedEnum {
 
         /**
          * Delete externalized checkpoints on job cancellation.
@@ -745,7 +749,10 @@ public class CheckpointConfig implements java.io.Serializable {
          * <p>Note that checkpoint state is always kept if the job terminates with state {@link
          * JobStatus#FAILED}.
          */
-        DELETE_ON_CANCELLATION,
+        DELETE_ON_CANCELLATION(
+                text(
+                        "Checkpoint state is only kept when the owning job fails. It is deleted if "
+                                + "the job is cancelled.")),
 
         /**
          * Retain externalized checkpoints on job cancellation.
@@ -756,10 +763,17 @@ public class CheckpointConfig implements java.io.Serializable {
          * <p>Note that checkpoint state is always kept if the job terminates with state {@link
          * JobStatus#FAILED}.
          */
-        RETAIN_ON_CANCELLATION,
+        RETAIN_ON_CANCELLATION(
+                text("Checkpoint state is kept when the owning job is cancelled or fails.")),
 
         /** Externalized checkpoints are disabled completely. */
-        NO_EXTERNALIZED_CHECKPOINTS;
+        NO_EXTERNALIZED_CHECKPOINTS(text("Externalized checkpoints are disabled."));
+
+        private final InlineElement description;
+
+        ExternalizedCheckpointCleanup(InlineElement description) {
+            this.description = description;
+        }
 
         /**
          * Returns whether persistent checkpoints shall be discarded on cancellation of the job.
@@ -770,6 +784,12 @@ public class CheckpointConfig implements java.io.Serializable {
         public boolean deleteOnCancellation() {
             return this == DELETE_ON_CANCELLATION;
         }
+
+        @Override
+        @Internal
+        public InlineElement getDescription() {
+            return description;
+        }
     }
 
     /**

[flink] 01/02: [FLINK-24987][streaming-java] Add explicit enum value NO_EXTERNALIZED_CHECKPOINTS as default for externalized-checkpoint-retention

Posted by ai...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

airblader pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f4db43a3a8d7147f3ebd1279addcb35fb2c5e38b
Author: Nicolaus Weidner <ni...@ververica.com>
AuthorDate: Wed Nov 24 09:17:34 2021 +0100

    [FLINK-24987][streaming-java] Add explicit enum value NO_EXTERNALIZED_CHECKPOINTS as default for externalized-checkpoint-retention
---
 .../execution_checkpointing_configuration.html     |  4 +-
 .../reader/CoordinatedSourceRescaleITCase.java     |  2 +-
 .../tests/DataStreamAllroundTestJobFactory.java    |  2 +-
 .../StickyAllocationAndLocalRecoveryTestJob.java   |  2 +-
 .../pyflink/datastream/checkpoint_config.py        | 57 +++++++++++++++++++--
 .../datastream/tests/test_check_point_config.py    |  3 +-
 .../api/environment/CheckpointConfig.java          | 58 ++++++++++++++++------
 .../environment/ExecutionCheckpointingOptions.java |  6 ++-
 .../CheckpointConfigFromConfigurationTest.java     |  2 +-
 .../test/checkpointing/RegionFailoverITCase.java   |  2 +-
 .../ResumeCheckpointManuallyITCase.java            |  2 +-
 .../UnalignedCheckpointCompatibilityITCase.java    |  2 +-
 .../UnalignedCheckpointStressITCase.java           |  2 +-
 .../checkpointing/UnalignedCheckpointTestBase.java |  2 +-
 14 files changed, 113 insertions(+), 33 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html b/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html
index 04e00bd..836b2e0 100644
--- a/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html
@@ -22,9 +22,9 @@
         </tr>
         <tr>
             <td><h5>execution.checkpointing.externalized-checkpoint-retention</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
+            <td style="word-wrap: break-word;">NO_EXTERNALIZED_CHECKPOINTS</td>
             <td><p>Enum</p></td>
-            <td>Externalized checkpoints write their meta data out to persistent storage and are not automatically cleaned up when the owning job fails or is suspended (terminating with job status <code class="highlighter-rouge">JobStatus#FAILED</code> or <code class="highlighter-rouge">JobStatus#SUSPENDED</code>. In this case, you have to manually clean up the checkpoint state, both the meta data and actual program state.<br /><br />The mode defines how an externalized checkpoint should [...]
+            <td>Externalized checkpoints write their meta data out to persistent storage and are not automatically cleaned up when the owning job fails or is suspended (terminating with job status <code class="highlighter-rouge">JobStatus#FAILED</code> or <code class="highlighter-rouge">JobStatus#SUSPENDED</code>). In this case, you have to manually clean up the checkpoint state, both the meta data and actual program state.<br /><br />The mode defines how an externalized checkpoint shoul [...]
         </tr>
         <tr>
             <td><h5>execution.checkpointing.interval</h5></td>
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java
index f9e0e27..0120fee 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java
@@ -118,7 +118,7 @@ public class CoordinatedSourceRescaleITCase extends TestLogger {
                 StreamExecutionEnvironment.createLocalEnvironment(p, conf);
         env.enableCheckpointing(100);
         env.getCheckpointConfig()
-                .enableExternalizedCheckpoints(
+                .setExternalizedCheckpointCleanup(
                         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
         env.setRestartStrategy(RestartStrategies.noRestart());
 
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index 83e2cde..e219c46 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -295,7 +295,7 @@ public class DataStreamAllroundTestJobFactory {
                             "Unknown clean up mode for externalized checkpoints: "
                                     + cleanupModeConfig);
             }
-            env.getCheckpointConfig().enableExternalizedCheckpoints(cleanupMode);
+            env.getCheckpointConfig().setExternalizedCheckpointCleanup(cleanupMode);
 
             final int tolerableDeclinedCheckpointNumber =
                     pt.getInt(
diff --git a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
index 62b6ff5..1a707e4 100644
--- a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
+++ b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
@@ -92,7 +92,7 @@ public class StickyAllocationAndLocalRecoveryTestJob {
                         Integer.MAX_VALUE, pt.getInt("restartDelay", 0)));
         if (pt.getBoolean("externalizedCheckpoints", false)) {
             env.getCheckpointConfig()
-                    .enableExternalizedCheckpoints(
+                    .setExternalizedCheckpointCleanup(
                             CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
         }
 
diff --git a/flink-python/pyflink/datastream/checkpoint_config.py b/flink-python/pyflink/datastream/checkpoint_config.py
index 4119a91..f03ecf2 100644
--- a/flink-python/pyflink/datastream/checkpoint_config.py
+++ b/flink-python/pyflink/datastream/checkpoint_config.py
@@ -247,7 +247,9 @@ class CheckpointConfig(object):
             self,
             cleanup_mode: 'ExternalizedCheckpointCleanup') -> 'CheckpointConfig':
         """
-        Enables checkpoints to be persisted externally.
+        Sets the mode for externalized checkpoint clean-up. Externalized checkpoints will be enabled
+        automatically unless the mode is set to
+        :data:`ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS`.
 
         Externalized checkpoints write their meta data out to persistent storage and are **not**
         automatically cleaned up when the owning job fails or is suspended (terminating with job
@@ -256,7 +258,7 @@ class CheckpointConfig(object):
 
         The :class:`ExternalizedCheckpointCleanup` mode defines how an externalized checkpoint
         should be cleaned up on job cancellation. If you choose to retain externalized checkpoints
-        on cancellation you have you handle checkpoint clean up manually when you cancel the job as
+        on cancellation you have to handle checkpoint clean-up manually when you cancel the job as
         well (terminating with job status ``CANCELED``).
 
         The target directory for externalized checkpoints is configured via
@@ -268,14 +270,53 @@ class CheckpointConfig(object):
             >>> config.enable_externalized_checkpoints(
             ...     ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
 
-        :param cleanup_mode: Externalized checkpoint cleanup behaviour, the mode could be
-                             :data:`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION` or
-                             :data:`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`
+        :param cleanup_mode: Externalized checkpoint clean-up behaviour, the mode could be
+                             :data:`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`,
+                             :data:`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` or
+                             :data:`ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS`
+
+        .. note:: Deprecated in 1.15. Use :func:`set_externalized_checkpoint_cleanup` instead.
         """
         self._j_checkpoint_config.enableExternalizedCheckpoints(
             ExternalizedCheckpointCleanup._to_j_externalized_checkpoint_cleanup(cleanup_mode))
         return self
 
+    def set_externalized_checkpoint_cleanup(
+            self,
+            cleanup_mode: 'ExternalizedCheckpointCleanup') -> 'CheckpointConfig':
+        """
+        Sets the mode for externalized checkpoint clean-up. Externalized checkpoints will be enabled
+        automatically unless the mode is set to
+        :data:`ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS`.
+
+        Externalized checkpoints write their meta data out to persistent storage and are **not**
+        automatically cleaned up when the owning job fails or is suspended (terminating with job
+        status ``FAILED`` or ``SUSPENDED``). In this case, you have to manually clean up the
+        checkpoint state, both the meta data and actual program state.
+
+        The :class:`ExternalizedCheckpointCleanup` mode defines how an externalized checkpoint
+        should be cleaned up on job cancellation. If you choose to retain externalized checkpoints
+        on cancellation you have to handle checkpoint clean-up manually when you cancel the job as
+        well (terminating with job status ``CANCELED``).
+
+        The target directory for externalized checkpoints is configured via
+        ``org.apache.flink.configuration.CheckpointingOptions#CHECKPOINTS_DIRECTORY``.
+
+        Example:
+        ::
+
+            >>> config.set_externalized_checkpoint_cleanup(
+            ...     ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
+
+        :param cleanup_mode: Externalized checkpoint clean-up behaviour, the mode could be
+                             :data:`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`,
+                             :data:`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` or
+                             :data:`ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS`
+        """
+        self._j_checkpoint_config.setExternalizedCheckpointCleanup(
+            ExternalizedCheckpointCleanup._to_j_externalized_checkpoint_cleanup(cleanup_mode))
+        return self
+
     def is_externalized_checkpoints_enabled(self) -> bool:
         """
         Returns whether checkpoints should be persisted externally.
@@ -449,12 +490,18 @@ class ExternalizedCheckpointCleanup(Enum):
 
     Note that checkpoint state is always kept if the job terminates
     with state ``FAILED``.
+
+    :data:`NO_EXTERNALIZED_CHECKPOINTS`:
+
+    Externalized checkpoints are disabled completely.
     """
 
     DELETE_ON_CANCELLATION = 0
 
     RETAIN_ON_CANCELLATION = 1
 
+    NO_EXTERNALIZED_CHECKPOINTS = 2
+
     @staticmethod
     def _from_j_externalized_checkpoint_cleanup(j_cleanup_mode) \
             -> 'ExternalizedCheckpointCleanup':
diff --git a/flink-python/pyflink/datastream/tests/test_check_point_config.py b/flink-python/pyflink/datastream/tests/test_check_point_config.py
index 11420f1..b7eba60 100644
--- a/flink-python/pyflink/datastream/tests/test_check_point_config.py
+++ b/flink-python/pyflink/datastream/tests/test_check_point_config.py
@@ -121,7 +121,8 @@ class CheckpointConfigTests(PyFlinkTestCase):
 
         self.assertFalse(self.checkpoint_config.is_externalized_checkpoints_enabled())
 
-        self.assertIsNone(self.checkpoint_config.get_externalized_checkpoint_cleanup())
+        self.assertEqual(self.checkpoint_config.get_externalized_checkpoint_cleanup(),
+                         ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS)
 
         self.checkpoint_config.enable_externalized_checkpoints(
             ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index f9d42b2..b2a2d8a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -107,7 +107,8 @@ public class CheckpointConfig implements java.io.Serializable {
     private boolean approximateLocalRecovery;
 
     /** Cleanup behaviour for persistent checkpoints. */
-    private ExternalizedCheckpointCleanup externalizedCheckpointCleanup;
+    private ExternalizedCheckpointCleanup externalizedCheckpointCleanup =
+            ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT.defaultValue();
 
     /**
      * Task would not fail if there is an error in their checkpointing.
@@ -428,7 +429,9 @@ public class CheckpointConfig implements java.io.Serializable {
     }
 
     /**
-     * Enables checkpoints to be persisted externally.
+     * Sets the mode for externalized checkpoint clean-up. Externalized checkpoints will be enabled
+     * automatically unless the mode is set to {@link
+     * ExternalizedCheckpointCleanup#NO_EXTERNALIZED_CHECKPOINTS}.
      *
      * <p>Externalized checkpoints write their meta data out to persistent storage and are
      * <strong>not</strong> automatically cleaned up when the owning job fails or is suspended
@@ -438,15 +441,44 @@ public class CheckpointConfig implements java.io.Serializable {
      *
      * <p>The {@link ExternalizedCheckpointCleanup} mode defines how an externalized checkpoint
      * should be cleaned up on job cancellation. If you choose to retain externalized checkpoints on
-     * cancellation you have you handle checkpoint clean up manually when you cancel the job as well
+     * cancellation you have to handle checkpoint clean-up manually when you cancel the job as well
      * (terminating with job status {@link JobStatus#CANCELED}).
      *
      * <p>The target directory for externalized checkpoints is configured via {@link
      * org.apache.flink.configuration.CheckpointingOptions#CHECKPOINTS_DIRECTORY}.
      *
-     * @param cleanupMode Externalized checkpoint cleanup behaviour.
+     * @param cleanupMode Externalized checkpoint clean-up behaviour.
      */
     @PublicEvolving
+    public void setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup cleanupMode) {
+        this.externalizedCheckpointCleanup = checkNotNull(cleanupMode);
+    }
+
+    /**
+     * Sets the mode for externalized checkpoint clean-up. Externalized checkpoints will be enabled
+     * automatically unless the mode is set to {@link
+     * ExternalizedCheckpointCleanup#NO_EXTERNALIZED_CHECKPOINTS}.
+     *
+     * <p>Externalized checkpoints write their meta data out to persistent storage and are
+     * <strong>not</strong> automatically cleaned up when the owning job fails or is suspended
+     * (terminating with job status {@link JobStatus#FAILED} or {@link JobStatus#SUSPENDED}). In
+     * this case, you have to manually clean up the checkpoint state, both the meta data and actual
+     * program state.
+     *
+     * <p>The {@link ExternalizedCheckpointCleanup} mode defines how an externalized checkpoint
+     * should be cleaned up on job cancellation. If you choose to retain externalized checkpoints on
+     * cancellation you have to handle checkpoint clean-up manually when you cancel the job as well
+     * (terminating with job status {@link JobStatus#CANCELED}).
+     *
+     * <p>The target directory for externalized checkpoints is configured via {@link
+     * org.apache.flink.configuration.CheckpointingOptions#CHECKPOINTS_DIRECTORY}.
+     *
+     * @param cleanupMode Externalized checkpoint clean-up behaviour.
+     * @deprecated use {@link #setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup)}
+     *     instead.
+     */
+    @PublicEvolving
+    @Deprecated
     public void enableExternalizedCheckpoints(ExternalizedCheckpointCleanup cleanupMode) {
         this.externalizedCheckpointCleanup = checkNotNull(cleanupMode);
     }
@@ -458,7 +490,8 @@ public class CheckpointConfig implements java.io.Serializable {
      */
     @PublicEvolving
     public boolean isExternalizedCheckpointsEnabled() {
-        return externalizedCheckpointCleanup != null;
+        return externalizedCheckpointCleanup
+                != ExternalizedCheckpointCleanup.NO_EXTERNALIZED_CHECKPOINTS;
     }
 
     /**
@@ -712,7 +745,7 @@ public class CheckpointConfig implements java.io.Serializable {
          * <p>Note that checkpoint state is always kept if the job terminates with state {@link
          * JobStatus#FAILED}.
          */
-        DELETE_ON_CANCELLATION(true),
+        DELETE_ON_CANCELLATION,
 
         /**
          * Retain externalized checkpoints on job cancellation.
@@ -723,13 +756,10 @@ public class CheckpointConfig implements java.io.Serializable {
          * <p>Note that checkpoint state is always kept if the job terminates with state {@link
          * JobStatus#FAILED}.
          */
-        RETAIN_ON_CANCELLATION(false);
+        RETAIN_ON_CANCELLATION,
 
-        private final boolean deleteOnCancellation;
-
-        ExternalizedCheckpointCleanup(boolean deleteOnCancellation) {
-            this.deleteOnCancellation = deleteOnCancellation;
-        }
+        /** Externalized checkpoints are disabled completely. */
+        NO_EXTERNALIZED_CHECKPOINTS;
 
         /**
          * Returns whether persistent checkpoints shall be discarded on cancellation of the job.
@@ -738,7 +768,7 @@ public class CheckpointConfig implements java.io.Serializable {
          *     the job.
          */
         public boolean deleteOnCancellation() {
-            return deleteOnCancellation;
+            return this == DELETE_ON_CANCELLATION;
         }
     }
 
@@ -772,7 +802,7 @@ public class CheckpointConfig implements java.io.Serializable {
                 .ifPresent(this::setTolerableCheckpointFailureNumber);
         configuration
                 .getOptional(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT)
-                .ifPresent(this::enableExternalizedCheckpoints);
+                .ifPresent(this::setExternalizedCheckpointCleanup);
         configuration
                 .getOptional(ExecutionCheckpointingOptions.ENABLE_UNALIGNED)
                 .ifPresent(this::enableUnalignedCheckpoints);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
index 7b83772..536788b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java
@@ -91,13 +91,15 @@ public class ExecutionCheckpointingOptions {
             EXTERNALIZED_CHECKPOINT =
                     ConfigOptions.key("execution.checkpointing.externalized-checkpoint-retention")
                             .enumType(CheckpointConfig.ExternalizedCheckpointCleanup.class)
-                            .noDefaultValue()
+                            .defaultValue(
+                                    CheckpointConfig.ExternalizedCheckpointCleanup
+                                            .NO_EXTERNALIZED_CHECKPOINTS)
                             .withDescription(
                                     Description.builder()
                                             .text(
                                                     "Externalized checkpoints write their meta data out to persistent storage and are not "
                                                             + "automatically cleaned up when the owning job fails or is suspended (terminating with job "
-                                                            + "status %s or %s. In this case, you have to manually clean up the checkpoint state, both the "
+                                                            + "status %s or %s). In this case, you have to manually clean up the checkpoint state, both the "
                                                             + "meta data and actual program state.",
                                                     TextElement.code("JobStatus#FAILED"),
                                                     TextElement.code("JobStatus#SUSPENDED"))
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java
index 6bb1d1d..fb8d59c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java
@@ -74,7 +74,7 @@ public class CheckpointConfigFromConfigurationTest {
                         .whenSetFromFile(
                                 "execution.checkpointing.externalized-checkpoint-retention",
                                 "RETAIN_ON_CANCELLATION")
-                        .viaSetter(CheckpointConfig::enableExternalizedCheckpoints)
+                        .viaSetter(CheckpointConfig::setExternalizedCheckpointCleanup)
                         .getterVia(CheckpointConfig::getExternalizedCheckpointCleanup)
                         .nonDefaultValue(
                                 CheckpointConfig.ExternalizedCheckpointCleanup
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
index 7a46dd1..0bf295b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
@@ -173,7 +173,7 @@ public class RegionFailoverITCase extends TestLogger {
         env.setMaxParallelism(MAX_PARALLELISM);
         env.enableCheckpointing(200, CheckpointingMode.EXACTLY_ONCE);
         env.getCheckpointConfig()
-                .enableExternalizedCheckpoints(
+                .setExternalizedCheckpointCleanup(
                         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
         env.disableOperatorChaining();
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index c324931..eeda1b0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -371,7 +371,7 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
         env.setStateBackend(backend);
         env.setParallelism(PARALLELISM);
         env.getCheckpointConfig()
-                .enableExternalizedCheckpoints(
+                .setExternalizedCheckpointCleanup(
                         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 
         env.addSource(new NotifyingInfiniteTupleSource(10_000))
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
index 0b8d570..c116621 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.java
@@ -198,7 +198,7 @@ public class UnalignedCheckpointCompatibilityITCase extends TestLogger {
         env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
         env.getCheckpointConfig().enableUnalignedCheckpoints(!isAligned);
         env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
-        env.getCheckpointConfig().enableExternalizedCheckpoints(RETAIN_ON_CANCELLATION);
+        env.getCheckpointConfig().setExternalizedCheckpointCleanup(RETAIN_ON_CANCELLATION);
         if (checkpointingInterval > 0) {
             env.enableCheckpointing(checkpointingInterval);
         }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java
index 06e80f5..91ddada 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java
@@ -307,7 +307,7 @@ public class UnalignedCheckpointStressITCase extends TestLogger {
         env.getCheckpointConfig().enableUnalignedCheckpoints();
         env.setRestartStrategy(RestartStrategies.noRestart());
         env.getCheckpointConfig()
-                .enableExternalizedCheckpoints(
+                .setExternalizedCheckpointCleanup(
                         ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
         return env;
     }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index 422fc83..bdfba39 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -743,7 +743,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
             env.getCheckpointConfig().setForceUnalignedCheckpoints(true);
             if (generateCheckpoint) {
                 env.getCheckpointConfig()
-                        .enableExternalizedCheckpoints(
+                        .setExternalizedCheckpointCleanup(
                                 CheckpointConfig.ExternalizedCheckpointCleanup
                                         .RETAIN_ON_CANCELLATION);
             }