You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/29 14:12:01 UTC

[flink] 01/05: [FLINK-18064][python] Adding unaligned checkpoint config options.

This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1e1c81b7089b07a4c0ab5c7881a6617b5b27ac57
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Thu Jun 18 09:51:12 2020 +0200

    [FLINK-18064][python] Adding unaligned checkpoint config options.
---
 .../pyflink/datastream/checkpoint_config.py        | 39 ++++++++++++++++++++++
 .../datastream/tests/test_check_point_config.py    | 16 +++++++++
 2 files changed, 55 insertions(+)

diff --git a/flink-python/pyflink/datastream/checkpoint_config.py b/flink-python/pyflink/datastream/checkpoint_config.py
index 3812c39..084939b 100644
--- a/flink-python/pyflink/datastream/checkpoint_config.py
+++ b/flink-python/pyflink/datastream/checkpoint_config.py
@@ -268,6 +268,45 @@ class CheckpointConfig(object):
         cleanup_mode = self._j_checkpoint_config.getExternalizedCheckpointCleanup()
         return ExternalizedCheckpointCleanup._from_j_externalized_checkpoint_cleanup(cleanup_mode)
 
+    def is_unaligned_checkpoints_enabled(self):
+        """
+        Returns whether unaligned checkpoints are enabled.
+
+        :return: ``True`` if unaligned checkpoints are enabled.
+        """
+        return self._j_checkpoint_config.isUnalignedCheckpointsEnabled()
+
+    def enable_unaligned_checkpoints(self, enabled=True):
+        """
+        Enables unaligned checkpoints, which greatly reduce checkpointing times under backpressure.
+
+        Unaligned checkpoints contain data stored in buffers as part of the checkpoint state, which
+        allows checkpoint barriers to overtake these buffers. Thus, the checkpoint duration becomes
+        independent of the current throughput as checkpoint barriers are effectively not embedded
+        into the stream of data anymore.
+
+        Unaligned checkpoints can only be enabled if :func:`get_checkpointing_mode` is
+        :data:`CheckpointingMode.EXACTLY_ONCE`.
+
+        :param enabled: ``True`` if a checkpoints should be taken in unaligned mode.
+        """
+        self._j_checkpoint_config.enableUnalignedCheckpoints(enabled)
+
+    def disable_unaligned_checkpoints(self):
+        """
+        Enables unaligned checkpoints, which greatly reduce checkpointing times under backpressure
+        (experimental).
+
+        Unaligned checkpoints contain data stored in buffers as part of the checkpoint state, which
+        allows checkpoint barriers to overtake these buffers. Thus, the checkpoint duration becomes
+        independent of the current throughput as checkpoint barriers are effectively not embedded
+        into the stream of data anymore.
+
+        Unaligned checkpoints can only be enabled if :func:`get_checkpointing_mode` is
+        :data:`CheckpointingMode.EXACTLY_ONCE`.
+        """
+        self.enable_unaligned_checkpoints(False)
+
 
 class ExternalizedCheckpointCleanup(object):
     """
diff --git a/flink-python/pyflink/datastream/tests/test_check_point_config.py b/flink-python/pyflink/datastream/tests/test_check_point_config.py
index 4a10bb7..67b22d2 100644
--- a/flink-python/pyflink/datastream/tests/test_check_point_config.py
+++ b/flink-python/pyflink/datastream/tests/test_check_point_config.py
@@ -135,3 +135,19 @@ class CheckpointConfigTests(PyFlinkTestCase):
         self.checkpoint_config.set_prefer_checkpoint_for_recovery(True)
 
         self.assertTrue(self.checkpoint_config.is_prefer_checkpoint_for_recovery())
+
+    def test_is_unaligned_checkpointing_enabled(self):
+
+        self.assertFalse(self.checkpoint_config.is_unaligned_checkpoints_enabled())
+
+        self.checkpoint_config.enable_unaligned_checkpoints()
+
+        self.assertTrue(self.checkpoint_config.is_unaligned_checkpoints_enabled())
+
+        self.checkpoint_config.disable_unaligned_checkpoints()
+
+        self.assertFalse(self.checkpoint_config.is_unaligned_checkpoints_enabled())
+
+        self.checkpoint_config.enable_unaligned_checkpoints(True)
+
+        self.assertTrue(self.checkpoint_config.is_unaligned_checkpoints_enabled())