You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/06 08:05:02 UTC

[GitHub] [flink-web] gaoyunhaii opened a new pull request, #545: Add blogs for FLIP-147 support checkpoints after tasks finished

gaoyunhaii opened a new pull request, #545:
URL: https://github.com/apache/flink-web/pull/545

   This pr adds the blog for FLIP-147: support checkpoints after tasks finished


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] zhuzhurk commented on a diff in pull request #545: Add blogs for FLIP-147 support checkpoints after tasks finished

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #545:
URL: https://github.com/apache/flink-web/pull/545#discussion_r898882746


##########
_posts/2022-06-01-final-checkpoint-part2.md:
##########
@@ -0,0 +1,240 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part Two"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post presents more details on the changes on the checkpoint procedure and task finish process made by the final checkpoint mechanism.
+
+---
+
+In the [first part]({{site.baseurl}}/2022/06/01/final-checkpoint-part1.html) of this blog,
+we have briefly introduced the work to support checkpoints after tasks get
+finished and revise the process of finishing. In this part we will present more details on the implementation,
+including how we support checkpoints with finished tasks and the revised protocol of the finish process.
+
+# Implementation of support Checkpointing with Finished Tasks
+
+As described in part one,
+to support checkpoints after some tasks are finished, the core idea is to mark
+the finished operators in checkpoints and skip executing these operators after recovery. To implement this idea,
+we enhanced the checkpointing procedure to generate the flag and use the flag on recovery. This section presents
+more details on the process of taking checkpoints with finished tasks and recovery from such checkpoints. 
+
+Previously, checkpointing only worked when all tasks were running. As shown in the Figure 1, in this case the
+checkpoint coordinator first notify all the source tasks, and then the source tasks further notify the
+downstream tasks to take snapshots via barrier events. Similarly, if there are finished tasks, we need to
+find the new "source" tasks to initiate the checkpoint, namely those tasks that are still running but have
+no running precedent tasks. CheckpointCoordinator does the computation atomically at the JobManager side
+based on the latest states recorded in the execution graph.
+
+It is also possible that tasks finished before they actually get triggered: when the checkpoint coordinator
+decides to trigger one task and starts emitting the RPC, it is possible that the task is just finished and
+reporting the FINISHED status to JobManager. In this case, the RPC message would fail and the checkpoint would be aborted.
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/checkpoint_trigger.png" />
+<p style="font-size: 0.6em">
+  Figure 1. The tasks chosen as the new sources when taking checkpoint with finished tasks. The principle is to
+  choose the running tasks whose precedent tasks are all finished. 
+</p>
+</center>
+
+In order to keep track of the finish status of each operator, we need to extend the checkpoint format.
+A checkpoint consists of the states of all the stateful operators, and the state of one operator consists of the
+entries from all its parallel instances. Note that the concept of Task is not reflected in the checkpoint. Task
+is more of a physical execution container that drives the behavior of operators. It is not well-defined across
+multiple executions of the same job since job upgrades might modify the operators contained in one task. 
+Therefore, the finished status should also be attached to the operators.
+
+As shown in the Figure 2, operators could be classified into three types according to their finished status:
+
+1. Fully finished: If all the instances of an operator are finished, we could view the logic of the operators as
+fully executed and we should skip the execution of the operator after recovery. We need to store a special flag for this
+kind of operator. 
+2. Partially finished: If only some instances of an operator are finished, then we still need to continue executing the
+remaining logic of this operator. As a whole we could view the state of the operator as the set of entries collected from all the
+running instances, which represents the remaining workload for this operator.
+3. No finished instances: In this case, the state of the operator is the same as the one taken when no tasks are finished.
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/checkpoint_format.png" />
+<p style="font-size: 0.6em">
+  Figure 2. An illustration of the extended checkpoint format.
+</p>
+</center>
+
+If the job is later restored from a checkpoint taken with finished tasks, we would skip executing all the logic for fully
+finished operators, and execute normally for the operators with no finished instances.
+
+However, this would be a bit complex for the partially finished operators. The state of partially finished operators would be
+redistributed to all the instances, similar to rescaling when the parallelism is changed. Among all the types of states that
+Flink offers, the [keyed state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-keyed-state)
+and [operator state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-operator-state)
+with even-split redistribution would work normally, but the 
+[broadcast state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#broadcast-state) and 
+[operator state with union redistribution](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-operator-state)
+would be affected for the following reasons: 
+
+1. The broadcast state always replicates the state of the first subtask to the other subtasks. If the first subtask is finished,
+an empty state would be distributed and the operator would run from scratch, which is not correct.
+2. The operator state with union distribution merges the states of all the subtasks and then scatters the merged state to all the

Review Comment:
   `scatters` may be misleading, maybe `sends`?



##########
_posts/2022-06-01-final-checkpoint-part2.md:
##########
@@ -0,0 +1,240 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part Two"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post presents more details on the changes on the checkpoint procedure and task finish process made by the final checkpoint mechanism.
+
+---
+
+In the [first part]({{site.baseurl}}/2022/06/01/final-checkpoint-part1.html) of this blog,
+we have briefly introduced the work to support checkpoints after tasks get
+finished and revise the process of finishing. In this part we will present more details on the implementation,
+including how we support checkpoints with finished tasks and the revised protocol of the finish process.
+
+# Implementation of support Checkpointing with Finished Tasks
+
+As described in part one,
+to support checkpoints after some tasks are finished, the core idea is to mark
+the finished operators in checkpoints and skip executing these operators after recovery. To implement this idea,
+we enhanced the checkpointing procedure to generate the flag and use the flag on recovery. This section presents
+more details on the process of taking checkpoints with finished tasks and recovery from such checkpoints. 
+
+Previously, checkpointing only worked when all tasks were running. As shown in the Figure 1, in this case the
+checkpoint coordinator first notify all the source tasks, and then the source tasks further notify the
+downstream tasks to take snapshots via barrier events. Similarly, if there are finished tasks, we need to
+find the new "source" tasks to initiate the checkpoint, namely those tasks that are still running but have
+no running precedent tasks. CheckpointCoordinator does the computation atomically at the JobManager side
+based on the latest states recorded in the execution graph.
+
+It is also possible that tasks finished before they actually get triggered: when the checkpoint coordinator
+decides to trigger one task and starts emitting the RPC, it is possible that the task is just finished and
+reporting the FINISHED status to JobManager. In this case, the RPC message would fail and the checkpoint would be aborted.
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/checkpoint_trigger.png" />
+<p style="font-size: 0.6em">
+  Figure 1. The tasks chosen as the new sources when taking checkpoint with finished tasks. The principle is to
+  choose the running tasks whose precedent tasks are all finished. 
+</p>
+</center>
+
+In order to keep track of the finish status of each operator, we need to extend the checkpoint format.
+A checkpoint consists of the states of all the stateful operators, and the state of one operator consists of the
+entries from all its parallel instances. Note that the concept of Task is not reflected in the checkpoint. Task
+is more of a physical execution container that drives the behavior of operators. It is not well-defined across
+multiple executions of the same job since job upgrades might modify the operators contained in one task. 
+Therefore, the finished status should also be attached to the operators.
+
+As shown in the Figure 2, operators could be classified into three types according to their finished status:
+
+1. Fully finished: If all the instances of an operator are finished, we could view the logic of the operators as
+fully executed and we should skip the execution of the operator after recovery. We need to store a special flag for this
+kind of operator. 
+2. Partially finished: If only some instances of an operator are finished, then we still need to continue executing the
+remaining logic of this operator. As a whole we could view the state of the operator as the set of entries collected from all the
+running instances, which represents the remaining workload for this operator.
+3. No finished instances: In this case, the state of the operator is the same as the one taken when no tasks are finished.
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/checkpoint_format.png" />
+<p style="font-size: 0.6em">
+  Figure 2. An illustration of the extended checkpoint format.
+</p>
+</center>
+
+If the job is later restored from a checkpoint taken with finished tasks, we would skip executing all the logic for fully
+finished operators, and execute normally for the operators with no finished instances.
+
+However, this would be a bit complex for the partially finished operators. The state of partially finished operators would be
+redistributed to all the instances, similar to rescaling when the parallelism is changed. Among all the types of states that
+Flink offers, the [keyed state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-keyed-state)
+and [operator state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-operator-state)
+with even-split redistribution would work normally, but the 
+[broadcast state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#broadcast-state) and 
+[operator state with union redistribution](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-operator-state)
+would be affected for the following reasons: 
+
+1. The broadcast state always replicates the state of the first subtask to the other subtasks. If the first subtask is finished,
+an empty state would be distributed and the operator would run from scratch, which is not correct.
+2. The operator state with union distribution merges the states of all the subtasks and then scatters the merged state to all the
+subtasks. Based on this behavior, some operators may choose one subtask to store a shared value and after restarting this value will
+be distributed to all the subtasks. However, if this chosen task is finished, the state would be lost. 
+
+These two issues would not occur in when rescaling since there would be no finished tasks in that scenario. To address
+these issues, we chose one of the running subtasks instead to acquire the current state for the broadcast state. For the operator
+state with union redistribution, we have to collect the states of all the subtasks to maintain the semantics. Thus, currently we
+abort the checkpoint if parts of subtasks finished for operators using this kind of state. 
+
+In principle, you should be able to modify your job (which changes the dataflow graph) and restore from a previous checkpoint. That said,
+there are certain graph modifications that are not supported. These kinds of changes include adding a new operator before a fully finished
+one. Flink would check for such modifications and throw exceptions while restoring.
+
+# The Revised Process of Finishing
+
+As described in the part one, based on the ability to take checkpoints with finished tasks, we revised the process of finishing
+so that we could always commit all the data for two-phase-commit sinks. We’ll show the detailed protocol of the finished process in this
+section.
+
+## How did Jobs in Flink Finish Before?
+
+A job might finish in two ways: all sources finish or users execute
+[`stop-with-savepoint [--drain]`](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint).
+Let’s first have a look at the detailed process of finishing before. 
+
+### When sources finish
+
+If all the sources are bounded, a job would finish after all the sources finished. In this case, the sources would first

Review Comment:
   As mentioned in another comment, "a job would finish after all the sources finished" is theoretically not correct.



##########
_posts/2022-06-01-final-checkpoint-part2.md:
##########
@@ -0,0 +1,240 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part Two"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post presents more details on the changes on the checkpoint procedure and task finish process made by the final checkpoint mechanism.
+
+---
+
+In the [first part]({{site.baseurl}}/2022/06/01/final-checkpoint-part1.html) of this blog,
+we have briefly introduced the work to support checkpoints after tasks get
+finished and revise the process of finishing. In this part we will present more details on the implementation,
+including how we support checkpoints with finished tasks and the revised protocol of the finish process.
+
+# Implementation of support Checkpointing with Finished Tasks
+
+As described in part one,
+to support checkpoints after some tasks are finished, the core idea is to mark
+the finished operators in checkpoints and skip executing these operators after recovery. To implement this idea,
+we enhanced the checkpointing procedure to generate the flag and use the flag on recovery. This section presents
+more details on the process of taking checkpoints with finished tasks and recovery from such checkpoints. 
+
+Previously, checkpointing only worked when all tasks were running. As shown in the Figure 1, in this case the
+checkpoint coordinator first notify all the source tasks, and then the source tasks further notify the
+downstream tasks to take snapshots via barrier events. Similarly, if there are finished tasks, we need to
+find the new "source" tasks to initiate the checkpoint, namely those tasks that are still running but have
+no running precedent tasks. CheckpointCoordinator does the computation atomically at the JobManager side
+based on the latest states recorded in the execution graph.
+
+It is also possible that tasks finished before they actually get triggered: when the checkpoint coordinator
+decides to trigger one task and starts emitting the RPC, it is possible that the task is just finished and
+reporting the FINISHED status to JobManager. In this case, the RPC message would fail and the checkpoint would be aborted.
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/checkpoint_trigger.png" />
+<p style="font-size: 0.6em">
+  Figure 1. The tasks chosen as the new sources when taking checkpoint with finished tasks. The principle is to
+  choose the running tasks whose precedent tasks are all finished. 
+</p>
+</center>
+
+In order to keep track of the finish status of each operator, we need to extend the checkpoint format.
+A checkpoint consists of the states of all the stateful operators, and the state of one operator consists of the
+entries from all its parallel instances. Note that the concept of Task is not reflected in the checkpoint. Task
+is more of a physical execution container that drives the behavior of operators. It is not well-defined across
+multiple executions of the same job since job upgrades might modify the operators contained in one task. 
+Therefore, the finished status should also be attached to the operators.
+
+As shown in the Figure 2, operators could be classified into three types according to their finished status:
+
+1. Fully finished: If all the instances of an operator are finished, we could view the logic of the operators as
+fully executed and we should skip the execution of the operator after recovery. We need to store a special flag for this
+kind of operator. 
+2. Partially finished: If only some instances of an operator are finished, then we still need to continue executing the
+remaining logic of this operator. As a whole we could view the state of the operator as the set of entries collected from all the
+running instances, which represents the remaining workload for this operator.
+3. No finished instances: In this case, the state of the operator is the same as the one taken when no tasks are finished.
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/checkpoint_format.png" />
+<p style="font-size: 0.6em">
+  Figure 2. An illustration of the extended checkpoint format.
+</p>
+</center>
+
+If the job is later restored from a checkpoint taken with finished tasks, we would skip executing all the logic for fully
+finished operators, and execute normally for the operators with no finished instances.
+
+However, this would be a bit complex for the partially finished operators. The state of partially finished operators would be
+redistributed to all the instances, similar to rescaling when the parallelism is changed. Among all the types of states that
+Flink offers, the [keyed state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-keyed-state)
+and [operator state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-operator-state)
+with even-split redistribution would work normally, but the 
+[broadcast state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#broadcast-state) and 
+[operator state with union redistribution](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-operator-state)
+would be affected for the following reasons: 
+
+1. The broadcast state always replicates the state of the first subtask to the other subtasks. If the first subtask is finished,
+an empty state would be distributed and the operator would run from scratch, which is not correct.
+2. The operator state with union distribution merges the states of all the subtasks and then scatters the merged state to all the
+subtasks. Based on this behavior, some operators may choose one subtask to store a shared value and after restarting this value will
+be distributed to all the subtasks. However, if this chosen task is finished, the state would be lost. 
+
+These two issues would not occur in when rescaling since there would be no finished tasks in that scenario. To address
+these issues, we chose one of the running subtasks instead to acquire the current state for the broadcast state. For the operator
+state with union redistribution, we have to collect the states of all the subtasks to maintain the semantics. Thus, currently we
+abort the checkpoint if parts of subtasks finished for operators using this kind of state. 
+
+In principle, you should be able to modify your job (which changes the dataflow graph) and restore from a previous checkpoint. That said,
+there are certain graph modifications that are not supported. These kinds of changes include adding a new operator before a fully finished
+one. Flink would check for such modifications and throw exceptions while restoring.
+
+# The Revised Process of Finishing
+
+As described in the part one, based on the ability to take checkpoints with finished tasks, we revised the process of finishing
+so that we could always commit all the data for two-phase-commit sinks. We’ll show the detailed protocol of the finished process in this
+section.
+
+## How did Jobs in Flink Finish Before?
+
+A job might finish in two ways: all sources finish or users execute
+[`stop-with-savepoint [--drain]`](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint).
+Let’s first have a look at the detailed process of finishing before. 
+
+### When sources finish
+
+If all the sources are bounded, a job would finish after all the sources finished. In this case, the sources would first
+emit a `MAX_WATERMARK` (`Long.MAX_VALUE`) and then start to terminate the task. On termination, a task would call `endOfInput()`,
+`close()` and `dispose()` for all the operators, then emit an `EndOfPartitionEvent` to the downstream tasks. The intermediate tasks
+would start terminating after receiving an `EndOfPartitionEvent` from all the input channels, and this process will continue
+until the last task is finished. 
+
+```
+1. Source operators emit MAX_WATERMARK
+2. On received MAX_WATERMARK for non-source operators
+    a. Trigger all the event-time timers
+    b. Emit MAX_WATERMARK
+3. Source tasks finished
+    a. endInput(inputId) for all the operators
+    b. close() for all the operators
+    c. dispose() for all the operators
+    d. Emit EndOfPartitionEvent
+    e. Task cleanup
+4. On received EndOfPartitionEvent for non-source tasks
+    a. endInput(int inputId) for all the operators
+    b. close() for all the operators
+    c. dispose() for all the operators
+    d. Emit EndOfPartitionEvent
+    e. Task cleanup
+```
+
+### When users execute stop-with-savepoint [--drain]
+
+Users could execute the command stop-with-savepoint [--drain] for both bounded and unbounded jobs to trigger jobs to finish.
+In this case, Flink first triggers a synchronous savepoint and all the tasks would stall after seeing the synchronous
+savepoint. If the savepoint succeeds, all the source operators would finish actively and the job would finish the same as the above scenario. 
+
+```
+1. Trigger a savepoint
+2. Sources received savepoint trigger RPC
+    a. If with –-drain
+        i. source operators emit MAX_WATERMARK
+    b. Source emits savepoint barrier
+3. On received MAX_WATERMARK for non-source operators
+    a. Trigger all the event times
+    b. Emit MAX_WATERMARK
+4. On received savepoint barrier for non-source operators
+    a. The task blocks till the savepoint succeed
+5. Finish the source tasks actively
+    a. If with –-drain
+        ii. endInput(inputId) for all the operators
+    b. close() for all the operators
+    c. dispose() for all the operators
+    d. Emit EndOfPartitionEvent
+    e. Task cleanup
+6. On received EndOfPartitionEvent for non-source tasks
+    a. If with –-drain
+        i. endInput(int inputId) for all the operators
+    b. close() for all the operators
+    c. dispose() for all the operators
+    d. Emit EndOfPartitionEvent
+    e. Task cleanup
+```
+
+A parameter `–-drain` is supported with `stop-with-savepoint`: if not specified, the job is expected to resume from this savepoint,
+otherwise the job is expected to terminate permanently. Thus we only emit `MAX_WATERMARK` to trigger all the event timers and call
+`endInput()` for the later case. 
+
+## Revise the Finishing Steps
+
+As described in part one, after revising the process of finishing, we have decoupled the process of "finishing operator logic"
+and "finishing task" by introducing a new `EndOfData` event. After the revision each task will first
+notify the descendants with an `EndOfData` event after executing all the logic
+so that the descendants also have chances to finish executing the operator logic, then
+all the tasks could wait for the next checkpoint or the specified savepoint concurrently to commit all the remaining data. 
+This section will present the detailed protocol of the revised process. Since we have renamed
+`close()` /`dispose()` to `close()` / `finish()`, we’ll stick to the new terminologies in the following description.
+
+The revised process of finishing is shown as follows:
+
+```
+1. Source tasks finished due to no more records or stop-with-savepoint. 
+    a. if no more records or stop-with-savepoint –-drain
+        i. source operators emit MAX_WATERMARK
+        ii. endInput(inputId) for all the operators
+        iii. finish() for all the operators
+    b. emit EndOfData[isDrain = true] event
+    c. Wait for the next checkpoint / the savepoint after operator finished complete
+    d. close() for all the operators
+    e. Emit EndOfPartitionEvent
+    f. Task cleanup
+2. On received MAX_WATERMARK for non-source operators
+    a. Trigger all the event times
+    b. Emit MAX_WATERMARK
+3. On received EndOfData for non-source tasks
+    a. If isDrain
+        i. endInput(int inputId) for all the operators
+        ii. finish() for all the operators
+    b. Emit EndOfData[isDrain = the flag value of the received event]
+4. On received EndOfPartitionEvent for non-source tasks
+    a. Wait for the next checkpoint / the savepoint after operator finished complete
+    b. close() for all the operators
+    c. Emit EndOfPartitionEvent
+    d. Task cleanup
+```
+
+<center>
+<img vspace="20" style="width:60%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/example_job_finish.png" />
+<p style="font-size: 0.6em">
+  Figure 3. An example job of the revised process of finishing.
+</p>
+</center>
+
+An example of the process of job finishing is shown in Figure 3. 
+
+If Task `C` finishes after processing all the records, it first emits the max-watermark, then finishes the operators and emits
+the `EndOfData` event. After that, it waits for the next checkpoint to complete and then emits the `EndOfPartitionEvent`. 
+
+Task `D` finishes all the operators right after receiving the `EndOfData` event. Since any checkpoints taken after operators finish
+can commit all the pending records and be the final checkpoint, Task `D`’s final checkpoint would be the same as Task `C`’s since
+the barrier must be emitted after the `EndOfData` event. 
+
+Task `E` is a bit different in that it has two inputs. Task `A` might continue to run for a while and, thus, Task `E` needs to wait
+until it receives an `EndOfData` event also from the other input before finishing operators and its final checkpoint might be different. 
+
+On the other hand, when using `stop-with-savepoint`, the process is similar except that all the tasks need to wait for the exact
+savepoint before finishing instead of just any checkpoints. Moreover, since both Task `C` and Task `A` would finish at the same time,
+Task `E` would also be able to wait for this particular savepoint before finishing. 
+
+# Conclusion
+
+In this part we have presented more details of how the checkpoint are taken with finished tasks and the revised process

Review Comment:
   checkpoint -> checkpoints



##########
_posts/2022-06-01-final-checkpoint-part2.md:
##########
@@ -0,0 +1,240 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part Two"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post presents more details on the changes on the checkpoint procedure and task finish process made by the final checkpoint mechanism.
+
+---
+
+In the [first part]({{site.baseurl}}/2022/06/01/final-checkpoint-part1.html) of this blog,
+we have briefly introduced the work to support checkpoints after tasks get
+finished and revise the process of finishing. In this part we will present more details on the implementation,
+including how we support checkpoints with finished tasks and the revised protocol of the finish process.
+
+# Implementation of support Checkpointing with Finished Tasks
+
+As described in part one,
+to support checkpoints after some tasks are finished, the core idea is to mark
+the finished operators in checkpoints and skip executing these operators after recovery. To implement this idea,
+we enhanced the checkpointing procedure to generate the flag and use the flag on recovery. This section presents
+more details on the process of taking checkpoints with finished tasks and recovery from such checkpoints. 
+
+Previously, checkpointing only worked when all tasks were running. As shown in the Figure 1, in this case the
+checkpoint coordinator first notify all the source tasks, and then the source tasks further notify the
+downstream tasks to take snapshots via barrier events. Similarly, if there are finished tasks, we need to
+find the new "source" tasks to initiate the checkpoint, namely those tasks that are still running but have
+no running precedent tasks. CheckpointCoordinator does the computation atomically at the JobManager side
+based on the latest states recorded in the execution graph.
+
+It is also possible that tasks finished before they actually get triggered: when the checkpoint coordinator

Review Comment:
   maybe "finished before they actually get triggered" to "race conditions can happen", otherwise the statement is a bit duplicated.



##########
_posts/2022-06-01-final-checkpoint-part2.md:
##########
@@ -0,0 +1,240 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part Two"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post presents more details on the changes on the checkpoint procedure and task finish process made by the final checkpoint mechanism.
+
+---
+
+In the [first part]({{site.baseurl}}/2022/06/01/final-checkpoint-part1.html) of this blog,
+we have briefly introduced the work to support checkpoints after tasks get
+finished and revise the process of finishing. In this part we will present more details on the implementation,
+including how we support checkpoints with finished tasks and the revised protocol of the finish process.
+
+# Implementation of support Checkpointing with Finished Tasks
+
+As described in part one,
+to support checkpoints after some tasks are finished, the core idea is to mark
+the finished operators in checkpoints and skip executing these operators after recovery. To implement this idea,
+we enhanced the checkpointing procedure to generate the flag and use the flag on recovery. This section presents
+more details on the process of taking checkpoints with finished tasks and recovery from such checkpoints. 
+
+Previously, checkpointing only worked when all tasks were running. As shown in the Figure 1, in this case the
+checkpoint coordinator first notify all the source tasks, and then the source tasks further notify the
+downstream tasks to take snapshots via barrier events. Similarly, if there are finished tasks, we need to
+find the new "source" tasks to initiate the checkpoint, namely those tasks that are still running but have
+no running precedent tasks. CheckpointCoordinator does the computation atomically at the JobManager side
+based on the latest states recorded in the execution graph.
+
+It is also possible that tasks finished before they actually get triggered: when the checkpoint coordinator
+decides to trigger one task and starts emitting the RPC, it is possible that the task is just finished and
+reporting the FINISHED status to JobManager. In this case, the RPC message would fail and the checkpoint would be aborted.
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/checkpoint_trigger.png" />
+<p style="font-size: 0.6em">
+  Figure 1. The tasks chosen as the new sources when taking checkpoint with finished tasks. The principle is to
+  choose the running tasks whose precedent tasks are all finished. 
+</p>
+</center>
+
+In order to keep track of the finish status of each operator, we need to extend the checkpoint format.
+A checkpoint consists of the states of all the stateful operators, and the state of one operator consists of the
+entries from all its parallel instances. Note that the concept of Task is not reflected in the checkpoint. Task
+is more of a physical execution container that drives the behavior of operators. It is not well-defined across
+multiple executions of the same job since job upgrades might modify the operators contained in one task. 
+Therefore, the finished status should also be attached to the operators.
+
+As shown in the Figure 2, operators could be classified into three types according to their finished status:
+
+1. Fully finished: If all the instances of an operator are finished, we could view the logic of the operators as
+fully executed and we should skip the execution of the operator after recovery. We need to store a special flag for this
+kind of operator. 
+2. Partially finished: If only some instances of an operator are finished, then we still need to continue executing the
+remaining logic of this operator. As a whole we could view the state of the operator as the set of entries collected from all the
+running instances, which represents the remaining workload for this operator.
+3. No finished instances: In this case, the state of the operator is the same as the one taken when no tasks are finished.
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/checkpoint_format.png" />
+<p style="font-size: 0.6em">
+  Figure 2. An illustration of the extended checkpoint format.
+</p>
+</center>
+
+If the job is later restored from a checkpoint taken with finished tasks, we would skip executing all the logic for fully
+finished operators, and execute normally for the operators with no finished instances.
+
+However, this would be a bit complex for the partially finished operators. The state of partially finished operators would be
+redistributed to all the instances, similar to rescaling when the parallelism is changed. Among all the types of states that
+Flink offers, the [keyed state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-keyed-state)
+and [operator state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-operator-state)
+with even-split redistribution would work normally, but the 
+[broadcast state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#broadcast-state) and 
+[operator state with union redistribution](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-operator-state)
+would be affected for the following reasons: 
+
+1. The broadcast state always replicates the state of the first subtask to the other subtasks. If the first subtask is finished,
+an empty state would be distributed and the operator would run from scratch, which is not correct.
+2. The operator state with union distribution merges the states of all the subtasks and then scatters the merged state to all the
+subtasks. Based on this behavior, some operators may choose one subtask to store a shared value and after restarting this value will
+be distributed to all the subtasks. However, if this chosen task is finished, the state would be lost. 
+
+These two issues would not occur in when rescaling since there would be no finished tasks in that scenario. To address
+these issues, we chose one of the running subtasks instead to acquire the current state for the broadcast state. For the operator
+state with union redistribution, we have to collect the states of all the subtasks to maintain the semantics. Thus, currently we
+abort the checkpoint if parts of subtasks finished for operators using this kind of state. 
+
+In principle, you should be able to modify your job (which changes the dataflow graph) and restore from a previous checkpoint. That said,
+there are certain graph modifications that are not supported. These kinds of changes include adding a new operator before a fully finished
+one. Flink would check for such modifications and throw exceptions while restoring.
+
+# The Revised Process of Finishing
+
+As described in the part one, based on the ability to take checkpoints with finished tasks, we revised the process of finishing
+so that we could always commit all the data for two-phase-commit sinks. We’ll show the detailed protocol of the finished process in this
+section.
+
+## How did Jobs in Flink Finish Before?
+
+A job might finish in two ways: all sources finish or users execute
+[`stop-with-savepoint [--drain]`](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint).
+Let’s first have a look at the detailed process of finishing before. 
+
+### When sources finish
+
+If all the sources are bounded, a job would finish after all the sources finished. In this case, the sources would first
+emit a `MAX_WATERMARK` (`Long.MAX_VALUE`) and then start to terminate the task. On termination, a task would call `endOfInput()`,
+`close()` and `dispose()` for all the operators, then emit an `EndOfPartitionEvent` to the downstream tasks. The intermediate tasks
+would start terminating after receiving an `EndOfPartitionEvent` from all the input channels, and this process will continue
+until the last task is finished. 
+
+```
+1. Source operators emit MAX_WATERMARK
+2. On received MAX_WATERMARK for non-source operators
+    a. Trigger all the event-time timers
+    b. Emit MAX_WATERMARK
+3. Source tasks finished
+    a. endInput(inputId) for all the operators
+    b. close() for all the operators
+    c. dispose() for all the operators
+    d. Emit EndOfPartitionEvent
+    e. Task cleanup
+4. On received EndOfPartitionEvent for non-source tasks
+    a. endInput(int inputId) for all the operators
+    b. close() for all the operators
+    c. dispose() for all the operators
+    d. Emit EndOfPartitionEvent
+    e. Task cleanup
+```
+
+### When users execute stop-with-savepoint [--drain]
+
+Users could execute the command stop-with-savepoint [--drain] for both bounded and unbounded jobs to trigger jobs to finish.
+In this case, Flink first triggers a synchronous savepoint and all the tasks would stall after seeing the synchronous
+savepoint. If the savepoint succeeds, all the source operators would finish actively and the job would finish the same as the above scenario. 
+
+```
+1. Trigger a savepoint
+2. Sources received savepoint trigger RPC
+    a. If with –-drain
+        i. source operators emit MAX_WATERMARK
+    b. Source emits savepoint barrier
+3. On received MAX_WATERMARK for non-source operators
+    a. Trigger all the event times
+    b. Emit MAX_WATERMARK
+4. On received savepoint barrier for non-source operators
+    a. The task blocks till the savepoint succeed
+5. Finish the source tasks actively
+    a. If with –-drain
+        ii. endInput(inputId) for all the operators
+    b. close() for all the operators
+    c. dispose() for all the operators
+    d. Emit EndOfPartitionEvent
+    e. Task cleanup
+6. On received EndOfPartitionEvent for non-source tasks
+    a. If with –-drain
+        i. endInput(int inputId) for all the operators
+    b. close() for all the operators
+    c. dispose() for all the operators
+    d. Emit EndOfPartitionEvent
+    e. Task cleanup
+```
+
+A parameter `–-drain` is supported with `stop-with-savepoint`: if not specified, the job is expected to resume from this savepoint,
+otherwise the job is expected to terminate permanently. Thus we only emit `MAX_WATERMARK` to trigger all the event timers and call
+`endInput()` for the later case. 

Review Comment:
   for -> in



##########
_posts/2022-06-01-final-checkpoint-part2.md:
##########
@@ -0,0 +1,240 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part Two"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post presents more details on the changes on the checkpoint procedure and task finish process made by the final checkpoint mechanism.
+
+---
+
+In the [first part]({{site.baseurl}}/2022/06/01/final-checkpoint-part1.html) of this blog,
+we have briefly introduced the work to support checkpoints after tasks get
+finished and revise the process of finishing. In this part we will present more details on the implementation,
+including how we support checkpoints with finished tasks and the revised protocol of the finish process.
+
+# Implementation of support Checkpointing with Finished Tasks
+
+As described in part one,
+to support checkpoints after some tasks are finished, the core idea is to mark
+the finished operators in checkpoints and skip executing these operators after recovery. To implement this idea,
+we enhanced the checkpointing procedure to generate the flag and use the flag on recovery. This section presents
+more details on the process of taking checkpoints with finished tasks and recovery from such checkpoints. 
+
+Previously, checkpointing only worked when all tasks were running. As shown in the Figure 1, in this case the
+checkpoint coordinator first notify all the source tasks, and then the source tasks further notify the
+downstream tasks to take snapshots via barrier events. Similarly, if there are finished tasks, we need to
+find the new "source" tasks to initiate the checkpoint, namely those tasks that are still running but have
+no running precedent tasks. CheckpointCoordinator does the computation atomically at the JobManager side
+based on the latest states recorded in the execution graph.
+
+It is also possible that tasks finished before they actually get triggered: when the checkpoint coordinator
+decides to trigger one task and starts emitting the RPC, it is possible that the task is just finished and
+reporting the FINISHED status to JobManager. In this case, the RPC message would fail and the checkpoint would be aborted.
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/checkpoint_trigger.png" />
+<p style="font-size: 0.6em">
+  Figure 1. The tasks chosen as the new sources when taking checkpoint with finished tasks. The principle is to
+  choose the running tasks whose precedent tasks are all finished. 
+</p>
+</center>
+
+In order to keep track of the finish status of each operator, we need to extend the checkpoint format.
+A checkpoint consists of the states of all the stateful operators, and the state of one operator consists of the
+entries from all its parallel instances. Note that the concept of Task is not reflected in the checkpoint. Task
+is more of a physical execution container that drives the behavior of operators. It is not well-defined across
+multiple executions of the same job since job upgrades might modify the operators contained in one task. 
+Therefore, the finished status should also be attached to the operators.
+
+As shown in the Figure 2, operators could be classified into three types according to their finished status:
+
+1. Fully finished: If all the instances of an operator are finished, we could view the logic of the operators as
+fully executed and we should skip the execution of the operator after recovery. We need to store a special flag for this
+kind of operator. 
+2. Partially finished: If only some instances of an operator are finished, then we still need to continue executing the
+remaining logic of this operator. As a whole we could view the state of the operator as the set of entries collected from all the
+running instances, which represents the remaining workload for this operator.
+3. No finished instances: In this case, the state of the operator is the same as the one taken when no tasks are finished.
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/checkpoint_format.png" />
+<p style="font-size: 0.6em">
+  Figure 2. An illustration of the extended checkpoint format.
+</p>
+</center>
+
+If the job is later restored from a checkpoint taken with finished tasks, we would skip executing all the logic for fully
+finished operators, and execute normally for the operators with no finished instances.
+
+However, this would be a bit complex for the partially finished operators. The state of partially finished operators would be
+redistributed to all the instances, similar to rescaling when the parallelism is changed. Among all the types of states that
+Flink offers, the [keyed state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-keyed-state)
+and [operator state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-operator-state)
+with even-split redistribution would work normally, but the 
+[broadcast state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#broadcast-state) and 
+[operator state with union redistribution](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-operator-state)
+would be affected for the following reasons: 
+
+1. The broadcast state always replicates the state of the first subtask to the other subtasks. If the first subtask is finished,
+an empty state would be distributed and the operator would run from scratch, which is not correct.
+2. The operator state with union distribution merges the states of all the subtasks and then scatters the merged state to all the
+subtasks. Based on this behavior, some operators may choose one subtask to store a shared value and after restarting this value will
+be distributed to all the subtasks. However, if this chosen task is finished, the state would be lost. 
+
+These two issues would not occur in when rescaling since there would be no finished tasks in that scenario. To address
+these issues, we chose one of the running subtasks instead to acquire the current state for the broadcast state. For the operator
+state with union redistribution, we have to collect the states of all the subtasks to maintain the semantics. Thus, currently we
+abort the checkpoint if parts of subtasks finished for operators using this kind of state. 
+
+In principle, you should be able to modify your job (which changes the dataflow graph) and restore from a previous checkpoint. That said,
+there are certain graph modifications that are not supported. These kinds of changes include adding a new operator before a fully finished
+one. Flink would check for such modifications and throw exceptions while restoring.
+
+# The Revised Process of Finishing
+
+As described in the part one, based on the ability to take checkpoints with finished tasks, we revised the process of finishing
+so that we could always commit all the data for two-phase-commit sinks. We’ll show the detailed protocol of the finished process in this
+section.
+
+## How did Jobs in Flink Finish Before?
+
+A job might finish in two ways: all sources finish or users execute
+[`stop-with-savepoint [--drain]`](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint).
+Let’s first have a look at the detailed process of finishing before. 
+
+### When sources finish
+
+If all the sources are bounded, a job would finish after all the sources finished. In this case, the sources would first
+emit a `MAX_WATERMARK` (`Long.MAX_VALUE`) and then start to terminate the task. On termination, a task would call `endOfInput()`,
+`close()` and `dispose()` for all the operators, then emit an `EndOfPartitionEvent` to the downstream tasks. The intermediate tasks
+would start terminating after receiving an `EndOfPartitionEvent` from all the input channels, and this process will continue
+until the last task is finished. 
+
+```
+1. Source operators emit MAX_WATERMARK
+2. On received MAX_WATERMARK for non-source operators
+    a. Trigger all the event-time timers
+    b. Emit MAX_WATERMARK
+3. Source tasks finished
+    a. endInput(inputId) for all the operators
+    b. close() for all the operators
+    c. dispose() for all the operators
+    d. Emit EndOfPartitionEvent
+    e. Task cleanup
+4. On received EndOfPartitionEvent for non-source tasks
+    a. endInput(int inputId) for all the operators
+    b. close() for all the operators
+    c. dispose() for all the operators
+    d. Emit EndOfPartitionEvent
+    e. Task cleanup
+```
+
+### When users execute stop-with-savepoint [--drain]
+
+Users could execute the command stop-with-savepoint [--drain] for both bounded and unbounded jobs to trigger jobs to finish.
+In this case, Flink first triggers a synchronous savepoint and all the tasks would stall after seeing the synchronous
+savepoint. If the savepoint succeeds, all the source operators would finish actively and the job would finish the same as the above scenario. 
+
+```
+1. Trigger a savepoint
+2. Sources received savepoint trigger RPC
+    a. If with –-drain
+        i. source operators emit MAX_WATERMARK
+    b. Source emits savepoint barrier
+3. On received MAX_WATERMARK for non-source operators
+    a. Trigger all the event times
+    b. Emit MAX_WATERMARK
+4. On received savepoint barrier for non-source operators
+    a. The task blocks till the savepoint succeed
+5. Finish the source tasks actively
+    a. If with –-drain
+        ii. endInput(inputId) for all the operators
+    b. close() for all the operators
+    c. dispose() for all the operators
+    d. Emit EndOfPartitionEvent
+    e. Task cleanup
+6. On received EndOfPartitionEvent for non-source tasks
+    a. If with –-drain
+        i. endInput(int inputId) for all the operators
+    b. close() for all the operators
+    c. dispose() for all the operators
+    d. Emit EndOfPartitionEvent
+    e. Task cleanup
+```
+
+A parameter `–-drain` is supported with `stop-with-savepoint`: if not specified, the job is expected to resume from this savepoint,
+otherwise the job is expected to terminate permanently. Thus we only emit `MAX_WATERMARK` to trigger all the event timers and call
+`endInput()` for the later case. 
+
+## Revise the Finishing Steps
+
+As described in part one, after revising the process of finishing, we have decoupled the process of "finishing operator logic"
+and "finishing task" by introducing a new `EndOfData` event. After the revision each task will first
+notify the descendants with an `EndOfData` event after executing all the logic
+so that the descendants also have chances to finish executing the operator logic, then
+all the tasks could wait for the next checkpoint or the specified savepoint concurrently to commit all the remaining data. 
+This section will present the detailed protocol of the revised process. Since we have renamed
+`close()` /`dispose()` to `close()` / `finish()`, we’ll stick to the new terminologies in the following description.
+
+The revised process of finishing is shown as follows:
+
+```
+1. Source tasks finished due to no more records or stop-with-savepoint. 
+    a. if no more records or stop-with-savepoint –-drain
+        i. source operators emit MAX_WATERMARK
+        ii. endInput(inputId) for all the operators
+        iii. finish() for all the operators
+    b. emit EndOfData[isDrain = true] event

Review Comment:
   Why `isDrain = true` even if no `--drain`?



##########
_posts/2022-06-01-final-checkpoint-part2.md:
##########
@@ -0,0 +1,240 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part Two"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post presents more details on the changes on the checkpoint procedure and task finish process made by the final checkpoint mechanism.
+
+---
+
+In the [first part]({{site.baseurl}}/2022/06/01/final-checkpoint-part1.html) of this blog,
+we have briefly introduced the work to support checkpoints after tasks get
+finished and revise the process of finishing. In this part we will present more details on the implementation,
+including how we support checkpoints with finished tasks and the revised protocol of the finish process.
+
+# Implementation of support Checkpointing with Finished Tasks
+
+As described in part one,
+to support checkpoints after some tasks are finished, the core idea is to mark
+the finished operators in checkpoints and skip executing these operators after recovery. To implement this idea,
+we enhanced the checkpointing procedure to generate the flag and use the flag on recovery. This section presents
+more details on the process of taking checkpoints with finished tasks and recovery from such checkpoints. 
+
+Previously, checkpointing only worked when all tasks were running. As shown in the Figure 1, in this case the
+checkpoint coordinator first notify all the source tasks, and then the source tasks further notify the
+downstream tasks to take snapshots via barrier events. Similarly, if there are finished tasks, we need to
+find the new "source" tasks to initiate the checkpoint, namely those tasks that are still running but have
+no running precedent tasks. CheckpointCoordinator does the computation atomically at the JobManager side
+based on the latest states recorded in the execution graph.
+
+It is also possible that tasks finished before they actually get triggered: when the checkpoint coordinator
+decides to trigger one task and starts emitting the RPC, it is possible that the task is just finished and
+reporting the FINISHED status to JobManager. In this case, the RPC message would fail and the checkpoint would be aborted.
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/checkpoint_trigger.png" />
+<p style="font-size: 0.6em">
+  Figure 1. The tasks chosen as the new sources when taking checkpoint with finished tasks. The principle is to
+  choose the running tasks whose precedent tasks are all finished. 
+</p>
+</center>
+
+In order to keep track of the finish status of each operator, we need to extend the checkpoint format.
+A checkpoint consists of the states of all the stateful operators, and the state of one operator consists of the
+entries from all its parallel instances. Note that the concept of Task is not reflected in the checkpoint. Task
+is more of a physical execution container that drives the behavior of operators. It is not well-defined across
+multiple executions of the same job since job upgrades might modify the operators contained in one task. 
+Therefore, the finished status should also be attached to the operators.
+
+As shown in the Figure 2, operators could be classified into three types according to their finished status:
+
+1. Fully finished: If all the instances of an operator are finished, we could view the logic of the operators as
+fully executed and we should skip the execution of the operator after recovery. We need to store a special flag for this
+kind of operator. 
+2. Partially finished: If only some instances of an operator are finished, then we still need to continue executing the
+remaining logic of this operator. As a whole we could view the state of the operator as the set of entries collected from all the
+running instances, which represents the remaining workload for this operator.
+3. No finished instances: In this case, the state of the operator is the same as the one taken when no tasks are finished.
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/checkpoint_format.png" />
+<p style="font-size: 0.6em">
+  Figure 2. An illustration of the extended checkpoint format.
+</p>
+</center>
+
+If the job is later restored from a checkpoint taken with finished tasks, we would skip executing all the logic for fully
+finished operators, and execute normally for the operators with no finished instances.
+
+However, this would be a bit complex for the partially finished operators. The state of partially finished operators would be
+redistributed to all the instances, similar to rescaling when the parallelism is changed. Among all the types of states that
+Flink offers, the [keyed state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-keyed-state)
+and [operator state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-operator-state)
+with even-split redistribution would work normally, but the 
+[broadcast state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#broadcast-state) and 
+[operator state with union redistribution](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-operator-state)
+would be affected for the following reasons: 
+
+1. The broadcast state always replicates the state of the first subtask to the other subtasks. If the first subtask is finished,
+an empty state would be distributed and the operator would run from scratch, which is not correct.
+2. The operator state with union distribution merges the states of all the subtasks and then scatters the merged state to all the
+subtasks. Based on this behavior, some operators may choose one subtask to store a shared value and after restarting this value will
+be distributed to all the subtasks. However, if this chosen task is finished, the state would be lost. 
+
+These two issues would not occur in when rescaling since there would be no finished tasks in that scenario. To address
+these issues, we chose one of the running subtasks instead to acquire the current state for the broadcast state. For the operator
+state with union redistribution, we have to collect the states of all the subtasks to maintain the semantics. Thus, currently we
+abort the checkpoint if parts of subtasks finished for operators using this kind of state. 
+
+In principle, you should be able to modify your job (which changes the dataflow graph) and restore from a previous checkpoint. That said,
+there are certain graph modifications that are not supported. These kinds of changes include adding a new operator before a fully finished
+one. Flink would check for such modifications and throw exceptions while restoring.
+
+# The Revised Process of Finishing
+
+As described in the part one, based on the ability to take checkpoints with finished tasks, we revised the process of finishing
+so that we could always commit all the data for two-phase-commit sinks. We’ll show the detailed protocol of the finished process in this
+section.
+
+## How did Jobs in Flink Finish Before?
+
+A job might finish in two ways: all sources finish or users execute
+[`stop-with-savepoint [--drain]`](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint).
+Let’s first have a look at the detailed process of finishing before. 

Review Comment:
   maybe `before ` -> `before FLIP-147`



##########
_posts/2022-06-01-final-checkpoint-part2.md:
##########
@@ -0,0 +1,240 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part Two"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post presents more details on the changes on the checkpoint procedure and task finish process made by the final checkpoint mechanism.
+
+---
+
+In the [first part]({{site.baseurl}}/2022/06/01/final-checkpoint-part1.html) of this blog,
+we have briefly introduced the work to support checkpoints after tasks get
+finished and revise the process of finishing. In this part we will present more details on the implementation,
+including how we support checkpoints with finished tasks and the revised protocol of the finish process.
+
+# Implementation of support Checkpointing with Finished Tasks
+
+As described in part one,
+to support checkpoints after some tasks are finished, the core idea is to mark
+the finished operators in checkpoints and skip executing these operators after recovery. To implement this idea,
+we enhanced the checkpointing procedure to generate the flag and use the flag on recovery. This section presents
+more details on the process of taking checkpoints with finished tasks and recovery from such checkpoints. 
+
+Previously, checkpointing only worked when all tasks were running. As shown in the Figure 1, in this case the
+checkpoint coordinator first notify all the source tasks, and then the source tasks further notify the
+downstream tasks to take snapshots via barrier events. Similarly, if there are finished tasks, we need to
+find the new "source" tasks to initiate the checkpoint, namely those tasks that are still running but have
+no running precedent tasks. CheckpointCoordinator does the computation atomically at the JobManager side
+based on the latest states recorded in the execution graph.
+
+It is also possible that tasks finished before they actually get triggered: when the checkpoint coordinator
+decides to trigger one task and starts emitting the RPC, it is possible that the task is just finished and
+reporting the FINISHED status to JobManager. In this case, the RPC message would fail and the checkpoint would be aborted.
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/checkpoint_trigger.png" />
+<p style="font-size: 0.6em">
+  Figure 1. The tasks chosen as the new sources when taking checkpoint with finished tasks. The principle is to
+  choose the running tasks whose precedent tasks are all finished. 
+</p>
+</center>
+
+In order to keep track of the finish status of each operator, we need to extend the checkpoint format.
+A checkpoint consists of the states of all the stateful operators, and the state of one operator consists of the
+entries from all its parallel instances. Note that the concept of Task is not reflected in the checkpoint. Task
+is more of a physical execution container that drives the behavior of operators. It is not well-defined across
+multiple executions of the same job since job upgrades might modify the operators contained in one task. 
+Therefore, the finished status should also be attached to the operators.
+
+As shown in the Figure 2, operators could be classified into three types according to their finished status:
+
+1. Fully finished: If all the instances of an operator are finished, we could view the logic of the operators as
+fully executed and we should skip the execution of the operator after recovery. We need to store a special flag for this
+kind of operator. 
+2. Partially finished: If only some instances of an operator are finished, then we still need to continue executing the
+remaining logic of this operator. As a whole we could view the state of the operator as the set of entries collected from all the
+running instances, which represents the remaining workload for this operator.
+3. No finished instances: In this case, the state of the operator is the same as the one taken when no tasks are finished.
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/checkpoint_format.png" />
+<p style="font-size: 0.6em">
+  Figure 2. An illustration of the extended checkpoint format.
+</p>
+</center>
+
+If the job is later restored from a checkpoint taken with finished tasks, we would skip executing all the logic for fully
+finished operators, and execute normally for the operators with no finished instances.
+
+However, this would be a bit complex for the partially finished operators. The state of partially finished operators would be
+redistributed to all the instances, similar to rescaling when the parallelism is changed. Among all the types of states that
+Flink offers, the [keyed state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-keyed-state)
+and [operator state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-operator-state)
+with even-split redistribution would work normally, but the 
+[broadcast state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#broadcast-state) and 
+[operator state with union redistribution](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-operator-state)
+would be affected for the following reasons: 
+
+1. The broadcast state always replicates the state of the first subtask to the other subtasks. If the first subtask is finished,
+an empty state would be distributed and the operator would run from scratch, which is not correct.
+2. The operator state with union distribution merges the states of all the subtasks and then scatters the merged state to all the
+subtasks. Based on this behavior, some operators may choose one subtask to store a shared value and after restarting this value will
+be distributed to all the subtasks. However, if this chosen task is finished, the state would be lost. 
+
+These two issues would not occur in when rescaling since there would be no finished tasks in that scenario. To address
+these issues, we chose one of the running subtasks instead to acquire the current state for the broadcast state. For the operator
+state with union redistribution, we have to collect the states of all the subtasks to maintain the semantics. Thus, currently we
+abort the checkpoint if parts of subtasks finished for operators using this kind of state. 
+
+In principle, you should be able to modify your job (which changes the dataflow graph) and restore from a previous checkpoint. That said,
+there are certain graph modifications that are not supported. These kinds of changes include adding a new operator before a fully finished

Review Comment:
   before -> as the precedent of



##########
_posts/2022-06-01-final-checkpoint-part2.md:
##########
@@ -0,0 +1,240 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part Two"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post presents more details on the changes on the checkpoint procedure and task finish process made by the final checkpoint mechanism.
+
+---
+
+In the [first part]({{site.baseurl}}/2022/06/01/final-checkpoint-part1.html) of this blog,
+we have briefly introduced the work to support checkpoints after tasks get
+finished and revise the process of finishing. In this part we will present more details on the implementation,
+including how we support checkpoints with finished tasks and the revised protocol of the finish process.
+
+# Implementation of support Checkpointing with Finished Tasks
+
+As described in part one,
+to support checkpoints after some tasks are finished, the core idea is to mark
+the finished operators in checkpoints and skip executing these operators after recovery. To implement this idea,
+we enhanced the checkpointing procedure to generate the flag and use the flag on recovery. This section presents
+more details on the process of taking checkpoints with finished tasks and recovery from such checkpoints. 
+
+Previously, checkpointing only worked when all tasks were running. As shown in the Figure 1, in this case the
+checkpoint coordinator first notify all the source tasks, and then the source tasks further notify the
+downstream tasks to take snapshots via barrier events. Similarly, if there are finished tasks, we need to
+find the new "source" tasks to initiate the checkpoint, namely those tasks that are still running but have
+no running precedent tasks. CheckpointCoordinator does the computation atomically at the JobManager side
+based on the latest states recorded in the execution graph.
+
+It is also possible that tasks finished before they actually get triggered: when the checkpoint coordinator
+decides to trigger one task and starts emitting the RPC, it is possible that the task is just finished and
+reporting the FINISHED status to JobManager. In this case, the RPC message would fail and the checkpoint would be aborted.
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/checkpoint_trigger.png" />
+<p style="font-size: 0.6em">
+  Figure 1. The tasks chosen as the new sources when taking checkpoint with finished tasks. The principle is to
+  choose the running tasks whose precedent tasks are all finished. 
+</p>
+</center>
+
+In order to keep track of the finish status of each operator, we need to extend the checkpoint format.
+A checkpoint consists of the states of all the stateful operators, and the state of one operator consists of the
+entries from all its parallel instances. Note that the concept of Task is not reflected in the checkpoint. Task
+is more of a physical execution container that drives the behavior of operators. It is not well-defined across
+multiple executions of the same job since job upgrades might modify the operators contained in one task. 
+Therefore, the finished status should also be attached to the operators.
+
+As shown in the Figure 2, operators could be classified into three types according to their finished status:
+
+1. Fully finished: If all the instances of an operator are finished, we could view the logic of the operators as
+fully executed and we should skip the execution of the operator after recovery. We need to store a special flag for this
+kind of operator. 
+2. Partially finished: If only some instances of an operator are finished, then we still need to continue executing the
+remaining logic of this operator. As a whole we could view the state of the operator as the set of entries collected from all the
+running instances, which represents the remaining workload for this operator.
+3. No finished instances: In this case, the state of the operator is the same as the one taken when no tasks are finished.
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/checkpoint_format.png" />
+<p style="font-size: 0.6em">
+  Figure 2. An illustration of the extended checkpoint format.
+</p>
+</center>
+
+If the job is later restored from a checkpoint taken with finished tasks, we would skip executing all the logic for fully
+finished operators, and execute normally for the operators with no finished instances.
+
+However, this would be a bit complex for the partially finished operators. The state of partially finished operators would be
+redistributed to all the instances, similar to rescaling when the parallelism is changed. Among all the types of states that
+Flink offers, the [keyed state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-keyed-state)
+and [operator state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-operator-state)
+with even-split redistribution would work normally, but the 
+[broadcast state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#broadcast-state) and 
+[operator state with union redistribution](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-operator-state)
+would be affected for the following reasons: 
+
+1. The broadcast state always replicates the state of the first subtask to the other subtasks. If the first subtask is finished,
+an empty state would be distributed and the operator would run from scratch, which is not correct.
+2. The operator state with union distribution merges the states of all the subtasks and then scatters the merged state to all the
+subtasks. Based on this behavior, some operators may choose one subtask to store a shared value and after restarting this value will
+be distributed to all the subtasks. However, if this chosen task is finished, the state would be lost. 
+
+These two issues would not occur in when rescaling since there would be no finished tasks in that scenario. To address
+these issues, we chose one of the running subtasks instead to acquire the current state for the broadcast state. For the operator
+state with union redistribution, we have to collect the states of all the subtasks to maintain the semantics. Thus, currently we
+abort the checkpoint if parts of subtasks finished for operators using this kind of state. 
+
+In principle, you should be able to modify your job (which changes the dataflow graph) and restore from a previous checkpoint. That said,
+there are certain graph modifications that are not supported. These kinds of changes include adding a new operator before a fully finished
+one. Flink would check for such modifications and throw exceptions while restoring.
+
+# The Revised Process of Finishing
+
+As described in the part one, based on the ability to take checkpoints with finished tasks, we revised the process of finishing
+so that we could always commit all the data for two-phase-commit sinks. We’ll show the detailed protocol of the finished process in this
+section.
+
+## How did Jobs in Flink Finish Before?
+
+A job might finish in two ways: all sources finish or users execute
+[`stop-with-savepoint [--drain]`](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint).
+Let’s first have a look at the detailed process of finishing before. 
+
+### When sources finish
+
+If all the sources are bounded, a job would finish after all the sources finished. In this case, the sources would first
+emit a `MAX_WATERMARK` (`Long.MAX_VALUE`) and then start to terminate the task. On termination, a task would call `endOfInput()`,
+`close()` and `dispose()` for all the operators, then emit an `EndOfPartitionEvent` to the downstream tasks. The intermediate tasks
+would start terminating after receiving an `EndOfPartitionEvent` from all the input channels, and this process will continue
+until the last task is finished. 
+
+```
+1. Source operators emit MAX_WATERMARK
+2. On received MAX_WATERMARK for non-source operators
+    a. Trigger all the event-time timers
+    b. Emit MAX_WATERMARK
+3. Source tasks finished
+    a. endInput(inputId) for all the operators
+    b. close() for all the operators
+    c. dispose() for all the operators
+    d. Emit EndOfPartitionEvent
+    e. Task cleanup
+4. On received EndOfPartitionEvent for non-source tasks
+    a. endInput(int inputId) for all the operators
+    b. close() for all the operators
+    c. dispose() for all the operators
+    d. Emit EndOfPartitionEvent
+    e. Task cleanup
+```
+
+### When users execute stop-with-savepoint [--drain]
+
+Users could execute the command stop-with-savepoint [--drain] for both bounded and unbounded jobs to trigger jobs to finish.
+In this case, Flink first triggers a synchronous savepoint and all the tasks would stall after seeing the synchronous
+savepoint. If the savepoint succeeds, all the source operators would finish actively and the job would finish the same as the above scenario. 
+
+```
+1. Trigger a savepoint
+2. Sources received savepoint trigger RPC
+    a. If with –-drain
+        i. source operators emit MAX_WATERMARK
+    b. Source emits savepoint barrier
+3. On received MAX_WATERMARK for non-source operators
+    a. Trigger all the event times
+    b. Emit MAX_WATERMARK
+4. On received savepoint barrier for non-source operators
+    a. The task blocks till the savepoint succeed
+5. Finish the source tasks actively
+    a. If with –-drain
+        ii. endInput(inputId) for all the operators
+    b. close() for all the operators
+    c. dispose() for all the operators
+    d. Emit EndOfPartitionEvent
+    e. Task cleanup
+6. On received EndOfPartitionEvent for non-source tasks
+    a. If with –-drain
+        i. endInput(int inputId) for all the operators
+    b. close() for all the operators
+    c. dispose() for all the operators
+    d. Emit EndOfPartitionEvent
+    e. Task cleanup
+```
+
+A parameter `–-drain` is supported with `stop-with-savepoint`: if not specified, the job is expected to resume from this savepoint,
+otherwise the job is expected to terminate permanently. Thus we only emit `MAX_WATERMARK` to trigger all the event timers and call
+`endInput()` for the later case. 
+
+## Revise the Finishing Steps
+
+As described in part one, after revising the process of finishing, we have decoupled the process of "finishing operator logic"
+and "finishing task" by introducing a new `EndOfData` event. After the revision each task will first
+notify the descendants with an `EndOfData` event after executing all the logic
+so that the descendants also have chances to finish executing the operator logic, then
+all the tasks could wait for the next checkpoint or the specified savepoint concurrently to commit all the remaining data. 
+This section will present the detailed protocol of the revised process. Since we have renamed
+`close()` /`dispose()` to `close()` / `finish()`, we’ll stick to the new terminologies in the following description.
+
+The revised process of finishing is shown as follows:
+
+```
+1. Source tasks finished due to no more records or stop-with-savepoint. 
+    a. if no more records or stop-with-savepoint –-drain
+        i. source operators emit MAX_WATERMARK
+        ii. endInput(inputId) for all the operators
+        iii. finish() for all the operators
+    b. emit EndOfData[isDrain = true] event
+    c. Wait for the next checkpoint / the savepoint after operator finished complete
+    d. close() for all the operators
+    e. Emit EndOfPartitionEvent
+    f. Task cleanup
+2. On received MAX_WATERMARK for non-source operators
+    a. Trigger all the event times
+    b. Emit MAX_WATERMARK
+3. On received EndOfData for non-source tasks
+    a. If isDrain
+        i. endInput(int inputId) for all the operators
+        ii. finish() for all the operators
+    b. Emit EndOfData[isDrain = the flag value of the received event]
+4. On received EndOfPartitionEvent for non-source tasks
+    a. Wait for the next checkpoint / the savepoint after operator finished complete
+    b. close() for all the operators
+    c. Emit EndOfPartitionEvent
+    d. Task cleanup
+```
+
+<center>
+<img vspace="20" style="width:60%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/example_job_finish.png" />
+<p style="font-size: 0.6em">
+  Figure 3. An example job of the revised process of finishing.
+</p>
+</center>
+
+An example of the process of job finishing is shown in Figure 3. 
+
+If Task `C` finishes after processing all the records, it first emits the max-watermark, then finishes the operators and emits
+the `EndOfData` event. After that, it waits for the next checkpoint to complete and then emits the `EndOfPartitionEvent`. 
+
+Task `D` finishes all the operators right after receiving the `EndOfData` event. Since any checkpoints taken after operators finish
+can commit all the pending records and be the final checkpoint, Task `D`’s final checkpoint would be the same as Task `C`’s since
+the barrier must be emitted after the `EndOfData` event. 
+
+Task `E` is a bit different in that it has two inputs. Task `A` might continue to run for a while and, thus, Task `E` needs to wait
+until it receives an `EndOfData` event also from the other input before finishing operators and its final checkpoint might be different. 
+
+On the other hand, when using `stop-with-savepoint`, the process is similar except that all the tasks need to wait for the exact

Review Comment:
   Well, I actually thought The above lines are describe a `stop-with-savepoint` until I see this line. Maybe explain in ahead that it is describing the all-source-finished scenario?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] gaoyunhaii commented on pull request #545: Add blogs for FLIP-147 support checkpoints after tasks finished

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on PR #545:
URL: https://github.com/apache/flink-web/pull/545#issuecomment-1157179944

   Thanks @zhuzhurk @wanglijie95 for the review! I have updated the blogs according to the comments and also rewrite some part. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] gaoyunhaii commented on a diff in pull request #545: Add blogs for FLIP-147 support checkpoints after tasks finished

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on code in PR #545:
URL: https://github.com/apache/flink-web/pull/545#discussion_r901234152


##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -81,59 +81,61 @@ running subtasks. The states could be repartitioned on restarting and all the ne
 To support creating such a checkpoint for jobs with finished tasks, we extended the checkpoint procedure.
 Previously the checkpoint coordinator inside the JobManager first notifies all the sources to report snapshots,
 then all the sources further notify their descendants via broadcasting barrier events. Since now the sources might
-already finish, the checkpoint coordinator would instead treat the running tasks who do not have running precedent
-tasks as “new sources”, and notifies these tasks to initiate the checkpoints. The checkpoint could then deduce
-which operator is fully finished based on the task states when triggering checkpoint and the received snapshots. 
+have already finished, the checkpoint coordinator would instead treat the running tasks who also do not have running
+precedent tasks as "new sources", and it notifies these tasks to initiate the checkpoints. Finally, if the subtasks of
+an operator are either finished on triggering checkpoint or have finished processing all the data on snapshotting states,
+the operator would be marked as fully finished.
 
 The changes of the checkpoint procedure are transparent to users except that for checkpoints indeed containing
-finished tasks, we disallowed adding new operators before the fully finished ones, since it would make the fully
+finished tasks, we disallowed adding new operators as precedents of the fully finished ones, since it would make the fully
 finished operators have running precedents after restarting, which conflicts with the design that tasks finished
 in topological order.
 
 # Revise the Process of Finishing
 
 Based on the ability to take checkpoints with finished tasks, we could then solve the issue that two-phase-commit
-operators could not commit all the data when running in streaming mode. As the background, currently Flink jobs
+operators could not commit all the data when running in streaming mode. As the background, Flink jobs
 have two ways to finish:
 
-1.	All sources are bound and they processed all the input records. The job will start to finish after all the sources are finished.
-2.	Users execute `stop-with-savepoint [--drain]`. The job will take a savepoint and then finish. If the `–-drain` parameter is not set,
-the savepoint might be used to start new jobs and the operators will not flush all the event times or call methods marking all
-records processed (like `endInput()`).
+1.	All sources are bound and they processed all the input records. The job will finish after all the sources are finished.
+2.	Users execute `stop-with-savepoint [--drain]`. The job will take a savepoint and then finish. With `–-drain`, the job
+will be stopped permanently and is also required to commit all the data. However, without `--drain` the job might
+be resumed from the savepoint later, thus not all data are required to be committed, as long as the state of the data could be
+recovered from the savepoint.
 
-Ideally we should ensure exactly-once semantics in both cases.
+Let's first have a look at the case of bounded sources. To achieve end-to-end exactly-once,
+two-phase-commit operators only commit data after a checkpoint following this piece of data succeed.
+However, previously there is no such an opportunity for the data between the last periodic checkpoint and job finish,

Review Comment:
   Changed to `job getting finished`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] wanglijie95 commented on a diff in pull request #545: Add blogs for FLIP-147 support checkpoints after tasks finished

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #545:
URL: https://github.com/apache/flink-web/pull/545#discussion_r894187979


##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -0,0 +1,173 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part One"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finished.
+
+---
+
+# Motivation
+
+Flink is a distributed processing engine for both unbounded and bounded streams of data. In recent versions,
+Flink has unified the DataStream API and the Table / SQL API to support both streaming and batch cases.
+Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing
+operating, and maintaining consistency between streaming and batch backfilling jobs, like
+[the case for Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020). 
+
+<center>
+<img vspace="20" style="width:70%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/stream_batch_cmp.png" />
+<p style="font-size: 0.6em;text-align:left;margin-top:-1em;margin-bottom: 4em">
+  Figure 1. A comparison of the Stream mode and Batch mode for the example count operator. For streaming mode, the arrived

Review Comment:
   Stream mode -> Streaming mode



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -0,0 +1,173 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part One"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finished.
+
+---
+
+# Motivation
+
+Flink is a distributed processing engine for both unbounded and bounded streams of data. In recent versions,
+Flink has unified the DataStream API and the Table / SQL API to support both streaming and batch cases.
+Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing
+operating, and maintaining consistency between streaming and batch backfilling jobs, like
+[the case for Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020). 
+
+<center>
+<img vspace="20" style="width:70%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/stream_batch_cmp.png" />
+<p style="font-size: 0.6em;text-align:left;margin-top:-1em;margin-bottom: 4em">
+  Figure 1. A comparison of the Stream mode and Batch mode for the example count operator. For streaming mode, the arrived
+  elements are not sorted, the operator would read / write the state corresponding to the element for computation.
+  For batch mode, the arrived elements are first sorted as a whole and then processed.
+</p>
+</center>
+
+Flink provides two execution modes under the unified programming API: the streaming mode and the batch mode.
+The streaming mode processes records incrementally based on the states, thus it supports both bounded and unbounded sources.
+The batch mode works with bounded sources and usually has a better performance for bounded jobs because it executes all the
+tasks in topological order and avoids random state access by pre-sorting the input records. Although batch mode is often the
+preferred mode to process bounded jobs, streaming mode is also required for various reasons. For example, users may want to deal
+with records containing retraction or exploit the property that data is roughly sorted by event times in stream mode
+(like the case in [Kappa+ Architecture](https://www.youtube.com/watch?t=666&v=4qSlsYogALo&feature=youtu.be)). Moreover,
+users often have mixed jobs involving both unbounded streams and bounded side-inputs, which also require stream execution mode. 
+
+In streaming mode, [checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/checkpointing/)
+is the vital component in supporting exactly-once guarantees. By periodically snapshotting the

Review Comment:
   Maybe component -> mechanism



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -0,0 +1,173 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part One"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finished.
+
+---
+
+# Motivation
+
+Flink is a distributed processing engine for both unbounded and bounded streams of data. In recent versions,
+Flink has unified the DataStream API and the Table / SQL API to support both streaming and batch cases.
+Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing
+operating, and maintaining consistency between streaming and batch backfilling jobs, like
+[the case for Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020). 
+
+<center>
+<img vspace="20" style="width:70%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/stream_batch_cmp.png" />
+<p style="font-size: 0.6em;text-align:left;margin-top:-1em;margin-bottom: 4em">
+  Figure 1. A comparison of the Stream mode and Batch mode for the example count operator. For streaming mode, the arrived
+  elements are not sorted, the operator would read / write the state corresponding to the element for computation.
+  For batch mode, the arrived elements are first sorted as a whole and then processed.
+</p>
+</center>
+
+Flink provides two execution modes under the unified programming API: the streaming mode and the batch mode.
+The streaming mode processes records incrementally based on the states, thus it supports both bounded and unbounded sources.
+The batch mode works with bounded sources and usually has a better performance for bounded jobs because it executes all the
+tasks in topological order and avoids random state access by pre-sorting the input records. Although batch mode is often the
+preferred mode to process bounded jobs, streaming mode is also required for various reasons. For example, users may want to deal
+with records containing retraction or exploit the property that data is roughly sorted by event times in stream mode

Review Comment:
   stream mode -> streaming mode



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -0,0 +1,173 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part One"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finished.
+
+---
+
+# Motivation
+
+Flink is a distributed processing engine for both unbounded and bounded streams of data. In recent versions,
+Flink has unified the DataStream API and the Table / SQL API to support both streaming and batch cases.
+Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing
+operating, and maintaining consistency between streaming and batch backfilling jobs, like
+[the case for Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020). 
+
+<center>
+<img vspace="20" style="width:70%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/stream_batch_cmp.png" />
+<p style="font-size: 0.6em;text-align:left;margin-top:-1em;margin-bottom: 4em">
+  Figure 1. A comparison of the Stream mode and Batch mode for the example count operator. For streaming mode, the arrived
+  elements are not sorted, the operator would read / write the state corresponding to the element for computation.
+  For batch mode, the arrived elements are first sorted as a whole and then processed.
+</p>
+</center>
+
+Flink provides two execution modes under the unified programming API: the streaming mode and the batch mode.
+The streaming mode processes records incrementally based on the states, thus it supports both bounded and unbounded sources.
+The batch mode works with bounded sources and usually has a better performance for bounded jobs because it executes all the
+tasks in topological order and avoids random state access by pre-sorting the input records. Although batch mode is often the
+preferred mode to process bounded jobs, streaming mode is also required for various reasons. For example, users may want to deal
+with records containing retraction or exploit the property that data is roughly sorted by event times in stream mode
+(like the case in [Kappa+ Architecture](https://www.youtube.com/watch?t=666&v=4qSlsYogALo&feature=youtu.be)). Moreover,
+users often have mixed jobs involving both unbounded streams and bounded side-inputs, which also require stream execution mode. 
+
+In streaming mode, [checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/checkpointing/)
+is the vital component in supporting exactly-once guarantees. By periodically snapshotting the
+aligned states of operators, Flink can recover from the latest checkpoint and continue execution when failover happens. However,
+previously Flink could not take checkpoints if any tasks finished. This would cause problems for jobs with both bounded and unbounded
+sources: if there are no checkpoints after the bounded part finished, the unbounded part might need to reprocess a large amount of
+records in case of a failure. 
+
+Furthermore, being unable to take checkpoints with finished tasks caused issues for jobs using two-phase-commit sinks to achieve
+[end-to-end exactly-once processing](https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html).
+The two-phase-commit sinks first write data to temporary files or external transactions,
+and commit the data only after a checkpoint completes to ensure the data would not be replayed on failure. However, if a job
+contains bounded sources, committing the results would not be possible after the bounded sources finish. Also because of that,
+for bounded jobs we have no way to commit the last piece of data after the first source task finished, and previously the bounded
+jobs just ignore the uncommitted data when finishing. These behaviors caused a lot of confusion and are always asked in the user
+mailing list. 
+
+Therefore, to complete the support of streaming mode for jobs using bounded sources, it is important for us to 
+
+1. Support taking checkpoints with finished tasks.
+2. Furthermore, revise the process of finishing so that all the data could always be committed.
+
+The remaining blog first briefly describes the changes we made to achieve the above targets. In the next blog,
+we’ll also share more details on how they are implemented. 
+
+# Support Checkpointing with Finished Tasks
+
+The core idea of supporting checkpoints with finished tasks is to mark the finished operators in checkpoints and skip
+executing these operators after recovery. As illustrated in Figure 2, a checkpoint is composed of the states of all
+the operators. If all the subtasks of an operator have finished, we could mark it as fully finished and skip the
+execution of this operator on startup. For other operators, their states are composed of the states of all the
+running subtasks. The states could be repartitioned on restarting and all the new subtasks restarted with the assigned states. 
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/checkpoint_format.png" />
+<p style="font-size: 0.6em;text-align:center;margin-top:-1em;margin-bottom: 4em">
+  Figure 2. An illustration of the extended checkpoint format.
+</p>
+</center>
+
+To support creating such a checkpoint for jobs with finished tasks, we extended the checkpoint procedure.
+Previously the checkpoint coordinator inside the JobManager first notifies all the sources to report snapshots,
+then all the sources further notify their descendants via broadcasting barrier events. Since now the sources might
+already finish, the checkpoint coordinator would instead treat the running tasks who do not have running precedent
+tasks as “new sources”, and notifies these tasks to initiate the checkpoints. The checkpoint could then deduce
+which operator is fully finished based on the task states when triggering checkpoint and the received snapshots. 
+
+The changes of the checkpoint procedure are transparent to users except that for checkpoints indeed containing
+finished tasks, we disallowed adding new operators before the fully finished ones, since it would make the fully
+finished operators have running precedents after restarting, which conflicts with the design that tasks finished
+in topological order.
+
+# Revise the Process of Finishing
+
+Based on the ability to take checkpoints with finished tasks, we could then solve the issue that two-phase-commit
+operators could not commit all the data when running in streaming mode. As the background, currently Flink jobs
+have two ways to finish:
+
+1.	All sources are bound and they processed all the input records. The job will start to finish after all the sources are finished.
+2.	Users execute `stop-with-savepoint [--drain]`. The job will take a savepoint and then finish. If the `–-drain` parameter is not set,
+the savepoint might be used to start new jobs and the operators will not flush all the event times or call methods marking all
+records processed (like `endInput()`).
+
+Ideally we should ensure exactly-once semantics in both cases.
+
+To achieve exactly-once, currently two-phase-commit operators only commit data when a checkpoint after all these
+data succeed. However, for the bounded source case, currently there is no suitable time point to commit the data
+between the last periodic checkpoint and the task get finished: if we commit them directly on task get finished,
+then if there are failovers after that (like due to other unfinished tasks get failed), these records would be
+re-emitted and cause data duplication; if we discard these records as now, these records would be lost and cause data loss. 
+
+This issue also exists for the stop-with-savepoint case: the current implementation first stalls the execution and
+takes a savepoint, then it notifies all the sources to start finishing. However, with `–-drain` the operator logic like
+`endInput()` would still be executed during finishing and generate new records, these records would not be committed.
+In addition to that, we could see that in fact the savepoint also does not include these records and thus it is
+inconsistent, which makes the situation worse. 
+
+To provide an opportunity for the operators to commit the last piece of data, we need to modify the process of finishing
+to wait for one more checkpoint after tasks processed all data. An intuitive idea is to directly insert a step to the
+tasks’ lifecycle to wait for the next checkpoint, as shown in the left part of Figure 3. However, it could not solve
+all the issues.
+
+<center>
+<img vspace="20" style="width:90%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/finish_cmp.png" />
+<p style="font-size: 0.6em;text-align:left;margin-top:-1em;margin-bottom: 4em">
+  Figure 3. A comparison of the two options to make tasks wait for one more checkpoint before getting finished. The first
+  option directly inserts a step in the tasks’ lifecycle to wait for the next checkpoint, which disallows the tasks to wait
+  for the same checkpoint / savepoint. The second option decouples the notification of all records processed and task finished
+  and allows all tasks to first process all records, then they have the chance to wait for the save checkpoint / savepoint. 
+</p>
+</center>
+
+For the case of bounded sources, the intuitive idea works, but it might have performance issues in some cases:
+as exemplified in Figure 4, If there are multiple cascading tasks containing two-phase commit sinks, each task would
+wait for the next checkpoint separately, thus the job needs to wait for three more checkpoints during finishing,
+which might prolong the total execution time for a long time. 
+
+<center>
+<img vspace="20" style="width:90%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/example_job.png" />
+<p style="font-size: 0.6em;text-align:center;margin-top:-1em;margin-bottom: 4em">
+    Figure 4. An example job that contains a chain of tasks containing two-phase-commit operators. 
+</p>
+</center>
+
+For the case of `stop-with-savepoint [--drain]`, the intuitive idea is also flawed since different tasks have to
+wait for different checkpoints / savepoints, thus we could not finish the job with a specific savepoint.
+
+To further overcome these issues, we’d better decouple "finishing operator logic" and "finishing tasks": if we could first

Review Comment:
   "finishing operator logic" "finishing tasks", I prefer to use italic or bold here. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] gaoyunhaii closed pull request #545: Add blogs for FLIP-147 support checkpoints after tasks finished

Posted by GitBox <gi...@apache.org>.
gaoyunhaii closed pull request #545: Add blogs for FLIP-147 support checkpoints after tasks finished
URL: https://github.com/apache/flink-web/pull/545


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] gaoyunhaii commented on a diff in pull request #545: Add blogs for FLIP-147 support checkpoints after tasks finished

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on code in PR #545:
URL: https://github.com/apache/flink-web/pull/545#discussion_r901234540


##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -81,59 +81,61 @@ running subtasks. The states could be repartitioned on restarting and all the ne
 To support creating such a checkpoint for jobs with finished tasks, we extended the checkpoint procedure.
 Previously the checkpoint coordinator inside the JobManager first notifies all the sources to report snapshots,
 then all the sources further notify their descendants via broadcasting barrier events. Since now the sources might
-already finish, the checkpoint coordinator would instead treat the running tasks who do not have running precedent
-tasks as “new sources”, and notifies these tasks to initiate the checkpoints. The checkpoint could then deduce
-which operator is fully finished based on the task states when triggering checkpoint and the received snapshots. 
+have already finished, the checkpoint coordinator would instead treat the running tasks who also do not have running
+precedent tasks as "new sources", and it notifies these tasks to initiate the checkpoints. Finally, if the subtasks of
+an operator are either finished on triggering checkpoint or have finished processing all the data on snapshotting states,
+the operator would be marked as fully finished.
 
 The changes of the checkpoint procedure are transparent to users except that for checkpoints indeed containing
-finished tasks, we disallowed adding new operators before the fully finished ones, since it would make the fully
+finished tasks, we disallowed adding new operators as precedents of the fully finished ones, since it would make the fully
 finished operators have running precedents after restarting, which conflicts with the design that tasks finished
 in topological order.
 
 # Revise the Process of Finishing
 
 Based on the ability to take checkpoints with finished tasks, we could then solve the issue that two-phase-commit
-operators could not commit all the data when running in streaming mode. As the background, currently Flink jobs
+operators could not commit all the data when running in streaming mode. As the background, Flink jobs
 have two ways to finish:
 
-1.	All sources are bound and they processed all the input records. The job will start to finish after all the sources are finished.
-2.	Users execute `stop-with-savepoint [--drain]`. The job will take a savepoint and then finish. If the `–-drain` parameter is not set,
-the savepoint might be used to start new jobs and the operators will not flush all the event times or call methods marking all
-records processed (like `endInput()`).
+1.	All sources are bound and they processed all the input records. The job will finish after all the sources are finished.
+2.	Users execute `stop-with-savepoint [--drain]`. The job will take a savepoint and then finish. With `–-drain`, the job
+will be stopped permanently and is also required to commit all the data. However, without `--drain` the job might
+be resumed from the savepoint later, thus not all data are required to be committed, as long as the state of the data could be
+recovered from the savepoint.
 
-Ideally we should ensure exactly-once semantics in both cases.
+Let's first have a look at the case of bounded sources. To achieve end-to-end exactly-once,
+two-phase-commit operators only commit data after a checkpoint following this piece of data succeed.
+However, previously there is no such an opportunity for the data between the last periodic checkpoint and job finish,
+and the data finally gets lost. Note that it is also not correct if we directly commit the data on job finished, since
+if there are failovers after that (like due to other unfinished tasks get failed), the data will be replayed and cause replication.
 
-To achieve exactly-once, currently two-phase-commit operators only commit data when a checkpoint after all these
-data succeed. However, for the bounded source case, currently there is no suitable time point to commit the data
-between the last periodic checkpoint and the task get finished: if we commit them directly on task get finished,
-then if there are failovers after that (like due to other unfinished tasks get failed), these records would be
-re-emitted and cause data duplication; if we discard these records as now, these records would be lost and cause data loss. 
+The case of `stop-with-savepoint --drain` also has problems. The previous implementation first stalls the execution and
+take a savepoint. After the savepoint succeeds, all the source tasks would stop actively. Although the savepoint seems to
+provide the opportunity to commit all the data, some processing logic is in fact executed during the job getting stopped,
+and the records produced would be deserted by mistake. For example, calling `endInput()` method for operators happens during

Review Comment:
   Yes, I changed the word



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] zhuzhurk commented on a diff in pull request #545: Add blogs for FLIP-147 support checkpoints after tasks finished

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #545:
URL: https://github.com/apache/flink-web/pull/545#discussion_r908091944


##########
_posts/2022-06-01-final-checkpoint-part2.md:
##########
@@ -0,0 +1,244 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part Two"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post presents more details on the changes on the checkpoint procedure and task finish process made by the final checkpoint mechanism.
+
+---
+
+In the [first part]({{site.baseurl}}/2022/06/01/final-checkpoint-part1.html) of this blog,
+we have briefly introduced the work to support checkpoints after tasks get
+finished and revise the process of finishing. In this part we will present more details on the implementation,
+including how we support checkpoints with finished tasks and the revised protocol of the finish process.
+
+# Implementation of support Checkpointing with Finished Tasks
+
+As described in part one,
+to support checkpoints after some tasks are finished, the core idea is to mark
+the finished operators in checkpoints and skip executing these operators after recovery. To implement this idea,
+we enhanced the checkpointing procedure to generate the flag and use the flag on recovery. This section presents
+more details on the process of taking checkpoints with finished tasks and recovery from such checkpoints. 
+
+Previously, checkpointing only worked when all tasks were running. As shown in the Figure 1, in this case the
+checkpoint coordinator first notify all the source tasks, and then the source tasks further notify the
+downstream tasks to take snapshots via barrier events. Similarly, if there are finished tasks, we need to
+find the new "source" tasks to initiate the checkpoint, namely those tasks that are still running but have
+no running precedent tasks. CheckpointCoordinator does the computation atomically at the JobManager side
+based on the latest states recorded in the execution graph.
+
+There might be race conditions when triggering tasks: when the checkpoint coordinator
+decides to trigger one task and starts emitting the RPC, it is possible that the task is just finished and
+reporting the FINISHED status to JobManager. In this case, the RPC message would fail and the checkpoint would be aborted.
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/checkpoint_trigger.png" />
+<p style="font-size: 0.6em">
+  Figure 1. The tasks chosen as the new sources when taking checkpoint with finished tasks. The principle is to
+  choose the running tasks whose precedent tasks are all finished. 
+</p>
+</center>
+
+In order to keep track of the finish status of each operator, we need to extend the checkpoint format.
+A checkpoint consists of the states of all the stateful operators, and the state of one operator consists of the
+entries from all its parallel instances. Note that the concept of Task is not reflected in the checkpoint. Task
+is more of a physical execution container that drives the behavior of operators. It is not well-defined across
+multiple executions of the same job since job upgrades might modify the operators contained in one task. 
+Therefore, the finished status should also be attached to the operators.
+
+As shown in the Figure 2, operators could be classified into three types according to their finished status:
+
+1. Fully finished: If all the instances of an operator are finished, we could view the logic of the operators as
+fully executed and we should skip the execution of the operator after recovery. We need to store a special flag for this
+kind of operator. 
+2. Partially finished: If only some instances of an operator are finished, then we still need to continue executing the
+remaining logic of this operator. As a whole we could view the state of the operator as the set of entries collected from all the
+running instances, which represents the remaining workload for this operator.
+3. No finished instances: In this case, the state of the operator is the same as the one taken when no tasks are finished.
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/checkpoint_format.png" />
+<p style="font-size: 0.6em">
+  Figure 2. An illustration of the extended checkpoint format.
+</p>
+</center>
+
+If the job is later restored from a checkpoint taken with finished tasks, we would skip executing all the logic for fully
+finished operators, and execute normally for the operators with no finished instances.
+
+However, this would be a bit complex for the partially finished operators. The state of partially finished operators would be
+redistributed to all the instances, similar to rescaling when the parallelism is changed. Among all the types of states that
+Flink offers, the [keyed state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-keyed-state)
+and [operator state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-operator-state)
+with even-split redistribution would work normally, but the 
+[broadcast state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#broadcast-state) and 
+[operator state with union redistribution](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-operator-state)
+would be affected for the following reasons: 
+
+1. The broadcast state always replicates the state of the first subtask to the other subtasks. If the first subtask is finished,
+an empty state would be distributed and the operator would run from scratch, which is not correct.
+2. The operator state with union distribution merges the states of all the subtasks and then sends the merged state to all the
+subtasks. Based on this behavior, some operators may choose one subtask to store a shared value and after restarting this value will
+be distributed to all the subtasks. However, if this chosen task is finished, the state would be lost. 
+
+These two issues would not occur in when rescaling since there would be no finished tasks in that scenario. To address
+these issues, we chose one of the running subtasks instead to acquire the current state for the broadcast state. For the operator
+state with union redistribution, we have to collect the states of all the subtasks to maintain the semantics. Thus, currently we
+abort the checkpoint if parts of subtasks finished for operators using this kind of state. 
+
+In principle, you should be able to modify your job (which changes the dataflow graph) and restore from a previous checkpoint. That said,
+there are certain graph modifications that are not supported. These kinds of changes include adding a new operator as the precedent of a fully finished
+one. Flink would check for such modifications and throw exceptions while restoring.
+
+# The Revised Process of Finishing
+
+As described in the part one, based on the ability to take checkpoints with finished tasks, we revised the process of finishing
+so that we could always commit all the data for two-phase-commit sinks. We’ll show the detailed protocol of the finished process in this
+section.
+
+## How did Jobs in Flink Finish Before?
+
+A job might finish in two ways: all sources finish or users execute
+[`stop-with-savepoint [--drain]`](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint).
+Let’s first have a look at the detailed process of finishing before FLIP-147.
+
+### When sources finish
+
+If all the sources are bounded, The job will finish after all the input records are processed and all the result are

Review Comment:
   The -> the



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] gaoyunhaii commented on pull request #545: Add blogs for FLIP-147 support checkpoints after tasks finished

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on PR #545:
URL: https://github.com/apache/flink-web/pull/545#issuecomment-1180346462

   Thanks @zhuzhurk @wanglijie95 for the review! Merged~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] gaoyunhaii commented on pull request #545: Add blogs for FLIP-147 support checkpoints after tasks finished

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on PR #545:
URL: https://github.com/apache/flink-web/pull/545#issuecomment-1159993038

   Very thanks @zhuzhurk for the review! I have update the PR accordingly


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] zhuzhurk commented on a diff in pull request #545: Add blogs for FLIP-147 support checkpoints after tasks finished

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #545:
URL: https://github.com/apache/flink-web/pull/545#discussion_r893117141


##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -0,0 +1,173 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part One"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finished.

Review Comment:
   finished -> finish



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -0,0 +1,173 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part One"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finished.
+
+---
+
+# Motivation
+
+Flink is a distributed processing engine for both unbounded and bounded streams of data. In recent versions,
+Flink has unified the DataStream API and the Table / SQL API to support both streaming and batch cases.
+Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing
+operating, and maintaining consistency between streaming and batch backfilling jobs, like
+[the case for Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020). 
+
+<center>
+<img vspace="20" style="width:70%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/stream_batch_cmp.png" />
+<p style="font-size: 0.6em;text-align:left;margin-top:-1em;margin-bottom: 4em">
+  Figure 1. A comparison of the Stream mode and Batch mode for the example count operator. For streaming mode, the arrived
+  elements are not sorted, the operator would read / write the state corresponding to the element for computation.
+  For batch mode, the arrived elements are first sorted as a whole and then processed.
+</p>
+</center>
+
+Flink provides two execution modes under the unified programming API: the streaming mode and the batch mode.
+The streaming mode processes records incrementally based on the states, thus it supports both bounded and unbounded sources.
+The batch mode works with bounded sources and usually has a better performance for bounded jobs because it executes all the
+tasks in topological order and avoids random state access by pre-sorting the input records. Although batch mode is often the
+preferred mode to process bounded jobs, streaming mode is also required for various reasons. For example, users may want to deal
+with records containing retraction or exploit the property that data is roughly sorted by event times in stream mode
+(like the case in [Kappa+ Architecture](https://www.youtube.com/watch?t=666&v=4qSlsYogALo&feature=youtu.be)). Moreover,
+users often have mixed jobs involving both unbounded streams and bounded side-inputs, which also require stream execution mode. 

Review Comment:
   Figure 1 seems to be  a supplementary of these statements, maybe put it below the paragraph?



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -0,0 +1,173 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part One"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finished.
+
+---
+
+# Motivation
+
+Flink is a distributed processing engine for both unbounded and bounded streams of data. In recent versions,
+Flink has unified the DataStream API and the Table / SQL API to support both streaming and batch cases.
+Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing
+operating, and maintaining consistency between streaming and batch backfilling jobs, like
+[the case for Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020). 
+
+<center>
+<img vspace="20" style="width:70%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/stream_batch_cmp.png" />
+<p style="font-size: 0.6em;text-align:left;margin-top:-1em;margin-bottom: 4em">
+  Figure 1. A comparison of the Stream mode and Batch mode for the example count operator. For streaming mode, the arrived
+  elements are not sorted, the operator would read / write the state corresponding to the element for computation.
+  For batch mode, the arrived elements are first sorted as a whole and then processed.
+</p>
+</center>
+
+Flink provides two execution modes under the unified programming API: the streaming mode and the batch mode.
+The streaming mode processes records incrementally based on the states, thus it supports both bounded and unbounded sources.
+The batch mode works with bounded sources and usually has a better performance for bounded jobs because it executes all the
+tasks in topological order and avoids random state access by pre-sorting the input records. Although batch mode is often the
+preferred mode to process bounded jobs, streaming mode is also required for various reasons. For example, users may want to deal
+with records containing retraction or exploit the property that data is roughly sorted by event times in stream mode
+(like the case in [Kappa+ Architecture](https://www.youtube.com/watch?t=666&v=4qSlsYogALo&feature=youtu.be)). Moreover,
+users often have mixed jobs involving both unbounded streams and bounded side-inputs, which also require stream execution mode. 
+
+In streaming mode, [checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/checkpointing/)
+is the vital component in supporting exactly-once guarantees. By periodically snapshotting the
+aligned states of operators, Flink can recover from the latest checkpoint and continue execution when failover happens. However,
+previously Flink could not take checkpoints if any tasks finished. This would cause problems for jobs with both bounded and unbounded
+sources: if there are no checkpoints after the bounded part finished, the unbounded part might need to reprocess a large amount of
+records in case of a failure. 
+
+Furthermore, being unable to take checkpoints with finished tasks caused issues for jobs using two-phase-commit sinks to achieve

Review Comment:
   caused issues -> is a problem



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -0,0 +1,173 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part One"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finished.
+
+---
+
+# Motivation
+
+Flink is a distributed processing engine for both unbounded and bounded streams of data. In recent versions,
+Flink has unified the DataStream API and the Table / SQL API to support both streaming and batch cases.
+Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing
+operating, and maintaining consistency between streaming and batch backfilling jobs, like
+[the case for Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020). 
+
+<center>
+<img vspace="20" style="width:70%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/stream_batch_cmp.png" />
+<p style="font-size: 0.6em;text-align:left;margin-top:-1em;margin-bottom: 4em">
+  Figure 1. A comparison of the Stream mode and Batch mode for the example count operator. For streaming mode, the arrived
+  elements are not sorted, the operator would read / write the state corresponding to the element for computation.
+  For batch mode, the arrived elements are first sorted as a whole and then processed.
+</p>
+</center>
+
+Flink provides two execution modes under the unified programming API: the streaming mode and the batch mode.
+The streaming mode processes records incrementally based on the states, thus it supports both bounded and unbounded sources.
+The batch mode works with bounded sources and usually has a better performance for bounded jobs because it executes all the
+tasks in topological order and avoids random state access by pre-sorting the input records. Although batch mode is often the
+preferred mode to process bounded jobs, streaming mode is also required for various reasons. For example, users may want to deal
+with records containing retraction or exploit the property that data is roughly sorted by event times in stream mode
+(like the case in [Kappa+ Architecture](https://www.youtube.com/watch?t=666&v=4qSlsYogALo&feature=youtu.be)). Moreover,
+users often have mixed jobs involving both unbounded streams and bounded side-inputs, which also require stream execution mode. 
+
+In streaming mode, [checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/checkpointing/)
+is the vital component in supporting exactly-once guarantees. By periodically snapshotting the
+aligned states of operators, Flink can recover from the latest checkpoint and continue execution when failover happens. However,
+previously Flink could not take checkpoints if any tasks finished. This would cause problems for jobs with both bounded and unbounded
+sources: if there are no checkpoints after the bounded part finished, the unbounded part might need to reprocess a large amount of
+records in case of a failure. 
+
+Furthermore, being unable to take checkpoints with finished tasks caused issues for jobs using two-phase-commit sinks to achieve
+[end-to-end exactly-once processing](https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html).
+The two-phase-commit sinks first write data to temporary files or external transactions,
+and commit the data only after a checkpoint completes to ensure the data would not be replayed on failure. However, if a job
+contains bounded sources, committing the results would not be possible after the bounded sources finish. Also because of that,
+for bounded jobs we have no way to commit the last piece of data after the first source task finished, and previously the bounded
+jobs just ignore the uncommitted data when finishing. These behaviors caused a lot of confusion and are always asked in the user
+mailing list. 
+
+Therefore, to complete the support of streaming mode for jobs using bounded sources, it is important for us to 
+
+1. Support taking checkpoints with finished tasks.
+2. Furthermore, revise the process of finishing so that all the data could always be committed.
+
+The remaining blog first briefly describes the changes we made to achieve the above targets. In the next blog,
+we’ll also share more details on how they are implemented. 
+
+# Support Checkpointing with Finished Tasks
+
+The core idea of supporting checkpoints with finished tasks is to mark the finished operators in checkpoints and skip
+executing these operators after recovery. As illustrated in Figure 2, a checkpoint is composed of the states of all
+the operators. If all the subtasks of an operator have finished, we could mark it as fully finished and skip the
+execution of this operator on startup. For other operators, their states are composed of the states of all the
+running subtasks. The states could be repartitioned on restarting and all the new subtasks restarted with the assigned states. 
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/checkpoint_format.png" />
+<p style="font-size: 0.6em;text-align:center;margin-top:-1em;margin-bottom: 4em">
+  Figure 2. An illustration of the extended checkpoint format.
+</p>
+</center>
+
+To support creating such a checkpoint for jobs with finished tasks, we extended the checkpoint procedure.
+Previously the checkpoint coordinator inside the JobManager first notifies all the sources to report snapshots,
+then all the sources further notify their descendants via broadcasting barrier events. Since now the sources might
+already finish, the checkpoint coordinator would instead treat the running tasks who do not have running precedent
+tasks as “new sources”, and notifies these tasks to initiate the checkpoints. The checkpoint could then deduce
+which operator is fully finished based on the task states when triggering checkpoint and the received snapshots. 
+
+The changes of the checkpoint procedure are transparent to users except that for checkpoints indeed containing
+finished tasks, we disallowed adding new operators before the fully finished ones, since it would make the fully

Review Comment:
   before -> as precedents of



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -0,0 +1,173 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part One"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finished.
+
+---
+
+# Motivation
+
+Flink is a distributed processing engine for both unbounded and bounded streams of data. In recent versions,
+Flink has unified the DataStream API and the Table / SQL API to support both streaming and batch cases.
+Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing
+operating, and maintaining consistency between streaming and batch backfilling jobs, like
+[the case for Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020). 
+
+<center>
+<img vspace="20" style="width:70%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/stream_batch_cmp.png" />
+<p style="font-size: 0.6em;text-align:left;margin-top:-1em;margin-bottom: 4em">
+  Figure 1. A comparison of the Stream mode and Batch mode for the example count operator. For streaming mode, the arrived
+  elements are not sorted, the operator would read / write the state corresponding to the element for computation.
+  For batch mode, the arrived elements are first sorted as a whole and then processed.
+</p>
+</center>
+
+Flink provides two execution modes under the unified programming API: the streaming mode and the batch mode.
+The streaming mode processes records incrementally based on the states, thus it supports both bounded and unbounded sources.
+The batch mode works with bounded sources and usually has a better performance for bounded jobs because it executes all the
+tasks in topological order and avoids random state access by pre-sorting the input records. Although batch mode is often the
+preferred mode to process bounded jobs, streaming mode is also required for various reasons. For example, users may want to deal
+with records containing retraction or exploit the property that data is roughly sorted by event times in stream mode
+(like the case in [Kappa+ Architecture](https://www.youtube.com/watch?t=666&v=4qSlsYogALo&feature=youtu.be)). Moreover,
+users often have mixed jobs involving both unbounded streams and bounded side-inputs, which also require stream execution mode. 
+
+In streaming mode, [checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/checkpointing/)
+is the vital component in supporting exactly-once guarantees. By periodically snapshotting the
+aligned states of operators, Flink can recover from the latest checkpoint and continue execution when failover happens. However,
+previously Flink could not take checkpoints if any tasks finished. This would cause problems for jobs with both bounded and unbounded
+sources: if there are no checkpoints after the bounded part finished, the unbounded part might need to reprocess a large amount of
+records in case of a failure. 
+
+Furthermore, being unable to take checkpoints with finished tasks caused issues for jobs using two-phase-commit sinks to achieve
+[end-to-end exactly-once processing](https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html).
+The two-phase-commit sinks first write data to temporary files or external transactions,
+and commit the data only after a checkpoint completes to ensure the data would not be replayed on failure. However, if a job
+contains bounded sources, committing the results would not be possible after the bounded sources finish. Also because of that,
+for bounded jobs we have no way to commit the last piece of data after the first source task finished, and previously the bounded
+jobs just ignore the uncommitted data when finishing. These behaviors caused a lot of confusion and are always asked in the user
+mailing list. 
+
+Therefore, to complete the support of streaming mode for jobs using bounded sources, it is important for us to 
+
+1. Support taking checkpoints with finished tasks.
+2. Furthermore, revise the process of finishing so that all the data could always be committed.
+
+The remaining blog first briefly describes the changes we made to achieve the above targets. In the next blog,
+we’ll also share more details on how they are implemented. 

Review Comment:
   I prefer to remove the `also` 



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -0,0 +1,173 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part One"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finished.
+
+---
+
+# Motivation
+
+Flink is a distributed processing engine for both unbounded and bounded streams of data. In recent versions,
+Flink has unified the DataStream API and the Table / SQL API to support both streaming and batch cases.
+Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing
+operating, and maintaining consistency between streaming and batch backfilling jobs, like
+[the case for Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020). 
+
+<center>
+<img vspace="20" style="width:70%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/stream_batch_cmp.png" />
+<p style="font-size: 0.6em;text-align:left;margin-top:-1em;margin-bottom: 4em">
+  Figure 1. A comparison of the Stream mode and Batch mode for the example count operator. For streaming mode, the arrived
+  elements are not sorted, the operator would read / write the state corresponding to the element for computation.
+  For batch mode, the arrived elements are first sorted as a whole and then processed.
+</p>
+</center>
+
+Flink provides two execution modes under the unified programming API: the streaming mode and the batch mode.
+The streaming mode processes records incrementally based on the states, thus it supports both bounded and unbounded sources.
+The batch mode works with bounded sources and usually has a better performance for bounded jobs because it executes all the
+tasks in topological order and avoids random state access by pre-sorting the input records. Although batch mode is often the
+preferred mode to process bounded jobs, streaming mode is also required for various reasons. For example, users may want to deal
+with records containing retraction or exploit the property that data is roughly sorted by event times in stream mode
+(like the case in [Kappa+ Architecture](https://www.youtube.com/watch?t=666&v=4qSlsYogALo&feature=youtu.be)). Moreover,
+users often have mixed jobs involving both unbounded streams and bounded side-inputs, which also require stream execution mode. 
+
+In streaming mode, [checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/checkpointing/)
+is the vital component in supporting exactly-once guarantees. By periodically snapshotting the
+aligned states of operators, Flink can recover from the latest checkpoint and continue execution when failover happens. However,
+previously Flink could not take checkpoints if any tasks finished. This would cause problems for jobs with both bounded and unbounded
+sources: if there are no checkpoints after the bounded part finished, the unbounded part might need to reprocess a large amount of
+records in case of a failure. 
+
+Furthermore, being unable to take checkpoints with finished tasks caused issues for jobs using two-phase-commit sinks to achieve
+[end-to-end exactly-once processing](https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html).
+The two-phase-commit sinks first write data to temporary files or external transactions,
+and commit the data only after a checkpoint completes to ensure the data would not be replayed on failure. However, if a job
+contains bounded sources, committing the results would not be possible after the bounded sources finish. Also because of that,
+for bounded jobs we have no way to commit the last piece of data after the first source task finished, and previously the bounded
+jobs just ignore the uncommitted data when finishing. These behaviors caused a lot of confusion and are always asked in the user
+mailing list. 
+
+Therefore, to complete the support of streaming mode for jobs using bounded sources, it is important for us to 
+
+1. Support taking checkpoints with finished tasks.
+2. Furthermore, revise the process of finishing so that all the data could always be committed.
+
+The remaining blog first briefly describes the changes we made to achieve the above targets. In the next blog,

Review Comment:
   I prefer to remove the `first`



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -0,0 +1,173 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part One"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finished.
+
+---
+
+# Motivation
+
+Flink is a distributed processing engine for both unbounded and bounded streams of data. In recent versions,
+Flink has unified the DataStream API and the Table / SQL API to support both streaming and batch cases.
+Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing

Review Comment:
   developing -> developing,



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -0,0 +1,173 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part One"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finished.
+
+---
+
+# Motivation
+
+Flink is a distributed processing engine for both unbounded and bounded streams of data. In recent versions,
+Flink has unified the DataStream API and the Table / SQL API to support both streaming and batch cases.
+Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing
+operating, and maintaining consistency between streaming and batch backfilling jobs, like
+[the case for Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020). 
+
+<center>
+<img vspace="20" style="width:70%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/stream_batch_cmp.png" />
+<p style="font-size: 0.6em;text-align:left;margin-top:-1em;margin-bottom: 4em">
+  Figure 1. A comparison of the Stream mode and Batch mode for the example count operator. For streaming mode, the arrived
+  elements are not sorted, the operator would read / write the state corresponding to the element for computation.
+  For batch mode, the arrived elements are first sorted as a whole and then processed.
+</p>
+</center>
+
+Flink provides two execution modes under the unified programming API: the streaming mode and the batch mode.
+The streaming mode processes records incrementally based on the states, thus it supports both bounded and unbounded sources.
+The batch mode works with bounded sources and usually has a better performance for bounded jobs because it executes all the
+tasks in topological order and avoids random state access by pre-sorting the input records. Although batch mode is often the
+preferred mode to process bounded jobs, streaming mode is also required for various reasons. For example, users may want to deal
+with records containing retraction or exploit the property that data is roughly sorted by event times in stream mode
+(like the case in [Kappa+ Architecture](https://www.youtube.com/watch?t=666&v=4qSlsYogALo&feature=youtu.be)). Moreover,
+users often have mixed jobs involving both unbounded streams and bounded side-inputs, which also require stream execution mode. 
+
+In streaming mode, [checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/checkpointing/)
+is the vital component in supporting exactly-once guarantees. By periodically snapshotting the
+aligned states of operators, Flink can recover from the latest checkpoint and continue execution when failover happens. However,
+previously Flink could not take checkpoints if any tasks finished. This would cause problems for jobs with both bounded and unbounded
+sources: if there are no checkpoints after the bounded part finished, the unbounded part might need to reprocess a large amount of
+records in case of a failure. 
+
+Furthermore, being unable to take checkpoints with finished tasks caused issues for jobs using two-phase-commit sinks to achieve
+[end-to-end exactly-once processing](https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html).
+The two-phase-commit sinks first write data to temporary files or external transactions,
+and commit the data only after a checkpoint completes to ensure the data would not be replayed on failure. However, if a job
+contains bounded sources, committing the results would not be possible after the bounded sources finish. Also because of that,
+for bounded jobs we have no way to commit the last piece of data after the first source task finished, and previously the bounded
+jobs just ignore the uncommitted data when finishing. These behaviors caused a lot of confusion and are always asked in the user
+mailing list. 
+
+Therefore, to complete the support of streaming mode for jobs using bounded sources, it is important for us to 
+
+1. Support taking checkpoints with finished tasks.
+2. Furthermore, revise the process of finishing so that all the data could always be committed.
+
+The remaining blog first briefly describes the changes we made to achieve the above targets. In the next blog,

Review Comment:
   targets -> goals



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -0,0 +1,173 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part One"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finished.
+
+---
+
+# Motivation
+
+Flink is a distributed processing engine for both unbounded and bounded streams of data. In recent versions,
+Flink has unified the DataStream API and the Table / SQL API to support both streaming and batch cases.
+Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing
+operating, and maintaining consistency between streaming and batch backfilling jobs, like
+[the case for Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020). 
+
+<center>
+<img vspace="20" style="width:70%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/stream_batch_cmp.png" />
+<p style="font-size: 0.6em;text-align:left;margin-top:-1em;margin-bottom: 4em">
+  Figure 1. A comparison of the Stream mode and Batch mode for the example count operator. For streaming mode, the arrived
+  elements are not sorted, the operator would read / write the state corresponding to the element for computation.
+  For batch mode, the arrived elements are first sorted as a whole and then processed.
+</p>
+</center>
+
+Flink provides two execution modes under the unified programming API: the streaming mode and the batch mode.
+The streaming mode processes records incrementally based on the states, thus it supports both bounded and unbounded sources.
+The batch mode works with bounded sources and usually has a better performance for bounded jobs because it executes all the
+tasks in topological order and avoids random state access by pre-sorting the input records. Although batch mode is often the
+preferred mode to process bounded jobs, streaming mode is also required for various reasons. For example, users may want to deal
+with records containing retraction or exploit the property that data is roughly sorted by event times in stream mode
+(like the case in [Kappa+ Architecture](https://www.youtube.com/watch?t=666&v=4qSlsYogALo&feature=youtu.be)). Moreover,
+users often have mixed jobs involving both unbounded streams and bounded side-inputs, which also require stream execution mode. 
+
+In streaming mode, [checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/checkpointing/)
+is the vital component in supporting exactly-once guarantees. By periodically snapshotting the
+aligned states of operators, Flink can recover from the latest checkpoint and continue execution when failover happens. However,
+previously Flink could not take checkpoints if any tasks finished. This would cause problems for jobs with both bounded and unbounded
+sources: if there are no checkpoints after the bounded part finished, the unbounded part might need to reprocess a large amount of
+records in case of a failure. 
+
+Furthermore, being unable to take checkpoints with finished tasks caused issues for jobs using two-phase-commit sinks to achieve
+[end-to-end exactly-once processing](https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html).
+The two-phase-commit sinks first write data to temporary files or external transactions,
+and commit the data only after a checkpoint completes to ensure the data would not be replayed on failure. However, if a job
+contains bounded sources, committing the results would not be possible after the bounded sources finish. Also because of that,
+for bounded jobs we have no way to commit the last piece of data after the first source task finished, and previously the bounded
+jobs just ignore the uncommitted data when finishing. These behaviors caused a lot of confusion and are always asked in the user
+mailing list. 
+
+Therefore, to complete the support of streaming mode for jobs using bounded sources, it is important for us to 
+
+1. Support taking checkpoints with finished tasks.
+2. Furthermore, revise the process of finishing so that all the data could always be committed.
+
+The remaining blog first briefly describes the changes we made to achieve the above targets. In the next blog,
+we’ll also share more details on how they are implemented. 
+
+# Support Checkpointing with Finished Tasks
+
+The core idea of supporting checkpoints with finished tasks is to mark the finished operators in checkpoints and skip
+executing these operators after recovery. As illustrated in Figure 2, a checkpoint is composed of the states of all
+the operators. If all the subtasks of an operator have finished, we could mark it as fully finished and skip the
+execution of this operator on startup. For other operators, their states are composed of the states of all the
+running subtasks. The states could be repartitioned on restarting and all the new subtasks restarted with the assigned states. 
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/checkpoint_format.png" />
+<p style="font-size: 0.6em;text-align:center;margin-top:-1em;margin-bottom: 4em">
+  Figure 2. An illustration of the extended checkpoint format.
+</p>
+</center>
+
+To support creating such a checkpoint for jobs with finished tasks, we extended the checkpoint procedure.
+Previously the checkpoint coordinator inside the JobManager first notifies all the sources to report snapshots,
+then all the sources further notify their descendants via broadcasting barrier events. Since now the sources might
+already finish, the checkpoint coordinator would instead treat the running tasks who do not have running precedent

Review Comment:
   already finish -> have already finished



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] zhuzhurk commented on a diff in pull request #545: Add blogs for FLIP-147 support checkpoints after tasks finished

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #545:
URL: https://github.com/apache/flink-web/pull/545#discussion_r898738184


##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -9,44 +9,44 @@ authors:
   name: "Dawid Wysakowicz"
 - Daisy Tsang:
   name: "Daisy Tsang"
-excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finished.
+excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finish.
 
 ---
 
 # Motivation
 
 Flink is a distributed processing engine for both unbounded and bounded streams of data. In recent versions,
 Flink has unified the DataStream API and the Table / SQL API to support both streaming and batch cases.
-Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing
+Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing,
 operating, and maintaining consistency between streaming and batch backfilling jobs, like
 [the case for Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020). 
 
+Flink provides two execution modes under the unified programming API: the streaming mode and the batch mode.
+The streaming mode processes records incrementally based on the states, thus it supports both bounded and unbounded sources.
+The batch mode works with bounded sources and usually has a better performance for bounded jobs because it executes all the
+tasks in topological order and avoids random state access by pre-sorting the input records. Although batch mode is often the
+preferred mode to process bounded jobs, streaming mode is also required for various reasons. For example, users may want to deal
+with records containing retraction or exploit the property that data is roughly sorted by event times in streaming mode
+(like the case in [Kappa+ Architecture](https://www.youtube.com/watch?t=666&v=4qSlsYogALo&feature=youtu.be)). Moreover,
+users often have mixed jobs involving both unbounded streams and bounded side-inputs, which also require streaming execution mode.
+
 <center>
 <img vspace="20" style="width:70%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/stream_batch_cmp.png" />
 <p style="font-size: 0.6em;text-align:left;margin-top:-1em;margin-bottom: 4em">
-  Figure 1. A comparison of the Stream mode and Batch mode for the example count operator. For streaming mode, the arrived
+  Figure 1. A comparison of the Streaming mode and Batch mode for the example Count operator. For streaming mode, the arrived
   elements are not sorted, the operator would read / write the state corresponding to the element for computation.
   For batch mode, the arrived elements are first sorted as a whole and then processed.
 </p>
 </center>
 
-Flink provides two execution modes under the unified programming API: the streaming mode and the batch mode.
-The streaming mode processes records incrementally based on the states, thus it supports both bounded and unbounded sources.
-The batch mode works with bounded sources and usually has a better performance for bounded jobs because it executes all the
-tasks in topological order and avoids random state access by pre-sorting the input records. Although batch mode is often the
-preferred mode to process bounded jobs, streaming mode is also required for various reasons. For example, users may want to deal
-with records containing retraction or exploit the property that data is roughly sorted by event times in stream mode
-(like the case in [Kappa+ Architecture](https://www.youtube.com/watch?t=666&v=4qSlsYogALo&feature=youtu.be)). Moreover,
-users often have mixed jobs involving both unbounded streams and bounded side-inputs, which also require stream execution mode. 
-
 In streaming mode, [checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/checkpointing/)
-is the vital component in supporting exactly-once guarantees. By periodically snapshotting the
+is the vital mechanism in supporting exactly-once guarantees. By periodically snapshotting the
 aligned states of operators, Flink can recover from the latest checkpoint and continue execution when failover happens. However,
-previously Flink could not take checkpoints if any tasks finished. This would cause problems for jobs with both bounded and unbounded
+previously Flink could not take checkpoints if any tasks gets finished. This would cause problems for jobs with both bounded and unbounded

Review Comment:
   tasks -> task



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -81,59 +81,61 @@ running subtasks. The states could be repartitioned on restarting and all the ne
 To support creating such a checkpoint for jobs with finished tasks, we extended the checkpoint procedure.
 Previously the checkpoint coordinator inside the JobManager first notifies all the sources to report snapshots,
 then all the sources further notify their descendants via broadcasting barrier events. Since now the sources might
-already finish, the checkpoint coordinator would instead treat the running tasks who do not have running precedent
-tasks as “new sources”, and notifies these tasks to initiate the checkpoints. The checkpoint could then deduce
-which operator is fully finished based on the task states when triggering checkpoint and the received snapshots. 
+have already finished, the checkpoint coordinator would instead treat the running tasks who also do not have running
+precedent tasks as "new sources", and it notifies these tasks to initiate the checkpoints. Finally, if the subtasks of
+an operator are either finished on triggering checkpoint or have finished processing all the data on snapshotting states,
+the operator would be marked as fully finished.
 
 The changes of the checkpoint procedure are transparent to users except that for checkpoints indeed containing
-finished tasks, we disallowed adding new operators before the fully finished ones, since it would make the fully
+finished tasks, we disallowed adding new operators as precedents of the fully finished ones, since it would make the fully
 finished operators have running precedents after restarting, which conflicts with the design that tasks finished
 in topological order.
 
 # Revise the Process of Finishing
 
 Based on the ability to take checkpoints with finished tasks, we could then solve the issue that two-phase-commit
-operators could not commit all the data when running in streaming mode. As the background, currently Flink jobs
+operators could not commit all the data when running in streaming mode. As the background, Flink jobs
 have two ways to finish:
 
-1.	All sources are bound and they processed all the input records. The job will start to finish after all the sources are finished.
-2.	Users execute `stop-with-savepoint [--drain]`. The job will take a savepoint and then finish. If the `–-drain` parameter is not set,
-the savepoint might be used to start new jobs and the operators will not flush all the event times or call methods marking all
-records processed (like `endInput()`).
+1.	All sources are bound and they processed all the input records. The job will finish after all the sources are finished.

Review Comment:
   The statement is not accurate that the job finishes when all sources have finished. The previous statement `start to finish` seems better. But maybe it's better to say that "The job will finish after all the input records are processed and all the result records are committed to external sinks."



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -81,59 +81,61 @@ running subtasks. The states could be repartitioned on restarting and all the ne
 To support creating such a checkpoint for jobs with finished tasks, we extended the checkpoint procedure.
 Previously the checkpoint coordinator inside the JobManager first notifies all the sources to report snapshots,
 then all the sources further notify their descendants via broadcasting barrier events. Since now the sources might
-already finish, the checkpoint coordinator would instead treat the running tasks who do not have running precedent
-tasks as “new sources”, and notifies these tasks to initiate the checkpoints. The checkpoint could then deduce
-which operator is fully finished based on the task states when triggering checkpoint and the received snapshots. 
+have already finished, the checkpoint coordinator would instead treat the running tasks who also do not have running
+precedent tasks as "new sources", and it notifies these tasks to initiate the checkpoints. Finally, if the subtasks of
+an operator are either finished on triggering checkpoint or have finished processing all the data on snapshotting states,
+the operator would be marked as fully finished.
 
 The changes of the checkpoint procedure are transparent to users except that for checkpoints indeed containing
-finished tasks, we disallowed adding new operators before the fully finished ones, since it would make the fully
+finished tasks, we disallowed adding new operators as precedents of the fully finished ones, since it would make the fully
 finished operators have running precedents after restarting, which conflicts with the design that tasks finished
 in topological order.
 
 # Revise the Process of Finishing
 
 Based on the ability to take checkpoints with finished tasks, we could then solve the issue that two-phase-commit
-operators could not commit all the data when running in streaming mode. As the background, currently Flink jobs
+operators could not commit all the data when running in streaming mode. As the background, Flink jobs
 have two ways to finish:
 
-1.	All sources are bound and they processed all the input records. The job will start to finish after all the sources are finished.
-2.	Users execute `stop-with-savepoint [--drain]`. The job will take a savepoint and then finish. If the `–-drain` parameter is not set,
-the savepoint might be used to start new jobs and the operators will not flush all the event times or call methods marking all
-records processed (like `endInput()`).
+1.	All sources are bound and they processed all the input records. The job will finish after all the sources are finished.
+2.	Users execute `stop-with-savepoint [--drain]`. The job will take a savepoint and then finish. With `–-drain`, the job
+will be stopped permanently and is also required to commit all the data. However, without `--drain` the job might
+be resumed from the savepoint later, thus not all data are required to be committed, as long as the state of the data could be
+recovered from the savepoint.
 
-Ideally we should ensure exactly-once semantics in both cases.
+Let's first have a look at the case of bounded sources. To achieve end-to-end exactly-once,
+two-phase-commit operators only commit data after a checkpoint following this piece of data succeed.
+However, previously there is no such an opportunity for the data between the last periodic checkpoint and job finish,
+and the data finally gets lost. Note that it is also not correct if we directly commit the data on job finished, since
+if there are failovers after that (like due to other unfinished tasks get failed), the data will be replayed and cause replication.
 
-To achieve exactly-once, currently two-phase-commit operators only commit data when a checkpoint after all these
-data succeed. However, for the bounded source case, currently there is no suitable time point to commit the data
-between the last periodic checkpoint and the task get finished: if we commit them directly on task get finished,
-then if there are failovers after that (like due to other unfinished tasks get failed), these records would be
-re-emitted and cause data duplication; if we discard these records as now, these records would be lost and cause data loss. 
+The case of `stop-with-savepoint --drain` also has problems. The previous implementation first stalls the execution and
+take a savepoint. After the savepoint succeeds, all the source tasks would stop actively. Although the savepoint seems to
+provide the opportunity to commit all the data, some processing logic is in fact executed during the job getting stopped,
+and the records produced would be deserted by mistake. For example, calling `endInput()` method for operators happens during

Review Comment:
   deserted -> do you mean discarded?



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -81,59 +81,61 @@ running subtasks. The states could be repartitioned on restarting and all the ne
 To support creating such a checkpoint for jobs with finished tasks, we extended the checkpoint procedure.
 Previously the checkpoint coordinator inside the JobManager first notifies all the sources to report snapshots,
 then all the sources further notify their descendants via broadcasting barrier events. Since now the sources might
-already finish, the checkpoint coordinator would instead treat the running tasks who do not have running precedent
-tasks as “new sources”, and notifies these tasks to initiate the checkpoints. The checkpoint could then deduce
-which operator is fully finished based on the task states when triggering checkpoint and the received snapshots. 
+have already finished, the checkpoint coordinator would instead treat the running tasks who also do not have running
+precedent tasks as "new sources", and it notifies these tasks to initiate the checkpoints. Finally, if the subtasks of
+an operator are either finished on triggering checkpoint or have finished processing all the data on snapshotting states,
+the operator would be marked as fully finished.
 
 The changes of the checkpoint procedure are transparent to users except that for checkpoints indeed containing
-finished tasks, we disallowed adding new operators before the fully finished ones, since it would make the fully
+finished tasks, we disallowed adding new operators as precedents of the fully finished ones, since it would make the fully
 finished operators have running precedents after restarting, which conflicts with the design that tasks finished
 in topological order.
 
 # Revise the Process of Finishing
 
 Based on the ability to take checkpoints with finished tasks, we could then solve the issue that two-phase-commit
-operators could not commit all the data when running in streaming mode. As the background, currently Flink jobs
+operators could not commit all the data when running in streaming mode. As the background, Flink jobs
 have two ways to finish:
 
-1.	All sources are bound and they processed all the input records. The job will start to finish after all the sources are finished.
-2.	Users execute `stop-with-savepoint [--drain]`. The job will take a savepoint and then finish. If the `–-drain` parameter is not set,
-the savepoint might be used to start new jobs and the operators will not flush all the event times or call methods marking all
-records processed (like `endInput()`).
+1.	All sources are bound and they processed all the input records. The job will finish after all the sources are finished.
+2.	Users execute `stop-with-savepoint [--drain]`. The job will take a savepoint and then finish. With `–-drain`, the job
+will be stopped permanently and is also required to commit all the data. However, without `--drain` the job might
+be resumed from the savepoint later, thus not all data are required to be committed, as long as the state of the data could be
+recovered from the savepoint.
 
-Ideally we should ensure exactly-once semantics in both cases.
+Let's first have a look at the case of bounded sources. To achieve end-to-end exactly-once,
+two-phase-commit operators only commit data after a checkpoint following this piece of data succeed.
+However, previously there is no such an opportunity for the data between the last periodic checkpoint and job finish,
+and the data finally gets lost. Note that it is also not correct if we directly commit the data on job finished, since
+if there are failovers after that (like due to other unfinished tasks get failed), the data will be replayed and cause replication.

Review Comment:
   replication -> duplication



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -142,26 +144,26 @@ which might prolong the total execution time for a long time.
 </p>
 </center>
 
-For the case of `stop-with-savepoint [--drain]`, the intuitive idea is also flawed since different tasks have to
+For the case of `stop-with-savepoint [--drain]`, the intuitive idea does not work since different tasks have to
 wait for different checkpoints / savepoints, thus we could not finish the job with a specific savepoint.
 
-To further overcome these issues, we’d better decouple "finishing operator logic" and "finishing tasks": if we could first
-let all the operators finish their execution logic as a whole, including calling lifecycle methods `endInput()`, then each operator
-could wait for the next checkpoint concurrently. Besides, for stop-with-savepoint we could also simply make all the operators to 
-wait for one specific savepoint taken after operators finishing their execution logic. Therefore, in this way the two types of
-finish process could be unified.
+Therefore, we do not take the intuitive option. Instead, we decoupled *"finishing operator logic"* and *"finishing tasks"*:
+all the operators would first finish their execution logic as a whole, including calling lifecycle methods like `endInput()`,
+then each operator could wait for the next checkpoint concurrently. Besides, for stop-with-savepoint we also reverted the current
+implementation similarly: all the tasks will first finish executing the operators' logic, then they simply wait for the next savepoint
+to happen before finish. Therefore, in this way the finish processes are unified and the data could be fully committed for all the cases.
 
 Based on this thought, as shown in the right part of Figure 3, to decoupled the process of "finishing operator logic"
-and "finishing tasks", we introduced a new `EndOfDataEvent`. For each task, after executing all the operator logic it would first notify
-the descendants with an `EndOfDataEvent` so that the descendants also have chances to finish executing the operator logic. Then all
+and "finishing tasks", we introduced a new `EndOfData` event. For each task, after executing all the operator logic it would first notify
+the descendants with an `EndOfData` event so that the descendants also have chances to finish executing the operator logic. Then all
 the tasks could wait for the next checkpoint or the specified savepoint concurrently to commit all the remaining data before getting finished.
 
 At last, it is also worthy to mention we have clarified and renamed the `close()` and `dispose()` methods in the operators’ lifecycle.
 The two methods are in fact different since `close()` is only called when the task finishes normally and dispose() is called in both
 cases of normal finishing and failover. However, this was not clear from their names. Therefore, we rename the two methods to `finish()` and `close()`:
 
 - `finish()` marks the termination of the operator and no more records are allowed after `finish()` is called. It should
-- only be called when sources are finished or when the `-–drain` parameter is specified. 
+  only be called when sources are finished or when the `-–drain` parameter is specified.
 - `close()` is used to do cleanup and release all the holding resources.

Review Comment:
   holding -> held?



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -81,59 +81,61 @@ running subtasks. The states could be repartitioned on restarting and all the ne
 To support creating such a checkpoint for jobs with finished tasks, we extended the checkpoint procedure.
 Previously the checkpoint coordinator inside the JobManager first notifies all the sources to report snapshots,
 then all the sources further notify their descendants via broadcasting barrier events. Since now the sources might
-already finish, the checkpoint coordinator would instead treat the running tasks who do not have running precedent
-tasks as “new sources”, and notifies these tasks to initiate the checkpoints. The checkpoint could then deduce
-which operator is fully finished based on the task states when triggering checkpoint and the received snapshots. 
+have already finished, the checkpoint coordinator would instead treat the running tasks who also do not have running
+precedent tasks as "new sources", and it notifies these tasks to initiate the checkpoints. Finally, if the subtasks of
+an operator are either finished on triggering checkpoint or have finished processing all the data on snapshotting states,
+the operator would be marked as fully finished.
 
 The changes of the checkpoint procedure are transparent to users except that for checkpoints indeed containing
-finished tasks, we disallowed adding new operators before the fully finished ones, since it would make the fully
+finished tasks, we disallowed adding new operators as precedents of the fully finished ones, since it would make the fully
 finished operators have running precedents after restarting, which conflicts with the design that tasks finished
 in topological order.
 
 # Revise the Process of Finishing
 
 Based on the ability to take checkpoints with finished tasks, we could then solve the issue that two-phase-commit
-operators could not commit all the data when running in streaming mode. As the background, currently Flink jobs
+operators could not commit all the data when running in streaming mode. As the background, Flink jobs
 have two ways to finish:
 
-1.	All sources are bound and they processed all the input records. The job will start to finish after all the sources are finished.
-2.	Users execute `stop-with-savepoint [--drain]`. The job will take a savepoint and then finish. If the `–-drain` parameter is not set,
-the savepoint might be used to start new jobs and the operators will not flush all the event times or call methods marking all
-records processed (like `endInput()`).
+1.	All sources are bound and they processed all the input records. The job will finish after all the sources are finished.
+2.	Users execute `stop-with-savepoint [--drain]`. The job will take a savepoint and then finish. With `–-drain`, the job
+will be stopped permanently and is also required to commit all the data. However, without `--drain` the job might
+be resumed from the savepoint later, thus not all data are required to be committed, as long as the state of the data could be
+recovered from the savepoint.
 
-Ideally we should ensure exactly-once semantics in both cases.
+Let's first have a look at the case of bounded sources. To achieve end-to-end exactly-once,
+two-phase-commit operators only commit data after a checkpoint following this piece of data succeed.
+However, previously there is no such an opportunity for the data between the last periodic checkpoint and job finish,

Review Comment:
   finish -> finishment or finishing?



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -142,26 +144,26 @@ which might prolong the total execution time for a long time.
 </p>
 </center>
 
-For the case of `stop-with-savepoint [--drain]`, the intuitive idea is also flawed since different tasks have to
+For the case of `stop-with-savepoint [--drain]`, the intuitive idea does not work since different tasks have to
 wait for different checkpoints / savepoints, thus we could not finish the job with a specific savepoint.
 
-To further overcome these issues, we’d better decouple "finishing operator logic" and "finishing tasks": if we could first
-let all the operators finish their execution logic as a whole, including calling lifecycle methods `endInput()`, then each operator
-could wait for the next checkpoint concurrently. Besides, for stop-with-savepoint we could also simply make all the operators to 
-wait for one specific savepoint taken after operators finishing their execution logic. Therefore, in this way the two types of
-finish process could be unified.
+Therefore, we do not take the intuitive option. Instead, we decoupled *"finishing operator logic"* and *"finishing tasks"*:
+all the operators would first finish their execution logic as a whole, including calling lifecycle methods like `endInput()`,
+then each operator could wait for the next checkpoint concurrently. Besides, for stop-with-savepoint we also reverted the current
+implementation similarly: all the tasks will first finish executing the operators' logic, then they simply wait for the next savepoint
+to happen before finish. Therefore, in this way the finish processes are unified and the data could be fully committed for all the cases.

Review Comment:
   finish -> finishing



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -81,59 +81,61 @@ running subtasks. The states could be repartitioned on restarting and all the ne
 To support creating such a checkpoint for jobs with finished tasks, we extended the checkpoint procedure.
 Previously the checkpoint coordinator inside the JobManager first notifies all the sources to report snapshots,
 then all the sources further notify their descendants via broadcasting barrier events. Since now the sources might
-already finish, the checkpoint coordinator would instead treat the running tasks who do not have running precedent
-tasks as “new sources”, and notifies these tasks to initiate the checkpoints. The checkpoint could then deduce
-which operator is fully finished based on the task states when triggering checkpoint and the received snapshots. 
+have already finished, the checkpoint coordinator would instead treat the running tasks who also do not have running
+precedent tasks as "new sources", and it notifies these tasks to initiate the checkpoints. Finally, if the subtasks of
+an operator are either finished on triggering checkpoint or have finished processing all the data on snapshotting states,
+the operator would be marked as fully finished.
 
 The changes of the checkpoint procedure are transparent to users except that for checkpoints indeed containing
-finished tasks, we disallowed adding new operators before the fully finished ones, since it would make the fully
+finished tasks, we disallowed adding new operators as precedents of the fully finished ones, since it would make the fully
 finished operators have running precedents after restarting, which conflicts with the design that tasks finished
 in topological order.
 
 # Revise the Process of Finishing
 
 Based on the ability to take checkpoints with finished tasks, we could then solve the issue that two-phase-commit
-operators could not commit all the data when running in streaming mode. As the background, currently Flink jobs
+operators could not commit all the data when running in streaming mode. As the background, Flink jobs
 have two ways to finish:
 
-1.	All sources are bound and they processed all the input records. The job will start to finish after all the sources are finished.
-2.	Users execute `stop-with-savepoint [--drain]`. The job will take a savepoint and then finish. If the `–-drain` parameter is not set,
-the savepoint might be used to start new jobs and the operators will not flush all the event times or call methods marking all
-records processed (like `endInput()`).
+1.	All sources are bound and they processed all the input records. The job will finish after all the sources are finished.
+2.	Users execute `stop-with-savepoint [--drain]`. The job will take a savepoint and then finish. With `–-drain`, the job
+will be stopped permanently and is also required to commit all the data. However, without `--drain` the job might
+be resumed from the savepoint later, thus not all data are required to be committed, as long as the state of the data could be
+recovered from the savepoint.
 
-Ideally we should ensure exactly-once semantics in both cases.
+Let's first have a look at the case of bounded sources. To achieve end-to-end exactly-once,
+two-phase-commit operators only commit data after a checkpoint following this piece of data succeed.
+However, previously there is no such an opportunity for the data between the last periodic checkpoint and job finish,
+and the data finally gets lost. Note that it is also not correct if we directly commit the data on job finished, since
+if there are failovers after that (like due to other unfinished tasks get failed), the data will be replayed and cause replication.
 
-To achieve exactly-once, currently two-phase-commit operators only commit data when a checkpoint after all these
-data succeed. However, for the bounded source case, currently there is no suitable time point to commit the data
-between the last periodic checkpoint and the task get finished: if we commit them directly on task get finished,
-then if there are failovers after that (like due to other unfinished tasks get failed), these records would be
-re-emitted and cause data duplication; if we discard these records as now, these records would be lost and cause data loss. 
+The case of `stop-with-savepoint --drain` also has problems. The previous implementation first stalls the execution and
+take a savepoint. After the savepoint succeeds, all the source tasks would stop actively. Although the savepoint seems to

Review Comment:
   take -> takes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org