You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/02/17 02:48:55 UTC

[flink] branch master updated: [FLINK-25490][checkpoint] Complete the Chinese document regarding final checkpoint

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4682620  [FLINK-25490][checkpoint] Complete the Chinese document regarding final checkpoint
4682620 is described below

commit 46826201f5d69ea5903c9ec1ac7d3370c7212de0
Author: Yun Gao <ga...@gmail.com>
AuthorDate: Tue Feb 15 12:03:26 2022 +0800

    [FLINK-25490][checkpoint] Complete the Chinese document regarding final checkpoint
    
    This closes #18766.
---
 .../datastream/fault-tolerance/checkpointing.md    | 39 ++++++++++++++++++++++
 docs/content.zh/docs/internals/task_lifecycle.md   |  5 ++-
 .../datastream/fault-tolerance/checkpointing.md    |  6 ++--
 3 files changed, 46 insertions(+), 4 deletions(-)

diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md
index 333e6d8..2ad6d90 100644
--- a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md
+++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md
@@ -195,5 +195,44 @@ Flink 现在为没有迭代(iterations)的作业提供一致性的处理保
 
 请注意在环形边上游走的记录(以及与之相关的状态变化)在故障时会丢失。
 
+## 部分任务结束后的 Checkpoint
+
+从版本 1.14 开始 Flink 支持在部分任务结束后继续进行Checkpoint。
+如果一部分数据源是有限数据集,那么就可以出现这种情况。
+从版本 1.15 开始,这一特性被默认打开。如果想要关闭这一功能,可以执行:
+
+```java
+Configuration config = new Configuration();
+config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+```
+
+在这种情况下,结束的任务不会参与 Checkpoint 的过程。在实现自定义的算子或者 UDF (用户自定义函数)时需要考虑这一点。
+
+为了支持部分任务结束后的 Checkpoint 操作,我们调整了 [任务的生命周期]({{<ref "docs/internals/task_lifecycle" >}}) 并且引入了
+{{< javadoc file="org/apache/flink/streaming/api/operators/StreamOperator.html#finish--" name="StreamOperator#finish" >}} 方法。
+在这一方法中,用户需要写出所有缓冲区中的数据。在 finish 方法调用后的 checkpoint 中,这一任务一定不能再有缓冲区中的数据,因为在 `finish()` 后没有办法来输出这些数据。
+在大部分情况下,`finish()` 后这一任务的状态为空,唯一的例外是如果其中某些算子中包含外部系统事务的句柄(例如为了实现恰好一次语义),
+在这种情况下,在 `finish()` 后进行的 checkpoint 操作应该保留这些句柄,并且在结束 checkpoint(即任务退出前所等待的 checkpoint)时提交。
+一个可以参考的例子是满足恰好一次语义的 sink 接口与 `TwoPhaseCommitSinkFunction`。
+
+### 对 operator state 的影响
+
+在部分 Task 结束后的checkpoint中,Flink 对 `UnionListState` 进行了特殊的处理。
+`UnionListState` 一般用于实现对外部系统读取位置的一个全局视图(例如,用于记录所有 Kafka 分区的读取偏移)。
+如果我们在算子的某个并发调用 `close()` 方法后丢弃它的状态,我们就会丢失它所分配的分区的偏移量信息。
+为了解决这一问题,对于使用 `UnionListState` 的算子我们只允许在它的并发都在运行或都已结束的时候才能进行 checkpoint 操作。
+
+`ListState` 一般不会用于类似的场景,但是用户仍然需要注意在调用 `close()` 方法后进行的 checkpoint 会丢弃算子的状态并且
+这些状态在算子重启后不可用。
+
+任何支持并发修改操作的算子也可以支持部分并发实例结束后的恢复操作。从这种类型的快照中恢复等价于将算子的并发改为正在运行的并发实例数。
+
+### 任务结束前等待最后一次 Checkpoint
+
+为了保证使用两阶段提交的算子可以提交所有的数据,任务会在所有算子都调用 `finish()` 方法后等待下一次 checkpoint 成功后退出。
+需要注意的是,这一行为可能会延长任务运行的时间,如果 checkpoint 周期比较大,这一延迟会非常明显。
+极端情况下,如果 checkpoint 的周期被设置为 `Long.MAX_VALUE`,那么任务永远不会结束,因为下一次 checkpoint 不会进行。
+
 {{< top >}}
 
