Posted to commits@flink.apache.org by ga...@apache.org on 2022/07/11 12:24:33 UTC

[flink-web] 01/02: Add blogs for FLIP-147 support checkpoints after tasks finished

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit 313d98723676c865c3057145319854af3864a538
Author: Yun Gao <ga...@gmail.com>
AuthorDate: Mon Jun 6 16:01:48 2022 +0800

    Add blogs for FLIP-147 support checkpoints after tasks finished
---
 _posts/2022-07-11-final-checkpoint-part1.md        | 176 +++++++++++++++
 _posts/2022-07-11-final-checkpoint-part2.md        | 244 +++++++++++++++++++++
 .../checkpoint_format.png                          | Bin 0 -> 96886 bytes
 .../checkpoint_trigger.png                         | Bin 0 -> 146793 bytes
 .../2022-07-11-final-checkpoint/example_job.png    | Bin 0 -> 26922 bytes
 .../example_job_finish.png                         | Bin 0 -> 42969 bytes
 .../2022-07-11-final-checkpoint/finish_cmp.png     | Bin 0 -> 26074 bytes
 .../stream_batch_cmp.png                           | Bin 0 -> 44100 bytes
 8 files changed, 420 insertions(+)

diff --git a/_posts/2022-07-11-final-checkpoint-part1.md b/_posts/2022-07-11-final-checkpoint-part1.md
new file mode 100644
index 000000000..5eff0740e
--- /dev/null
+++ b/_posts/2022-07-11-final-checkpoint-part1.md
@@ -0,0 +1,176 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part One"
+date: 2022-07-11T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finish.
+
+---
+
+# Motivation
+
+Flink is a distributed processing engine for both unbounded and bounded streams of data. In recent versions,
+Flink has unified the DataStream API and the Table / SQL API to support both streaming and batch cases.
+Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing,
+operating, and maintaining consistency between streaming and batch backfilling jobs, like
+[the case for Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020). 
+
+Flink provides two execution modes under the unified programming API: the streaming mode and the batch mode.
+The streaming mode processes records incrementally based on state, so it supports both bounded and unbounded sources.
+The batch mode works only with bounded sources and usually performs better for bounded jobs because it executes all the
+tasks in topological order and avoids random state access by pre-sorting the input records. Although batch mode is often the
+preferred mode for bounded jobs, streaming mode is still required for various reasons. For example, users may want to deal
+with records containing retractions or exploit the property that data is roughly sorted by event time in streaming mode
+(like the case in [Kappa+ Architecture](https://www.youtube.com/watch?t=666&v=4qSlsYogALo&feature=youtu.be)). Moreover,
+users often have mixed jobs involving both unbounded streams and bounded side-inputs, which also require streaming execution mode.
+
+<center>
+<img vspace="20" style="width:70%" src="{{site.baseurl}}/img/blog/2022-07-11-final-checkpoint/stream_batch_cmp.png" />
+<p style="font-size: 0.6em;text-align:left;margin-top:-1em;margin-bottom: 4em">
+  Figure 1. A comparison of the streaming mode and the batch mode for the example Count operator. In streaming mode, the arriving
+  elements are not sorted, so the operator reads / writes the state corresponding to each element during computation.
+  In batch mode, the arriving elements are first sorted as a whole and then processed.
+</p>
+</center>
+
+In streaming mode, [checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/checkpointing/)
+is the vital mechanism for supporting exactly-once guarantees. By periodically snapshotting the
+aligned states of operators, Flink can recover from the latest checkpoint and continue execution when a failover happens. However,
+previously Flink could not take checkpoints once any task had finished. This caused problems for jobs with both bounded and unbounded
+sources: if no checkpoints are taken after the bounded part finishes, the unbounded part might need to reprocess a large number of
+records in case of a failure.
+
+Furthermore, being unable to take checkpoints with finished tasks is a problem for jobs using two-phase-commit sinks to achieve
+[end-to-end exactly-once processing](https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html).
+The two-phase-commit sinks first write data to temporary files or external transactions,
+and commit the data only after a checkpoint completes to ensure the data would not be replayed on failure. However, if a job
+contains bounded sources, committing the results would not be possible after the bounded sources finish. For the same reason,
+bounded jobs had no way to commit the last piece of data after the first source task finished, and previously
+they simply ignored the uncommitted data when finishing. These behaviors caused a lot of confusion and were frequently asked about on the user
+mailing list.
+
+Therefore, to complete the support of streaming mode for jobs using bounded sources, it is important for us to 
+
+1. Support taking checkpoints with finished tasks.
+2. Furthermore, revise the process of finishing so that all the data could always be committed.
+
+The rest of this post briefly describes the changes we made to achieve the above goals. In the next post,
+we’ll share more details on how they are implemented.
+
+# Support Checkpointing with Finished Tasks
+
+The core idea of supporting checkpoints with finished tasks is to mark the finished operators in checkpoints and skip
+executing these operators after recovery. As illustrated in Figure 2, a checkpoint is composed of the states of all
+the operators. If all the subtasks of an operator have finished, we could mark it as fully finished and skip the
+execution of this operator on startup. For other operators, their states are composed of the states of all the
+running subtasks. The states will be repartitioned on restarting and all the new subtasks will be restarted with the assigned states.
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-07-11-final-checkpoint/checkpoint_format.png" />
+<p style="font-size: 0.6em;text-align:center;margin-top:-1em;margin-bottom: 4em">
+  Figure 2. An illustration of the extended checkpoint format.
+</p>
+</center>
+
+To support creating such a checkpoint for jobs with finished tasks, we extended the checkpoint procedure.
+Previously, the checkpoint coordinator inside the JobManager first notified all the sources to report snapshots,
+and then all the sources further notified their descendants by broadcasting barrier events. Since the sources might now
+have already finished, the checkpoint coordinator instead treats the running tasks that have no running
+precedent tasks as "new sources" and notifies these tasks to initiate the checkpoint. Finally, if all the subtasks of
+an operator are either finished when the checkpoint is triggered or have finished processing all the data when snapshotting their states,
+the operator is marked as fully finished.
+
+The changes to the checkpoint procedure are transparent to users, with one exception: for checkpoints that contain
+finished tasks, we disallow adding new operators as precedents of the fully finished ones, since this would give the fully
+finished operators running precedents after restarting, which conflicts with the design that tasks finish
+in topological order.
+
+# Revise the Process of Finishing
+
+Based on the ability to take checkpoints with finished tasks, we could then solve the issue that two-phase-commit
+operators could not commit all the data when running in streaming mode. As background, Flink jobs
+have two ways to finish:
+
+1. All sources are bounded and have processed all the input records. The job will finish after all the
+input records are processed and all the results are committed to external systems.
+2. Users execute `stop-with-savepoint [--drain]`. The job will take a savepoint and then finish. With `--drain`, the job
+will be stopped permanently and is also required to commit all the data. However, without `--drain` the job might
+be resumed from the savepoint later, thus not all data are required to be committed, as long as the state of the data could be
+recovered from the savepoint.
+
+Let's first have a look at the case of bounded sources. To achieve end-to-end exactly-once,
+two-phase-commit operators only commit data after a checkpoint following this piece of data has succeeded.
+However, previously there was no such opportunity for the data produced between the last periodic checkpoint and the job finishing,
+so this data was eventually lost. Note that it is also not correct to commit the data directly when the job finishes, since
+if a failover happens after that (for example, because another unfinished task fails), the data will be replayed and cause duplication.
+
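The commit rule described above can be illustrated with a minimal sketch. It is a toy model and not Flink's sink API: the class `TwoPhaseCommitSink`, its method names, and the helper `run_bounded_job` are hypothetical and exist only for this illustration. The sketch assumes every checkpoint completes immediately after it is triggered.

```python
class TwoPhaseCommitSink:
    """Toy two-phase-commit sink (hypothetical, not a Flink class)."""

    def __init__(self):
        self.current_txn = []  # records written since the last checkpoint
        self.pending = {}      # checkpoint id -> records pre-committed at that checkpoint
        self.committed = []    # records visible in the external system

    def write(self, record):
        self.current_txn.append(record)

    def snapshot_state(self, checkpoint_id):
        # Phase 1 (pre-commit): seal the open transaction under this checkpoint id.
        self.pending[checkpoint_id] = self.current_txn
        self.current_txn = []

    def notify_checkpoint_complete(self, checkpoint_id):
        # Phase 2 (commit): publish every transaction sealed up to this checkpoint.
        for cid in sorted(c for c in self.pending if c <= checkpoint_id):
            self.committed.extend(self.pending.pop(cid))


def run_bounded_job(records, checkpoint_after, final_checkpoint):
    """Write all records, taking a checkpoint after the given record counts.

    If final_checkpoint is True, one more checkpoint is taken after the last record.
    """
    sink = TwoPhaseCommitSink()
    checkpoint_id = 0
    for count, record in enumerate(records, start=1):
        sink.write(record)
        if count in checkpoint_after:
            checkpoint_id += 1
            sink.snapshot_state(checkpoint_id)
            sink.notify_checkpoint_complete(checkpoint_id)
    if final_checkpoint:
        checkpoint_id += 1
        sink.snapshot_state(checkpoint_id)
        sink.notify_checkpoint_complete(checkpoint_id)
    return sink
```

With five records and checkpoints after the second and fourth record, running without a final checkpoint leaves the fifth record in `current_txn` and never commits it, which is the data loss described above. Running with `final_checkpoint=True` commits all five records.
+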
+The case of `stop-with-savepoint --drain` also has problems. The previous implementation first stalled the execution and
+took a savepoint. After the savepoint succeeded, all the source tasks would actively stop. Although the savepoint seems to
+provide the opportunity to commit all the data, some processing logic is in fact executed while the job is being stopped,
+and the records produced would be discarded by mistake. For example, the `endInput()` method of operators is called during
+the stopping phase, and some operators like the async operator might still emit new records in this method.
+
+Finally, although `stop-with-savepoint` without draining is not required to commit all the data, we hope the job finish process could
+be unified for all the cases to keep the code clean.
+
+To fix the remaining issues, we need to modify the process of finishing to ensure that all the data gets committed in the required cases.
+An intuitive idea is to directly insert a step into the tasks’ lifecycle to wait for the next checkpoint, as shown in the left part
+of Figure 3. However, this does not solve all the issues.
+
+<center>
+<img vspace="20" style="width:90%" src="{{site.baseurl}}/img/blog/2022-07-11-final-checkpoint/finish_cmp.png" />
+<p style="font-size: 0.6em;text-align:left;margin-top:-1em;margin-bottom: 4em">
+  Figure 3. A comparison of the two options to ensure tasks commit all the data before finishing. The first
+  option directly inserts a step into the tasks’ lifecycle to wait for the next checkpoint, which prevents the tasks from waiting
+  for the same checkpoint / savepoint. The second option decouples the notification of finishing operator logic from finishing tasks,
+  so all the tasks can first process all records and then have the chance to wait for the same checkpoint / savepoint.
+</p>
+</center>
+
+For the case of bounded sources, the intuitive idea works, but it might have performance issues in some cases:
+as exemplified in Figure 4, if there are multiple cascading tasks containing two-phase-commit sinks, each task would
+wait for the next checkpoint separately, thus the job needs to wait for three more checkpoints during finishing,
+which might significantly prolong the total execution time.
+
+<center>
+<img vspace="20" style="width:90%" src="{{site.baseurl}}/img/blog/2022-07-11-final-checkpoint/example_job.png" />
+<p style="font-size: 0.6em;text-align:center;margin-top:-1em;margin-bottom: 4em">
+    Figure 4. An example job that contains a chain of tasks containing two-phase-commit operators. 
+</p>
+</center>
+
+For the case of `stop-with-savepoint [--drain]`, the intuitive idea does not work since different tasks have to
+wait for different checkpoints / savepoints, thus we could not finish the job with a specific savepoint.
+
+Therefore, we do not take the intuitive option. Instead, we decoupled *"finishing operator logic"* and *"finishing tasks"*:
+all the tasks first finish their execution logic as a whole, including calling lifecycle methods like `endInput()`,
+and then each task can wait for the next checkpoint concurrently. For `stop-with-savepoint`, we also reworked the
+implementation in the same way: all the tasks first finish executing the operators' logic, then they simply wait for the next savepoint
+to happen before finishing. In this way, the finishing processes are unified and the data can be fully committed in all the cases.
+
+Based on this thought, as shown in the right part of Figure 3, to decouple the process of "finishing operator logic"
+and "finishing tasks", we introduced a new `EndOfData` event. For each task, after executing all the operator logic it would first notify
+the descendants with an `EndOfData` event so that the descendants also have chances to finish executing the operator logic. Then all
+the tasks could wait for the next checkpoint or the specified savepoint concurrently to commit all the remaining data before getting finished.
+
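The cost difference between the two options can be made concrete with a small back-of-the-envelope sketch. It rests on simplifying assumptions that are not part of the original design: periodic checkpoints complete instantly at multiples of a fixed interval, and processing and event propagation take no time. The function names are hypothetical.

```python
def next_checkpoint(time, interval):
    """Completion time of the first periodic checkpoint strictly after `time`."""
    return (time // interval + 1) * interval


def finish_times_sequential(num_tasks, data_end, interval):
    """Intuitive option: a task starts waiting only after its upstream task has finished."""
    times = []
    upstream_finished = data_end
    for _ in range(num_tasks):
        upstream_finished = next_checkpoint(upstream_finished, interval)
        times.append(upstream_finished)
    return times


def finish_times_decoupled(num_tasks, data_end, interval):
    """Decoupled option: all tasks finish their operator logic, then wait for the same checkpoint."""
    return [next_checkpoint(data_end, interval)] * num_tasks
```

With a checkpoint every 10 time units and input ending at time 25, a chain of three tasks finishes at 30, 40 and 50 under the intuitive option, while under the decoupled option all three tasks finish at 30.
+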
+Finally, it is also worth mentioning that we have clarified and renamed the `close()` and `dispose()` methods in the operators’ lifecycle.
+The two methods are in fact different since `close()` is only called when the task finishes normally, while `dispose()` is called in both
+cases of normal finishing and failover. However, this was not clear from their names. Therefore, we renamed the two methods to `finish()` and `close()`:
+
+- `finish()` marks the termination of the operator and no more records are allowed after `finish()` is called. It should
+  only be called when sources are finished or when the `--drain` parameter is specified.
+- `close()` is used to do cleanup and release all the held resources.
+
+# Conclusion
+
+By supporting checkpoints after tasks have finished and revising the process of finishing, we can support checkpoints for jobs with
+both bounded and unbounded sources, and ensure that bounded jobs get all output records committed before they finish. The motivation
+behind this change is to ensure data consistency, result completeness, and failure recovery if there are bounded sources in the pipeline.
+The final checkpoint mechanism was first implemented in Flink 1.14 and enabled by default in Flink 1.15. If you have any questions,
+please feel free to start a discussion or report an issue on the dev or user mailing list.
diff --git a/_posts/2022-07-11-final-checkpoint-part2.md b/_posts/2022-07-11-final-checkpoint-part2.md
new file mode 100644
index 000000000..c6eda82e5
--- /dev/null
+++ b/_posts/2022-07-11-final-checkpoint-part2.md
@@ -0,0 +1,244 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part Two"
+date: 2022-07-11T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post presents more details on the changes on the checkpoint procedure and task finish process made by the final checkpoint mechanism.
+
+---
+
+In the [first part]({{site.baseurl}}/2022/07/11/final-checkpoint-part1.html) of this blog,
+we briefly introduced the work to support checkpoints after tasks have
+finished and the revised process of finishing. In this part, we present more details on the implementation,
+including how we support checkpoints with finished tasks and the revised protocol of the finish process.
+
+# Implementation of Support for Checkpointing with Finished Tasks
+
+As described in part one,
+to support checkpoints after some tasks are finished, the core idea is to mark
+the finished operators in checkpoints and skip executing these operators after recovery. To implement this idea,
+we enhanced the checkpointing procedure to generate the flag and use the flag on recovery. This section presents
+more details on the process of taking checkpoints with finished tasks and recovery from such checkpoints. 
+
+Previously, checkpointing only worked when all tasks were running. In that case, the
+checkpoint coordinator first notifies all the source tasks, and then the source tasks further notify the
+downstream tasks to take snapshots via barrier events. Similarly, if there are finished tasks, we need to
+find the new "source" tasks to initiate the checkpoint, namely those tasks that are still running but have
+no running precedent tasks, as shown in Figure 1. CheckpointCoordinator does the computation atomically at the JobManager side
+based on the latest states recorded in the execution graph.
+
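The selection rule above can be sketched in a few lines. This is a simplified illustration and not the CheckpointCoordinator code: the function name, the dictionary representation of the job graph, and the task names are assumptions made for the example.

```python
def tasks_to_trigger(upstreams, finished):
    """Return the running tasks that have no running upstream (precedent) task.

    upstreams: dict mapping each task name to the list of its direct upstream tasks.
    finished:  set of task names that have already finished.

    Because tasks finish in topological order, checking the direct upstream
    tasks is enough: a finished task cannot have a running ancestor.
    """
    return sorted(
        task
        for task, inputs in upstreams.items()
        if task not in finished and all(u in finished for u in inputs)
    )


# Hypothetical job: two sources, a map after src1, and a join of map and src2.
job = {"src1": [], "src2": [], "map": ["src1"], "join": ["map", "src2"]}
print(tasks_to_trigger(job, finished=set()))     # ['src1', 'src2']
print(tasks_to_trigger(job, finished={"src1"}))  # ['map', 'src2']
```

When nothing has finished, the rule degenerates to the old behavior of triggering the sources. Once `src1` has finished, `map` takes over its role as a "new source".
+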
+There might be race conditions when triggering tasks: when the checkpoint coordinator
+decides to trigger one task and starts emitting the RPC, it is possible that the task has just finished and is
+reporting the FINISHED status to the JobManager. In this case, the RPC message would fail and the checkpoint would be aborted.
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-07-11-final-checkpoint/checkpoint_trigger.png" />
+<p style="font-size: 0.6em">
+  Figure 1. The tasks chosen as the new sources when taking a checkpoint with finished tasks. The principle is to
+  choose the running tasks whose precedent tasks are all finished. 
+</p>
+</center>
+
+In order to keep track of the finish status of each operator, we need to extend the checkpoint format.
+A checkpoint consists of the states of all the stateful operators, and the state of one operator consists of the
+entries from all its parallel instances. Note that the concept of Task is not reflected in the checkpoint. Task
+is more of a physical execution container that drives the behavior of operators. It is not well-defined across
+multiple executions of the same job since job upgrades might modify the operators contained in one task. 
+Therefore, the finished status should also be attached to the operators.
+
+As shown in Figure 2, operators can be classified into three types according to their finished status:
+
+1. Fully finished: If all the instances of an operator are finished, we could view the logic of the operators as
+fully executed and we should skip the execution of the operator after recovery. We need to store a special flag for this
+kind of operator. 
+2. Partially finished: If only some instances of an operator are finished, then we still need to continue executing the
+remaining logic of this operator. As a whole we could view the state of the operator as the set of entries collected from all the
+running instances, which represents the remaining workload for this operator.
+3. No finished instances: In this case, the state of the operator is the same as the one taken when no tasks are finished.
+
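The three cases above can be written down as a small sketch. The enum, the function names, and the dictionary layout of the checkpoint entry are hypothetical and only mirror the classification in the list. They do not reflect the actual checkpoint metadata format.

```python
from enum import Enum


class FinishedStatus(Enum):
    FULLY_FINISHED = "fully finished"
    PARTIALLY_FINISHED = "partially finished"
    NO_FINISHED_INSTANCES = "no finished instances"


def classify(subtask_finished):
    """Classify an operator from its per-subtask finished flags (a non-empty list of bool)."""
    if all(subtask_finished):
        return FinishedStatus.FULLY_FINISHED
    if any(subtask_finished):
        return FinishedStatus.PARTIALLY_FINISHED
    return FinishedStatus.NO_FINISHED_INSTANCES


def operator_snapshot(subtask_states, subtask_finished):
    """Build the checkpoint entry for one operator.

    A fully finished operator stores only a flag. Otherwise the entry holds the
    states reported by the subtasks that are still running.
    """
    if classify(subtask_finished) is FinishedStatus.FULLY_FINISHED:
        return {"fully_finished": True, "states": []}
    running = [s for s, done in zip(subtask_states, subtask_finished) if not done]
    return {"fully_finished": False, "states": running}
```

For an operator with three subtasks of which only the first has finished, `operator_snapshot(["s0", "s1", "s2"], [True, False, False])` keeps the states `"s1"` and `"s2"`, which represent the remaining workload.
+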
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-07-11-final-checkpoint/checkpoint_format.png" />
+<p style="font-size: 0.6em">
+  Figure 2. An illustration of the extended checkpoint format.
+</p>
+</center>
+
+If the job is later restored from a checkpoint taken with finished tasks, we would skip executing all the logic for fully
+finished operators, and execute normally for the operators with no finished instances.
+
+However, this would be a bit complex for the partially finished operators. The state of partially finished operators would be
+redistributed to all the instances, similar to rescaling when the parallelism is changed. Among all the types of states that
+Flink offers, the [keyed state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-keyed-state)
+and [operator state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-operator-state)
+with even-split redistribution would work normally, but the 
+[broadcast state](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#broadcast-state) and 
+[operator state with union redistribution](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#using-operator-state)
+would be affected for the following reasons: 
+
+1. The broadcast state always replicates the state of the first subtask to the other subtasks. If the first subtask is finished,
+an empty state would be distributed and the operator would run from scratch, which is not correct.
+2. The operator state with union redistribution merges the states of all the subtasks and then sends the merged state to all the
+subtasks. Based on this behavior, some operators may choose one subtask to store a shared value and after restarting this value will
+be distributed to all the subtasks. However, if this chosen subtask is finished, the state would be lost. 
+
+These two issues would not occur when rescaling since there would be no finished tasks in that scenario. To address
+the first issue, we instead choose one of the running subtasks to acquire the current state for the broadcast state. For the operator
+state with union redistribution, we have to collect the states of all the subtasks to maintain the semantics. Thus, currently we
+abort the checkpoint if some of the subtasks have finished for operators using this kind of state. 
+
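The two rules above can be sketched as follows. This is an illustrative model under simplifying assumptions: subtask states are plain Python values, a finished subtask reports `None` or an empty list, and the exception and function names are hypothetical.

```python
class CheckpointAborted(Exception):
    """Raised when a checkpoint cannot be taken consistently (hypothetical)."""


def snapshot_broadcast_state(subtask_states, subtask_finished):
    """Take the broadcast state from a running subtask instead of always the first one."""
    for state, done in zip(subtask_states, subtask_finished):
        if not done:
            return state
    return None  # every subtask has finished, so the operator is fully finished


def snapshot_union_state(subtask_states, subtask_finished):
    """Union state needs the entries of every subtask, so a partially finished operator aborts."""
    if all(subtask_finished):
        return []  # fully finished: only the finished flag is stored
    if any(subtask_finished):
        raise CheckpointAborted("union state used by a partially finished operator")
    return [entry for state in subtask_states for entry in state]
```

If the first subtask has finished, `snapshot_broadcast_state([None, {"rule": 1}], [True, False])` still returns `{"rule": 1}`, so the operator does not restart from an empty broadcast state. The same situation makes `snapshot_union_state` raise, which corresponds to aborting the checkpoint.
+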
+In principle, you should be able to modify your job (which changes the dataflow graph) and restore from a previous checkpoint. That said,
+there are certain graph modifications that are not supported. These kinds of changes include adding a new operator as the precedent of a fully finished
+one. Flink would check for such modifications and throw exceptions while restoring.
+
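The restriction on graph modifications can be expressed as a simple validation. The sketch below is only an approximation of the idea and is not Flink's restore logic: the function name, the graph representation, and the error type are assumptions for illustration.

```python
def validate_restore(new_upstreams, fully_finished):
    """Reject a modified job graph in which a fully finished operator has a running precedent.

    new_upstreams:  dict mapping each operator id of the modified job to its direct upstream ids.
    fully_finished: set of operator ids marked as fully finished in the checkpoint.
    """
    for op, inputs in new_upstreams.items():
        if op not in fully_finished:
            continue
        for upstream in inputs:
            # A newly added operator is never in the checkpoint, so it counts as running.
            if upstream not in fully_finished:
                raise ValueError(
                    f"operator {op!r} is fully finished but its precedent {upstream!r} is not"
                )
```

With `fully_finished = {"src"}`, the graph `{"src": [], "sink": ["src"]}` passes the check, while inserting a new operator in front of `src`, as in `{"new_src": [], "src": ["new_src"], "sink": ["src"]}`, raises an error.
+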
+# The Revised Process of Finishing
+
+As described in part one, based on the ability to take checkpoints with finished tasks, we revised the process of finishing
+so that we could always commit all the data for two-phase-commit sinks. We’ll show the detailed protocol of the finishing process in this
+section.
+
+## How did Jobs in Flink Finish Before?
+
+A job might finish in two ways: all sources finish or users execute
+[`stop-with-savepoint [--drain]`](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint).
+Let’s first have a look at the detailed process of finishing before FLIP-147.
+
+### When sources finish
+
+If all the sources are bounded, the job will finish after all the input records are processed and all the results are
+committed to external systems. In this case, the sources would first
+emit a `MAX_WATERMARK` (`Long.MAX_VALUE`) and then start to terminate the task. On termination, a task would call `endInput()`,
+`close()` and `dispose()` for all the operators, then emit an `EndOfPartitionEvent` to the downstream tasks. The intermediate tasks
+would start terminating after receiving an `EndOfPartitionEvent` from all the input channels, and this process will continue
+until the last task is finished. 
+
+```
+1. Source operators emit MAX_WATERMARK
+2. On received MAX_WATERMARK for non-source operators
+    a. Trigger all the event-time timers
+    b. Emit MAX_WATERMARK
+3. Source tasks finished
+    a. endInput(inputId) for all the operators
+    b. close() for all the operators
+    c. dispose() for all the operators
+    d. Emit EndOfPartitionEvent
+    e. Task cleanup
+4. On received EndOfPartitionEvent for non-source tasks
+    a. endInput(int inputId) for all the operators
+    b. close() for all the operators
+    c. dispose() for all the operators
+    d. Emit EndOfPartitionEvent
+    e. Task cleanup
+```
+
+### When users execute stop-with-savepoint [--drain]
+
+Users could execute the command `stop-with-savepoint [--drain]` for both bounded and unbounded jobs to trigger jobs to finish.
+In this case, Flink first triggers a synchronous savepoint and all the tasks would stall after seeing the synchronous
+savepoint. If the savepoint succeeds, all the source operators would actively finish and the job would then finish in the same way as in the above scenario. 
+
+```
+1. Trigger a savepoint
+2. Sources received savepoint trigger RPC
+    a. If with --drain
+        i. source operators emit MAX_WATERMARK
+    b. Source emits savepoint barrier
+3. On received MAX_WATERMARK for non-source operators
+    a. Trigger all the event-time timers
+    b. Emit MAX_WATERMARK
+4. On received savepoint barrier for non-source operators
+    a. The task blocks till the savepoint succeeds
+5. Finish the source tasks actively
+    a. If with --drain
+        i. endInput(inputId) for all the operators
+    b. close() for all the operators
+    c. dispose() for all the operators
+    d. Emit EndOfPartitionEvent
+    e. Task cleanup
+6. On received EndOfPartitionEvent for non-source tasks
+    a. If with --drain
+        i. endInput(int inputId) for all the operators
+    b. close() for all the operators
+    c. dispose() for all the operators
+    d. Emit EndOfPartitionEvent
+    e. Task cleanup
+```
+
+A parameter `--drain` is supported with `stop-with-savepoint`: if not specified, the job is expected to resume from this savepoint,
+otherwise the job is expected to terminate permanently. Thus we only emit `MAX_WATERMARK` to trigger all the event-time timers and call
+`endInput()` in the latter case.
+
+## Revise the Finishing Steps
+
+As described in part one, after revising the process of finishing, we have decoupled the process of "finishing operator logic"
+and "finishing task" by introducing a new `EndOfData` event. After the revision each task will first
+notify the descendants with an `EndOfData` event after executing all the logic
+so that the descendants also have chances to finish executing the operator logic, then
+all the tasks could wait for the next checkpoint or the specified savepoint concurrently to commit all the remaining data. 
+This section will present the detailed protocol of the revised process. Since we have renamed
+`close()` / `dispose()` to `finish()` / `close()`, we’ll stick to the new terminology in the following description.
+
+The revised process of finishing is shown as follows:
+
+```
+1. Source tasks finished due to no more records or stop-with-savepoint. 
+    a. if no more records or stop-with-savepoint --drain
+        i. source operators emit MAX_WATERMARK
+        ii. endInput(inputId) for all the operators
+        iii. finish() for all the operators
+        iv. emit EndOfData[isDrain = true] event
+    b. else if stop-with-savepoint
+        i. emit EndOfData[isDrain = false] event
+    c. Wait for the next checkpoint / the savepoint taken after the operators finished to complete
+    d. close() for all the operators
+    e. Emit EndOfPartitionEvent
+    f. Task cleanup
+2. On received MAX_WATERMARK for non-source operators
+    a. Trigger all the event-time timers
+    b. Emit MAX_WATERMARK
+3. On received EndOfData for non-source tasks
+    a. If isDrain
+        i. endInput(int inputId) for all the operators
+        ii. finish() for all the operators
+    b. Emit EndOfData[isDrain = the flag value of the received event]
+4. On received EndOfPartitionEvent for non-source tasks
+    a. Wait for the next checkpoint / the savepoint taken after the operators finished to complete
+    b. close() for all the operators
+    c. Emit EndOfPartitionEvent
+    d. Task cleanup
+```
+
+<center>
+<img vspace="20" style="width:60%" src="{{site.baseurl}}/img/blog/2022-07-11-final-checkpoint/example_job_finish.png" />
+<p style="font-size: 0.6em">
+  Figure 3. An example job of the revised process of finishing.
+</p>
+</center>
+
+An example of the process of job finishing is shown in Figure 3. 
+
+Let's first have a look at the case where all the source tasks are bounded.
+If Task `C` finishes after processing all the records, it first emits the max-watermark, then finishes the operators and emits
+the `EndOfData` event. After that, it waits for the next checkpoint to complete and then emits the `EndOfPartitionEvent`. 
+
+Task `D` finishes all the operators right after receiving the `EndOfData` event. Since any checkpoints taken after operators finish
+can commit all the pending records and be the final checkpoint, Task `D`’s final checkpoint would be the same as Task `C`’s since
+the barrier must be emitted after the `EndOfData` event. 
+
+Task `E` is a bit different in that it has two inputs. Task `A` might continue to run for a while and, thus, Task `E` needs to wait
+until it receives an `EndOfData` event also from the other input before finishing operators and its final checkpoint might be different. 
+
+On the other hand, when using `stop-with-savepoint [--drain]`, the process is similar except that all the tasks need to wait for the exact
+savepoint before finishing instead of just any checkpoints. Moreover, since both Task `C` and Task `A` would finish at the same time,
+Task `E` would also be able to wait for this particular savepoint before finishing.
+
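The handling of `EndOfData` in a task with several inputs, such as Task `E`, can be sketched as a toy model of step 3 of the revised process. The class `ToyTask` and its fields are hypothetical, and the sketch assumes that all inputs carry the same `isDrain` flag. Waiting for the final checkpoint and emitting `EndOfPartitionEvent` are left out.

```python
class ToyTask:
    """Toy non-source task that reacts to EndOfData events (hypothetical, not a Flink class)."""

    def __init__(self, num_inputs):
        self.waiting_for = set(range(num_inputs))  # inputs that have not yet sent EndOfData
        self.calls = []                            # lifecycle methods invoked so far

    def on_end_of_data(self, input_id, is_drain):
        """Handle EndOfData from one input and return the event to forward, or None."""
        self.waiting_for.discard(input_id)
        if is_drain:
            self.calls.append(f"endInput({input_id})")
        if self.waiting_for:
            return None  # other inputs may still deliver records
        if is_drain:
            self.calls.append("finish()")
        # Forward the flag of the received event to the downstream tasks.
        return {"event": "EndOfData", "is_drain": is_drain}
```

A task with two inputs returns `None` for the first `EndOfData` event and only calls `finish()` and forwards the event after the second one, which is why the final checkpoint of Task `E` can differ from that of Task `C`.
+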
+# Conclusion
+
+In this part we have presented more details of how checkpoints are taken with finished tasks and the revised process
+of finishing. We hope these details provide more insight into the design and implementation of this part of the work. As always, if you
+have any questions, please feel free to start a discussion or report an issue on the dev or user mailing list.
\ No newline at end of file
diff --git a/img/blog/2022-07-11-final-checkpoint/checkpoint_format.png b/img/blog/2022-07-11-final-checkpoint/checkpoint_format.png
new file mode 100644
index 000000000..98cb22ca3
Binary files /dev/null and b/img/blog/2022-07-11-final-checkpoint/checkpoint_format.png differ
diff --git a/img/blog/2022-07-11-final-checkpoint/checkpoint_trigger.png b/img/blog/2022-07-11-final-checkpoint/checkpoint_trigger.png
new file mode 100644
index 000000000..4e2012524
Binary files /dev/null and b/img/blog/2022-07-11-final-checkpoint/checkpoint_trigger.png differ
diff --git a/img/blog/2022-07-11-final-checkpoint/example_job.png b/img/blog/2022-07-11-final-checkpoint/example_job.png
new file mode 100644
index 000000000..4996050b5
Binary files /dev/null and b/img/blog/2022-07-11-final-checkpoint/example_job.png differ
diff --git a/img/blog/2022-07-11-final-checkpoint/example_job_finish.png b/img/blog/2022-07-11-final-checkpoint/example_job_finish.png
new file mode 100644
index 000000000..730a918f9
Binary files /dev/null and b/img/blog/2022-07-11-final-checkpoint/example_job_finish.png differ
diff --git a/img/blog/2022-07-11-final-checkpoint/finish_cmp.png b/img/blog/2022-07-11-final-checkpoint/finish_cmp.png
new file mode 100644
index 000000000..2b634594e
Binary files /dev/null and b/img/blog/2022-07-11-final-checkpoint/finish_cmp.png differ
diff --git a/img/blog/2022-07-11-final-checkpoint/stream_batch_cmp.png b/img/blog/2022-07-11-final-checkpoint/stream_batch_cmp.png
new file mode 100644
index 000000000..6c50e6938
Binary files /dev/null and b/img/blog/2022-07-11-final-checkpoint/stream_batch_cmp.png differ