Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/09 08:25:00 UTC

[GitHub] [flink-web] zhuzhurk commented on a diff in pull request #545: Add blogs for FLIP-147 support checkpoints after tasks finished

zhuzhurk commented on code in PR #545:
URL: https://github.com/apache/flink-web/pull/545#discussion_r893117141


##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -0,0 +1,173 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part One"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finished.

Review Comment:
   finished -> finish



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -0,0 +1,173 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part One"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finished.
+
+---
+
+# Motivation
+
+Flink is a distributed processing engine for both unbounded and bounded streams of data. In recent versions,
+Flink has unified the DataStream API and the Table / SQL API to support both streaming and batch cases.
+Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing
+operating, and maintaining consistency between streaming and batch backfilling jobs, like
+[the case for Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020). 
+
+<center>
+<img vspace="20" style="width:70%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/stream_batch_cmp.png" />
+<p style="font-size: 0.6em;text-align:left;margin-top:-1em;margin-bottom: 4em">
+  Figure 1. A comparison of the Stream mode and Batch mode for the example count operator. For streaming mode, the arrived
+  elements are not sorted, the operator would read / write the state corresponding to the element for computation.
+  For batch mode, the arrived elements are first sorted as a whole and then processed.
+</p>
+</center>
+
+Flink provides two execution modes under the unified programming API: the streaming mode and the batch mode.
+The streaming mode processes records incrementally based on the states, thus it supports both bounded and unbounded sources.
+The batch mode works with bounded sources and usually has a better performance for bounded jobs because it executes all the
+tasks in topological order and avoids random state access by pre-sorting the input records. Although batch mode is often the
+preferred mode to process bounded jobs, streaming mode is also required for various reasons. For example, users may want to deal
+with records containing retraction or exploit the property that data is roughly sorted by event times in stream mode
+(like the case in [Kappa+ Architecture](https://www.youtube.com/watch?t=666&v=4qSlsYogALo&feature=youtu.be)). Moreover,
+users often have mixed jobs involving both unbounded streams and bounded side-inputs, which also require stream execution mode. 

Review Comment:
   Figure 1 seems to supplement these statements; maybe put it below the paragraph?



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -0,0 +1,173 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part One"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finished.
+
+---
+
+# Motivation
+
+Flink is a distributed processing engine for both unbounded and bounded streams of data. In recent versions,
+Flink has unified the DataStream API and the Table / SQL API to support both streaming and batch cases.
+Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing
+operating, and maintaining consistency between streaming and batch backfilling jobs, like
+[the case for Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020). 
+
+<center>
+<img vspace="20" style="width:70%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/stream_batch_cmp.png" />
+<p style="font-size: 0.6em;text-align:left;margin-top:-1em;margin-bottom: 4em">
+  Figure 1. A comparison of the Stream mode and Batch mode for the example count operator. For streaming mode, the arrived
+  elements are not sorted, the operator would read / write the state corresponding to the element for computation.
+  For batch mode, the arrived elements are first sorted as a whole and then processed.
+</p>
+</center>
+
+Flink provides two execution modes under the unified programming API: the streaming mode and the batch mode.
+The streaming mode processes records incrementally based on the states, thus it supports both bounded and unbounded sources.
+The batch mode works with bounded sources and usually has a better performance for bounded jobs because it executes all the
+tasks in topological order and avoids random state access by pre-sorting the input records. Although batch mode is often the
+preferred mode to process bounded jobs, streaming mode is also required for various reasons. For example, users may want to deal
+with records containing retraction or exploit the property that data is roughly sorted by event times in stream mode
+(like the case in [Kappa+ Architecture](https://www.youtube.com/watch?t=666&v=4qSlsYogALo&feature=youtu.be)). Moreover,
+users often have mixed jobs involving both unbounded streams and bounded side-inputs, which also require stream execution mode. 
+
+In streaming mode, [checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/checkpointing/)
+is the vital component in supporting exactly-once guarantees. By periodically snapshotting the
+aligned states of operators, Flink can recover from the latest checkpoint and continue execution when failover happens. However,
+previously Flink could not take checkpoints if any tasks finished. This would cause problems for jobs with both bounded and unbounded
+sources: if there are no checkpoints after the bounded part finished, the unbounded part might need to reprocess a large amount of
+records in case of a failure. 
+
+Furthermore, being unable to take checkpoints with finished tasks caused issues for jobs using two-phase-commit sinks to achieve

Review Comment:
   caused issues -> is a problem



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -0,0 +1,173 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part One"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finished.
+
+---
+
+# Motivation
+
+Flink is a distributed processing engine for both unbounded and bounded streams of data. In recent versions,
+Flink has unified the DataStream API and the Table / SQL API to support both streaming and batch cases.
+Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing
+operating, and maintaining consistency between streaming and batch backfilling jobs, like
+[the case for Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020). 
+
+<center>
+<img vspace="20" style="width:70%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/stream_batch_cmp.png" />
+<p style="font-size: 0.6em;text-align:left;margin-top:-1em;margin-bottom: 4em">
+  Figure 1. A comparison of the Stream mode and Batch mode for the example count operator. For streaming mode, the arrived
+  elements are not sorted, the operator would read / write the state corresponding to the element for computation.
+  For batch mode, the arrived elements are first sorted as a whole and then processed.
+</p>
+</center>
+
+Flink provides two execution modes under the unified programming API: the streaming mode and the batch mode.
+The streaming mode processes records incrementally based on the states, thus it supports both bounded and unbounded sources.
+The batch mode works with bounded sources and usually has a better performance for bounded jobs because it executes all the
+tasks in topological order and avoids random state access by pre-sorting the input records. Although batch mode is often the
+preferred mode to process bounded jobs, streaming mode is also required for various reasons. For example, users may want to deal
+with records containing retraction or exploit the property that data is roughly sorted by event times in stream mode
+(like the case in [Kappa+ Architecture](https://www.youtube.com/watch?t=666&v=4qSlsYogALo&feature=youtu.be)). Moreover,
+users often have mixed jobs involving both unbounded streams and bounded side-inputs, which also require stream execution mode. 
+
+In streaming mode, [checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/checkpointing/)
+is the vital component in supporting exactly-once guarantees. By periodically snapshotting the
+aligned states of operators, Flink can recover from the latest checkpoint and continue execution when failover happens. However,
+previously Flink could not take checkpoints if any tasks finished. This would cause problems for jobs with both bounded and unbounded
+sources: if there are no checkpoints after the bounded part finished, the unbounded part might need to reprocess a large amount of
+records in case of a failure. 
+
+Furthermore, being unable to take checkpoints with finished tasks caused issues for jobs using two-phase-commit sinks to achieve
+[end-to-end exactly-once processing](https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html).
+The two-phase-commit sinks first write data to temporary files or external transactions,
+and commit the data only after a checkpoint completes to ensure the data would not be replayed on failure. However, if a job
+contains bounded sources, committing the results would not be possible after the bounded sources finish. Also because of that,
+for bounded jobs we have no way to commit the last piece of data after the first source task finished, and previously the bounded
+jobs just ignore the uncommitted data when finishing. These behaviors caused a lot of confusion and are always asked in the user
+mailing list. 
+
+Therefore, to complete the support of streaming mode for jobs using bounded sources, it is important for us to 
+
+1. Support taking checkpoints with finished tasks.
+2. Furthermore, revise the process of finishing so that all the data could always be committed.
+
+The remaining blog first briefly describes the changes we made to achieve the above targets. In the next blog,
+we’ll also share more details on how they are implemented. 
+
+# Support Checkpointing with Finished Tasks
+
+The core idea of supporting checkpoints with finished tasks is to mark the finished operators in checkpoints and skip
+executing these operators after recovery. As illustrated in Figure 2, a checkpoint is composed of the states of all
+the operators. If all the subtasks of an operator have finished, we could mark it as fully finished and skip the
+execution of this operator on startup. For other operators, their states are composed of the states of all the
+running subtasks. The states could be repartitioned on restarting and all the new subtasks restarted with the assigned states. 
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/checkpoint_format.png" />
+<p style="font-size: 0.6em;text-align:center;margin-top:-1em;margin-bottom: 4em">
+  Figure 2. An illustration of the extended checkpoint format.
+</p>
+</center>
+
+To support creating such a checkpoint for jobs with finished tasks, we extended the checkpoint procedure.
+Previously the checkpoint coordinator inside the JobManager first notifies all the sources to report snapshots,
+then all the sources further notify their descendants via broadcasting barrier events. Since now the sources might
+already finish, the checkpoint coordinator would instead treat the running tasks who do not have running precedent
+tasks as “new sources”, and notifies these tasks to initiate the checkpoints. The checkpoint could then deduce
+which operator is fully finished based on the task states when triggering checkpoint and the received snapshots. 
+
+The changes of the checkpoint procedure are transparent to users except that for checkpoints indeed containing
+finished tasks, we disallowed adding new operators before the fully finished ones, since it would make the fully

Review Comment:
   before -> as precedents of



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -0,0 +1,173 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part One"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finished.
+
+---
+
+# Motivation
+
+Flink is a distributed processing engine for both unbounded and bounded streams of data. In recent versions,
+Flink has unified the DataStream API and the Table / SQL API to support both streaming and batch cases.
+Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing
+operating, and maintaining consistency between streaming and batch backfilling jobs, like
+[the case for Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020). 
+
+<center>
+<img vspace="20" style="width:70%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/stream_batch_cmp.png" />
+<p style="font-size: 0.6em;text-align:left;margin-top:-1em;margin-bottom: 4em">
+  Figure 1. A comparison of the Stream mode and Batch mode for the example count operator. For streaming mode, the arrived
+  elements are not sorted, the operator would read / write the state corresponding to the element for computation.
+  For batch mode, the arrived elements are first sorted as a whole and then processed.
+</p>
+</center>
+
+Flink provides two execution modes under the unified programming API: the streaming mode and the batch mode.
+The streaming mode processes records incrementally based on the states, thus it supports both bounded and unbounded sources.
+The batch mode works with bounded sources and usually has a better performance for bounded jobs because it executes all the
+tasks in topological order and avoids random state access by pre-sorting the input records. Although batch mode is often the
+preferred mode to process bounded jobs, streaming mode is also required for various reasons. For example, users may want to deal
+with records containing retraction or exploit the property that data is roughly sorted by event times in stream mode
+(like the case in [Kappa+ Architecture](https://www.youtube.com/watch?t=666&v=4qSlsYogALo&feature=youtu.be)). Moreover,
+users often have mixed jobs involving both unbounded streams and bounded side-inputs, which also require stream execution mode. 
+
+In streaming mode, [checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/checkpointing/)
+is the vital component in supporting exactly-once guarantees. By periodically snapshotting the
+aligned states of operators, Flink can recover from the latest checkpoint and continue execution when failover happens. However,
+previously Flink could not take checkpoints if any tasks finished. This would cause problems for jobs with both bounded and unbounded
+sources: if there are no checkpoints after the bounded part finished, the unbounded part might need to reprocess a large amount of
+records in case of a failure. 
+
+Furthermore, being unable to take checkpoints with finished tasks caused issues for jobs using two-phase-commit sinks to achieve
+[end-to-end exactly-once processing](https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html).
+The two-phase-commit sinks first write data to temporary files or external transactions,
+and commit the data only after a checkpoint completes to ensure the data would not be replayed on failure. However, if a job
+contains bounded sources, committing the results would not be possible after the bounded sources finish. Also because of that,
+for bounded jobs we have no way to commit the last piece of data after the first source task finished, and previously the bounded
+jobs just ignore the uncommitted data when finishing. These behaviors caused a lot of confusion and are always asked in the user
+mailing list. 
+
+Therefore, to complete the support of streaming mode for jobs using bounded sources, it is important for us to 
+
+1. Support taking checkpoints with finished tasks.
+2. Furthermore, revise the process of finishing so that all the data could always be committed.
+
+The remaining blog first briefly describes the changes we made to achieve the above targets. In the next blog,
+we’ll also share more details on how they are implemented. 

Review Comment:
   I'd prefer to remove the `also`.



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -0,0 +1,173 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part One"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finished.
+
+---
+
+# Motivation
+
+Flink is a distributed processing engine for both unbounded and bounded streams of data. In recent versions,
+Flink has unified the DataStream API and the Table / SQL API to support both streaming and batch cases.
+Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing
+operating, and maintaining consistency between streaming and batch backfilling jobs, like
+[the case for Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020). 
+
+<center>
+<img vspace="20" style="width:70%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/stream_batch_cmp.png" />
+<p style="font-size: 0.6em;text-align:left;margin-top:-1em;margin-bottom: 4em">
+  Figure 1. A comparison of the Stream mode and Batch mode for the example count operator. For streaming mode, the arrived
+  elements are not sorted, the operator would read / write the state corresponding to the element for computation.
+  For batch mode, the arrived elements are first sorted as a whole and then processed.
+</p>
+</center>
+
+Flink provides two execution modes under the unified programming API: the streaming mode and the batch mode.
+The streaming mode processes records incrementally based on the states, thus it supports both bounded and unbounded sources.
+The batch mode works with bounded sources and usually has a better performance for bounded jobs because it executes all the
+tasks in topological order and avoids random state access by pre-sorting the input records. Although batch mode is often the
+preferred mode to process bounded jobs, streaming mode is also required for various reasons. For example, users may want to deal
+with records containing retraction or exploit the property that data is roughly sorted by event times in stream mode
+(like the case in [Kappa+ Architecture](https://www.youtube.com/watch?t=666&v=4qSlsYogALo&feature=youtu.be)). Moreover,
+users often have mixed jobs involving both unbounded streams and bounded side-inputs, which also require stream execution mode. 
+
+In streaming mode, [checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/checkpointing/)
+is the vital component in supporting exactly-once guarantees. By periodically snapshotting the
+aligned states of operators, Flink can recover from the latest checkpoint and continue execution when failover happens. However,
+previously Flink could not take checkpoints if any tasks finished. This would cause problems for jobs with both bounded and unbounded
+sources: if there are no checkpoints after the bounded part finished, the unbounded part might need to reprocess a large amount of
+records in case of a failure. 
+
+Furthermore, being unable to take checkpoints with finished tasks caused issues for jobs using two-phase-commit sinks to achieve
+[end-to-end exactly-once processing](https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html).
+The two-phase-commit sinks first write data to temporary files or external transactions,
+and commit the data only after a checkpoint completes to ensure the data would not be replayed on failure. However, if a job
+contains bounded sources, committing the results would not be possible after the bounded sources finish. Also because of that,
+for bounded jobs we have no way to commit the last piece of data after the first source task finished, and previously the bounded
+jobs just ignore the uncommitted data when finishing. These behaviors caused a lot of confusion and are always asked in the user
+mailing list. 
+
+Therefore, to complete the support of streaming mode for jobs using bounded sources, it is important for us to 
+
+1. Support taking checkpoints with finished tasks.
+2. Furthermore, revise the process of finishing so that all the data could always be committed.
+
+The remaining blog first briefly describes the changes we made to achieve the above targets. In the next blog,

Review Comment:
   I'd prefer to remove the `first`.



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -0,0 +1,173 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part One"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finished.
+
+---
+
+# Motivation
+
+Flink is a distributed processing engine for both unbounded and bounded streams of data. In recent versions,
+Flink has unified the DataStream API and the Table / SQL API to support both streaming and batch cases.
+Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing

Review Comment:
   developing -> developing,



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -0,0 +1,173 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part One"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finished.
+
+---
+
+# Motivation
+
+Flink is a distributed processing engine for both unbounded and bounded streams of data. In recent versions,
+Flink has unified the DataStream API and the Table / SQL API to support both streaming and batch cases.
+Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing
+operating, and maintaining consistency between streaming and batch backfilling jobs, like
+[the case for Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020). 
+
+<center>
+<img vspace="20" style="width:70%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/stream_batch_cmp.png" />
+<p style="font-size: 0.6em;text-align:left;margin-top:-1em;margin-bottom: 4em">
+  Figure 1. A comparison of the Stream mode and Batch mode for the example count operator. For streaming mode, the arrived
+  elements are not sorted, the operator would read / write the state corresponding to the element for computation.
+  For batch mode, the arrived elements are first sorted as a whole and then processed.
+</p>
+</center>
+
+Flink provides two execution modes under the unified programming API: the streaming mode and the batch mode.
+The streaming mode processes records incrementally based on the states, thus it supports both bounded and unbounded sources.
+The batch mode works with bounded sources and usually has a better performance for bounded jobs because it executes all the
+tasks in topological order and avoids random state access by pre-sorting the input records. Although batch mode is often the
+preferred mode to process bounded jobs, streaming mode is also required for various reasons. For example, users may want to deal
+with records containing retraction or exploit the property that data is roughly sorted by event times in stream mode
+(like the case in [Kappa+ Architecture](https://www.youtube.com/watch?t=666&v=4qSlsYogALo&feature=youtu.be)). Moreover,
+users often have mixed jobs involving both unbounded streams and bounded side-inputs, which also require stream execution mode. 
+
+In streaming mode, [checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/checkpointing/)
+is the vital component in supporting exactly-once guarantees. By periodically snapshotting the
+aligned states of operators, Flink can recover from the latest checkpoint and continue execution when failover happens. However,
+previously Flink could not take checkpoints if any tasks finished. This would cause problems for jobs with both bounded and unbounded
+sources: if there are no checkpoints after the bounded part finished, the unbounded part might need to reprocess a large amount of
+records in case of a failure. 
+
+Furthermore, being unable to take checkpoints with finished tasks caused issues for jobs using two-phase-commit sinks to achieve
+[end-to-end exactly-once processing](https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html).
+The two-phase-commit sinks first write data to temporary files or external transactions,
+and commit the data only after a checkpoint completes to ensure the data would not be replayed on failure. However, if a job
+contains bounded sources, committing the results would not be possible after the bounded sources finish. Also because of that,
+for bounded jobs we have no way to commit the last piece of data after the first source task finished, and previously the bounded
+jobs just ignore the uncommitted data when finishing. These behaviors caused a lot of confusion and are always asked in the user
+mailing list. 
+
+Therefore, to complete the support of streaming mode for jobs using bounded sources, it is important for us to 
+
+1. Support taking checkpoints with finished tasks.
+2. Furthermore, revise the process of finishing so that all the data could always be committed.
+
+The remaining blog first briefly describes the changes we made to achieve the above targets. In the next blog,

Review Comment:
   targets -> goals



##########
_posts/2022-06-01-final-checkpoint-part1.md:
##########
@@ -0,0 +1,173 @@
+---
+layout: post 
+title:  "FLIP-147: Support Checkpoints After Tasks Finished - Part One"
+date: 2022-06-01T00:00:00.000Z 
+authors:
+- Yun Gao:
+  name: "Yun Gao"
+- Dawid Wysakowicz:
+  name: "Dawid Wysakowicz"
+- Daisy Tsang:
+  name: "Daisy Tsang"
+excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finished.
+
+---
+
+# Motivation
+
+Flink is a distributed processing engine for both unbounded and bounded streams of data. In recent versions,
+Flink has unified the DataStream API and the Table / SQL API to support both streaming and batch cases.
+Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing
+operating, and maintaining consistency between streaming and batch backfilling jobs, like
+[the case for Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020). 
+
+<center>
+<img vspace="20" style="width:70%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/stream_batch_cmp.png" />
+<p style="font-size: 0.6em;text-align:left;margin-top:-1em;margin-bottom: 4em">
+  Figure 1. A comparison of the Stream mode and Batch mode for the example count operator. For streaming mode, the arrived
+  elements are not sorted, the operator would read / write the state corresponding to the element for computation.
+  For batch mode, the arrived elements are first sorted as a whole and then processed.
+</p>
+</center>
+
+Flink provides two execution modes under the unified programming API: the streaming mode and the batch mode.
+The streaming mode processes records incrementally based on the states, thus it supports both bounded and unbounded sources.
+The batch mode works with bounded sources and usually has a better performance for bounded jobs because it executes all the
+tasks in topological order and avoids random state access by pre-sorting the input records. Although batch mode is often the
+preferred mode to process bounded jobs, streaming mode is also required for various reasons. For example, users may want to deal
+with records containing retraction or exploit the property that data is roughly sorted by event times in stream mode
+(like the case in [Kappa+ Architecture](https://www.youtube.com/watch?t=666&v=4qSlsYogALo&feature=youtu.be)). Moreover,
+users often have mixed jobs involving both unbounded streams and bounded side-inputs, which also require stream execution mode. 
+
+In streaming mode, [checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/checkpointing/)
+is the vital component in supporting exactly-once guarantees. By periodically snapshotting the
+aligned states of operators, Flink can recover from the latest checkpoint and continue execution when failover happens. However,
+previously Flink could not take checkpoints if any tasks finished. This would cause problems for jobs with both bounded and unbounded
+sources: if there are no checkpoints after the bounded part finished, the unbounded part might need to reprocess a large amount of
+records in case of a failure. 
+
+Furthermore, being unable to take checkpoints with finished tasks caused issues for jobs using two-phase-commit sinks to achieve
+[end-to-end exactly-once processing](https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html).
+The two-phase-commit sinks first write data to temporary files or external transactions,
+and commit the data only after a checkpoint completes to ensure the data would not be replayed on failure. However, if a job
+contains bounded sources, committing the results would not be possible after the bounded sources finish. Also because of that,
+for bounded jobs we have no way to commit the last piece of data after the first source task finished, and previously the bounded
+jobs just ignore the uncommitted data when finishing. These behaviors caused a lot of confusion and are always asked in the user
+mailing list. 
+
+Therefore, to complete the support of streaming mode for jobs using bounded sources, it is important for us to 
+
+1. Support taking checkpoints with finished tasks.
+2. Furthermore, revise the process of finishing so that all the data could always be committed.
+
+The remaining blog first briefly describes the changes we made to achieve the above targets. In the next blog,
+we’ll also share more details on how they are implemented. 
+
+# Support Checkpointing with Finished Tasks
+
+The core idea of supporting checkpoints with finished tasks is to mark the finished operators in checkpoints and skip
+executing these operators after recovery. As illustrated in Figure 2, a checkpoint is composed of the states of all
+the operators. If all the subtasks of an operator have finished, we could mark it as fully finished and skip the
+execution of this operator on startup. For other operators, their states are composed of the states of all the
+running subtasks. The states could be repartitioned on restarting and all the new subtasks restarted with the assigned states. 
+
+<center>
+<img vspace="20" style="width:50%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/checkpoint_format.png" />
+<p style="font-size: 0.6em;text-align:center;margin-top:-1em;margin-bottom: 4em">
+  Figure 2. An illustration of the extended checkpoint format.
+</p>
+</center>
+
+To support creating such a checkpoint for jobs with finished tasks, we extended the checkpoint procedure.
+Previously the checkpoint coordinator inside the JobManager first notifies all the sources to report snapshots,
+then all the sources further notify their descendants via broadcasting barrier events. Since now the sources might
+already finish, the checkpoint coordinator would instead treat the running tasks who do not have running precedent

Review Comment:
   already finish -> have already finished


