You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/29 14:12:01 UTC
[flink] 01/05: [FLINK-18064][python] Adding unaligned checkpoint
config options.
This is an automated email from the ASF dual-hosted git repository.
zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1e1c81b7089b07a4c0ab5c7881a6617b5b27ac57
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Thu Jun 18 09:51:12 2020 +0200
[FLINK-18064][python] Adding unaligned checkpoint config options.
---
.../pyflink/datastream/checkpoint_config.py | 39 ++++++++++++++++++++++
.../datastream/tests/test_check_point_config.py | 16 +++++++++
2 files changed, 55 insertions(+)
diff --git a/flink-python/pyflink/datastream/checkpoint_config.py b/flink-python/pyflink/datastream/checkpoint_config.py
index 3812c39..084939b 100644
--- a/flink-python/pyflink/datastream/checkpoint_config.py
+++ b/flink-python/pyflink/datastream/checkpoint_config.py
@@ -268,6 +268,45 @@ class CheckpointConfig(object):
cleanup_mode = self._j_checkpoint_config.getExternalizedCheckpointCleanup()
return ExternalizedCheckpointCleanup._from_j_externalized_checkpoint_cleanup(cleanup_mode)
+ def is_unaligned_checkpoints_enabled(self):
+ """
+ Returns whether unaligned checkpoints are enabled.
+
+ :return: ``True`` if unaligned checkpoints are enabled.
+ """
+ return self._j_checkpoint_config.isUnalignedCheckpointsEnabled()
+
+ def enable_unaligned_checkpoints(self, enabled=True):
+ """
+ Enables unaligned checkpoints, which greatly reduce checkpointing times under backpressure.
+
+ Unaligned checkpoints contain data stored in buffers as part of the checkpoint state, which
+ allows checkpoint barriers to overtake these buffers. Thus, the checkpoint duration becomes
+ independent of the current throughput as checkpoint barriers are effectively not embedded
+ into the stream of data anymore.
+
+ Unaligned checkpoints can only be enabled if :func:`get_checkpointing_mode` is
+ :data:`CheckpointingMode.EXACTLY_ONCE`.
+
+ :param enabled: ``True`` if a checkpoints should be taken in unaligned mode.
+ """
+ self._j_checkpoint_config.enableUnalignedCheckpoints(enabled)
+
+ def disable_unaligned_checkpoints(self):
+ """
+ Enables unaligned checkpoints, which greatly reduce checkpointing times under backpressure
+ (experimental).
+
+ Unaligned checkpoints contain data stored in buffers as part of the checkpoint state, which
+ allows checkpoint barriers to overtake these buffers. Thus, the checkpoint duration becomes
+ independent of the current throughput as checkpoint barriers are effectively not embedded
+ into the stream of data anymore.
+
+ Unaligned checkpoints can only be enabled if :func:`get_checkpointing_mode` is
+ :data:`CheckpointingMode.EXACTLY_ONCE`.
+ """
+ self.enable_unaligned_checkpoints(False)
+
class ExternalizedCheckpointCleanup(object):
"""
diff --git a/flink-python/pyflink/datastream/tests/test_check_point_config.py b/flink-python/pyflink/datastream/tests/test_check_point_config.py
index 4a10bb7..67b22d2 100644
--- a/flink-python/pyflink/datastream/tests/test_check_point_config.py
+++ b/flink-python/pyflink/datastream/tests/test_check_point_config.py
@@ -135,3 +135,19 @@ class CheckpointConfigTests(PyFlinkTestCase):
self.checkpoint_config.set_prefer_checkpoint_for_recovery(True)
self.assertTrue(self.checkpoint_config.is_prefer_checkpoint_for_recovery())
+
+ def test_is_unaligned_checkpointing_enabled(self):
+
+ self.assertFalse(self.checkpoint_config.is_unaligned_checkpoints_enabled())
+
+ self.checkpoint_config.enable_unaligned_checkpoints()
+
+ self.assertTrue(self.checkpoint_config.is_unaligned_checkpoints_enabled())
+
+ self.checkpoint_config.disable_unaligned_checkpoints()
+
+ self.assertFalse(self.checkpoint_config.is_unaligned_checkpoints_enabled())
+
+ self.checkpoint_config.enable_unaligned_checkpoints(True)
+
+ self.assertTrue(self.checkpoint_config.is_unaligned_checkpoints_enabled())