diff --git a/docs/content.zh/docs/internals/task_lifecycle.md b/docs/content.zh/docs/internals/task_lifecycle.md
index 6c96b91..24d8495 100644
--- a/docs/content.zh/docs/internals/task_lifecycle.md
+++ b/docs/content.zh/docs/internals/task_lifecycle.md
@@ -92,6 +92,7 @@ Task 在没有中断的情况下执行到结束的阶段如下所示:
 	   	    open-operators
 		    run
 		    finish-operators
+		    wait for the final checkponit completed (if enabled)
 		    close-operators
 		    task-specific-cleanup
 		    common-cleanup
@@ -117,7 +118,9 @@ task 里多个连续算子的开启是从后往前依次执行。
 
 现在 task 可以恢复执行,算子可以开始处理新输入的数据。在这里,特定 task 的 `run()` 方法会被调用。这个方法会一直运行直到没有更多输入数据进来(有限的数据流)或者 task 被取消了(人为的或其他的原因)。这里也是算子定义的 `processElement()` 方法和 `processWatermark()` 方法执行的地方。
 
-在运行到完成的情况下,即没有更多的输入数据要处理,从run()方法退出后,task 进入关闭阶段。首先定时器服务停止注册任何新的定时器(比如从正在执行的定时器里注册),清理掉所有还未启动的定时器,并等待当前执行中的定时器运行结束。然后通过调用 `finishAllOperators()` 方法调用每个算子的 `finish()` 方法来通知所有参与计算的算子。然后所有缓存的输出数据会刷出去以便下游 task 处理,最终 task 通过调用每个算子的 `close()` 方法来尝试清理掉算子持有的所有资源。与我们之前提到的开启算子不同是,开启时从后往前依次调用 `open()`;而关闭时刚好相反,从前往后依次调用 `close()`。
+在运行到完成的情况下,即没有更多的输入数据要处理,从run()方法退出后,task 进入关闭阶段。首先定时器服务停止注册任何新的定时器(比如从正在执行的定时器里注册),清理掉所有还未启动的定时器,并等待当前执行中的定时器运行结束。然后通过调用 `finishAllOperators()` 方法调用每个算子的 `finish()` 方法来通知所有参与计算的算子。然后所有缓存的输出数据会刷出去以便下游 task 处理。
+如果开启了部分任务结束后继续 checkpoint 的功能,任务将 [等待下一个 checkpoint 结束]({{< ref "docs/dev/datastream/fault-tolerance/checkpointing#任务结束前等待最后一次-checkpoint" >}}) 来保证使用两阶段提交的算子可能最终提交所有的记录。
+最终 task 通过调用每个算子的 `close()` 方法来尝试清理掉算子持有的所有资源。与我们之前提到的开启算子不同是,开启时从后往前依次调用 `open()`;而关闭时刚好相反,从前往后依次调用 `close()`。
 
 {{< hint info >}}
 
diff --git a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
index c98386a..f763882 100644
--- a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
+++ b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md
@@ -260,16 +260,16 @@ exactly-once sinks and the `TwoPhaseCommitSinkFunction`.
 
 There is a special handling for `UnionListState`, which has often been used to implement a global
 view over offsets in an external system (i.e. storing current offsets of Kafka partitions). If we
-had discarded a state for a single subtask that had its `finish` method called, we would have lost
+had discarded a state for a single subtask that had its `close` method called, we would have lost
 the offsets for partitions that it had been assigned. In order to work around this problem, we let
 checkpoints succeed only if none or all subtasks that use `UnionListState` are finished.
 
 We have not seen `ListState` used in a similar way, but you should be aware that any state
-checkpointed after the `finish` method will be discarded and not be available after a restore.
+checkpointed after the `close` method will be discarded and not be available after a restore.
 
 Any operator that is prepared to be rescaled should work well with tasks that partially finish.
 Restoring from a checkpoint where only a subset of tasks finished is equivalent to restoring such a
-task with the number of new subtasks equal to the number of finished tasks.
+task with the number of new subtasks equal to the number of running tasks.
 
 ### Waiting for the final checkpoint before task exit