You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/08 03:11:15 UTC

[GitHub] [flink-web] wanglijie95 opened a new pull request, #546: Add blogs for FLIP-187 adaptive batch scheduler

wanglijie95 opened a new pull request, #546:
URL: https://github.com/apache/flink-web/pull/546

   This pr adds the blog for FLIP-187: adaptive batch scheduler


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] wanglijie95 commented on a diff in pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r896704323


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the vertices during job execution.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png" width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size of input results, so the scheduler needs to know the sizes of result partitions produced by tasks. We introduced a numBytesProduced counter to record the size of each produced result partition, the accumulated result of the counter will be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results. The computation algorithm is as follows:
+Suppose:
+
+- ***V*** is the bytes of data the user expects to be processed by each task.
+- ***totalBytes<sub>non-broadcast</sub>*** is the sum of the non-broadcast result sizes consumed by this job vertex.
+- ***totalBytes<sub>broadcast</sub>*** is the sum of the broadcast result sizes consumed by this job vertex.
+- ***broadcastCapRatio*** is the cap ratio of broadcast bytes that affects the parallelism calculation.
+- ***normalize(***x***)*** is a function that round ***x*** to the closest power of 2.
+
+then the parallelism of this job vertex ***P*** will be:
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/parallelism-formula.png" width="60%"/>
+</center>
+
+Note that we introduced two special treatment in the above formula :
+
+- [Limit the cap ratio of broadcast bytes](#limit-the-cap-ratio-of-broadcast-bytes)
+- [Normalize the parallelism to the closest power of 2](#normalize-the-parallelism-to-the-closest-power-of-2)
+
+However, the above formula cannot be used to decide the parallelism of the source vertices, because the source vertices have no input. To solve it, we introduced the configuration option `jobmanager.adaptive-batch-scheduler.default-source-parallelism` to allow users to manually configure the parallelism of source vertices. Note that not all data sources need this option, because some data sources can automatically infer parallelism (For example, HiveTableSource, see [HiveParallelismInference](https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java) for more detail). For these sources, it is recommended to decide parallelism by themselves.
+
+### Limit the cap ratio of broadcast bytes
+As you can see, we limit the cap ratio of broadcast bytes that affects the parallelism calculation to ***broadcastCapRatio***. That is, the non-broadcast bytes processed by each task is at least ***(1-broadcastCapRatio) * V***. If not so,when the total broadcast bytes is close to ***V***, even if the total non-broadcast bytes is very small, it may cause a large parallelism, which is unnecessary and may lead to resource waste and large task deployment overhead.
+
+Generally, the broadcast dataset is usually relatively small against the other co-processed datasets. Therefore, we set the cap ratio to 0.5 by default because we usually expect the broadcast bytes to be smaller than non-broadcast bytes. The value is hard coded in the first version, and we may make it configurable later.
+
+
+### Normalize the parallelism to the closest power of 2
+The normalize is to avoid introducing data skew. To better understand this section, we suggest you read the [Flexible subpartition mapping](#flexible-subpartition-mapping) section first.
+
+Taking Fig. 4 (b) as example, A1/A2 produces 4 subpartitions, and the decided parallelism of B is 3. In this case, B1 will consume 1 subpartition, B2 will consume 1 subpartition, and B3 will consume 2 subpartitions. We assume that subpartitions have the same amount of data, which means B3 will consume twice the data of other tasks, data skew is introduced due to the subpartition mapping.
+
+To solve this problem, we need to make the subpartitions evenly consumed by downstream tasks, which means the number of subpartitions should be a multiple of the number of downstream tasks. For simplicity, we require the user-specified max parallelism to be 2<sup>N</sup>, and then adjust the calculated parallelism to a closest 2<sup>M</sup> (M <= N), so that we can guarantee that subpartitions will be evenly consumed by downstream tasks.
+
+Note that this is a temporary solution, the ultimate solution would be the [Auto-rebalancing of workloads](#auto-rebalancing-of-workloads), which may come soon.
+
+## Build up execution graph dynamically
+Before Flink 1.15, the execution graph was fully built in a static way before starting scheduling. To allow parallelisms of job vertices to be decided lazily, the execution graph must be able to be built up dynamically.
+
+### Create execution vertices and execution edges lazily
+A dynamic execution graph means that a Flink job starts with an empty execution topology, and then gradually attaches vertices during job execution, as shown in Fig. 2.
+
+The execution topology consists of execution vertices and execution edges. The execution vertices will be created and attached to the execution topology only when:
+
+- The parallelism of the corresponding job vertex is decided.
+- All upstream execution vertices are already attached.
+
+A decided parallelism of the job vertex is needed so that Flink knows how many execution vertices should be created. Upstream execution vertices need to be attached first so that Flink can connect the newly created execution vertices to the upstream vertices with execution edges. 
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/2-dynamic-graph.png" width="90%"/>
+<br/>
+Fig. 2 - Build up execution graph dynamically
+</center>
+
+<br/>
+
+### Flexible subpartition mapping
+Before Flink 1.15, when deploying a task, Flink needs to know the parallelism of its consumer job vertex. This is because consumer vertex parallelism is used to decide the number of subpartitions produced by each upstream task. The reason behind that is, for one result partition, different subpartitions serve different consumer execution vertices. More specifically, one consumer execution vertex only consumes data from subpartition with the same index. 
+
+Taking Fig. 3 as example, parallelism of the consumer B is 2, so the result partition produced by A1/A2 should contain 2 subpartitions, the subpartition with index 0 serves B1, and the subpartition with index 1 serves B2.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/3-static-graph-subpartition-mapping.png" width="30%"/>
+<br/>
+Fig. 3 - How subpartitions serve consumer execution vertices with static execution graph
+</center>
+
+<br/>
+
+But obviously, this doesn't work for dynamic graphs, because when a job vertex is deployed, the parallelism of its consumer job vertices may not have been decided yet. To enable Flink to work in this case, we need a way to allow a job vertex to run without knowing the parallelism of its consumer job vertices(or rather, we need a way to allow execution vertices to run without knowing the number of their consumer execution vertices). 
+
+To achieve this goal, we can set the number of subpartitions to be the max parallelism of the consumer job vertex. Then when the consumer execution vertices are deployed, they should be assigned with a subpartition range to consume. Suppose N is the number of consumer execution vertices and P is the number of subpartitions. For the kth consumer execution vertex, the consumed subpartition range should be:
+
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/range-formula.png" width="55%"/>
+</center>
+
+Taking Fig. 4 as example, the max parallelism of B is 4, so A1/A2 have 4 subpartitions. And then if the decided parallelism of B is 2, then the subpartitions mapping will be Fig. 4 (a), if the decided parallelism of B is 3, then the subpartitions mapping will be  Fig. 4 (b).
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/4-dynamic-graph-subpartition-mapping.png" width="75%"/>
+<br/>
+Fig. 4 - How subpartitions serve consumer execution vertices with dynamic graph
+</center>
+
+<br/>
+
+## Update and schedule the dynamic execution graph
+The adaptive batch scheduler scheduling is similar to the default scheduler, the only difference is that an empty dynamic execution graph will be generated initially and vertices will be attached later. Before handling any scheduling event, the scheduler will try deciding the parallelisms for job vertices, and then initialize them to generate execution vertices, connecting execution edges, and update the execution graph.
+
+The scheduler will try to decide the parallelism for all job vertices before each scheduling, and the parallelism decision will be made for each job vertex in topological order:
+
+- For source vertices, the parallelism should have been decided before starting scheduling. 
+- For non-source vertices, the parallelism can be decided only when all its consumed results are fully finished.
+
+After trying to decide the parallelism for all job vertices, the scheduler will try to initialize the job vertices in topological order. A job vertex that can be initialized should meet the following conditions:

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] wanglijie95 commented on a diff in pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r896699847


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the vertices during job execution.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png" width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size of input results, so the scheduler needs to know the sizes of result partitions produced by tasks. We introduced a numBytesProduced counter to record the size of each produced result partition, the accumulated result of the counter will be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results. The computation algorithm is as follows:
+Suppose:
+
+- ***V*** is the bytes of data the user expects to be processed by each task.
+- ***totalBytes<sub>non-broadcast</sub>*** is the sum of the non-broadcast result sizes consumed by this job vertex.
+- ***totalBytes<sub>broadcast</sub>*** is the sum of the broadcast result sizes consumed by this job vertex.
+- ***broadcastCapRatio*** is the cap ratio of broadcast bytes that affects the parallelism calculation.
+- ***normalize(***x***)*** is a function that round ***x*** to the closest power of 2.
+
+then the parallelism of this job vertex ***P*** will be:
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/parallelism-formula.png" width="60%"/>
+</center>
+
+Note that we introduced two special treatment in the above formula :
+
+- [Limit the cap ratio of broadcast bytes](#limit-the-cap-ratio-of-broadcast-bytes)
+- [Normalize the parallelism to the closest power of 2](#normalize-the-parallelism-to-the-closest-power-of-2)
+
+However, the above formula cannot be used to decide the parallelism of the source vertices, because the source vertices have no input. To solve it, we introduced the configuration option `jobmanager.adaptive-batch-scheduler.default-source-parallelism` to allow users to manually configure the parallelism of source vertices. Note that not all data sources need this option, because some data sources can automatically infer parallelism (For example, HiveTableSource, see [HiveParallelismInference](https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java) for more detail). For these sources, it is recommended to decide parallelism by themselves.
+
+### Limit the cap ratio of broadcast bytes
+As you can see, we limit the cap ratio of broadcast bytes that affects the parallelism calculation to ***broadcastCapRatio***. That is, the non-broadcast bytes processed by each task is at least ***(1-broadcastCapRatio) * V***. If not so,when the total broadcast bytes is close to ***V***, even if the total non-broadcast bytes is very small, it may cause a large parallelism, which is unnecessary and may lead to resource waste and large task deployment overhead.
+
+Generally, the broadcast dataset is usually relatively small against the other co-processed datasets. Therefore, we set the cap ratio to 0.5 by default because we usually expect the broadcast bytes to be smaller than non-broadcast bytes. The value is hard coded in the first version, and we may make it configurable later.
+
+
+### Normalize the parallelism to the closest power of 2
+The normalize is to avoid introducing data skew. To better understand this section, we suggest you read the [Flexible subpartition mapping](#flexible-subpartition-mapping) section first.
+
+Taking Fig. 4 (b) as example, A1/A2 produces 4 subpartitions, and the decided parallelism of B is 3. In this case, B1 will consume 1 subpartition, B2 will consume 1 subpartition, and B3 will consume 2 subpartitions. We assume that subpartitions have the same amount of data, which means B3 will consume twice the data of other tasks, data skew is introduced due to the subpartition mapping.
+
+To solve this problem, we need to make the subpartitions evenly consumed by downstream tasks, which means the number of subpartitions should be a multiple of the number of downstream tasks. For simplicity, we require the user-specified max parallelism to be 2<sup>N</sup>, and then adjust the calculated parallelism to a closest 2<sup>M</sup> (M <= N), so that we can guarantee that subpartitions will be evenly consumed by downstream tasks.
+
+Note that this is a temporary solution, the ultimate solution would be the [Auto-rebalancing of workloads](#auto-rebalancing-of-workloads), which may come soon.
+
+## Build up execution graph dynamically
+Before Flink 1.15, the execution graph was fully built in a static way before starting scheduling. To allow parallelisms of job vertices to be decided lazily, the execution graph must be able to be built up dynamically.
+
+### Create execution vertices and execution edges lazily
+A dynamic execution graph means that a Flink job starts with an empty execution topology, and then gradually attaches vertices during job execution, as shown in Fig. 2.
+
+The execution topology consists of execution vertices and execution edges. The execution vertices will be created and attached to the execution topology only when:
+
+- The parallelism of the corresponding job vertex is decided.
+- All upstream execution vertices are already attached.
+
+A decided parallelism of the job vertex is needed so that Flink knows how many execution vertices should be created. Upstream execution vertices need to be attached first so that Flink can connect the newly created execution vertices to the upstream vertices with execution edges. 
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/2-dynamic-graph.png" width="90%"/>
+<br/>
+Fig. 2 - Build up execution graph dynamically
+</center>
+
+<br/>
+
+### Flexible subpartition mapping
+Before Flink 1.15, when deploying a task, Flink needs to know the parallelism of its consumer job vertex. This is because consumer vertex parallelism is used to decide the number of subpartitions produced by each upstream task. The reason behind that is, for one result partition, different subpartitions serve different consumer execution vertices. More specifically, one consumer execution vertex only consumes data from subpartition with the same index. 
+
+Taking Fig. 3 as example, parallelism of the consumer B is 2, so the result partition produced by A1/A2 should contain 2 subpartitions, the subpartition with index 0 serves B1, and the subpartition with index 1 serves B2.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/3-static-graph-subpartition-mapping.png" width="30%"/>
+<br/>
+Fig. 3 - How subpartitions serve consumer execution vertices with static execution graph
+</center>
+
+<br/>
+
+But obviously, this doesn't work for dynamic graphs, because when a job vertex is deployed, the parallelism of its consumer job vertices may not have been decided yet. To enable Flink to work in this case, we need a way to allow a job vertex to run without knowing the parallelism of its consumer job vertices(or rather, we need a way to allow execution vertices to run without knowing the number of their consumer execution vertices). 
+
+To achieve this goal, we can set the number of subpartitions to be the max parallelism of the consumer job vertex. Then when the consumer execution vertices are deployed, they should be assigned with a subpartition range to consume. Suppose N is the number of consumer execution vertices and P is the number of subpartitions. For the kth consumer execution vertex, the consumed subpartition range should be:
+
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/range-formula.png" width="55%"/>
+</center>
+
+Taking Fig. 4 as example, the max parallelism of B is 4, so A1/A2 have 4 subpartitions. And then if the decided parallelism of B is 2, then the subpartitions mapping will be Fig. 4 (a), if the decided parallelism of B is 3, then the subpartitions mapping will be  Fig. 4 (b).
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/4-dynamic-graph-subpartition-mapping.png" width="75%"/>
+<br/>
+Fig. 4 - How subpartitions serve consumer execution vertices with dynamic graph
+</center>
+
+<br/>
+
+## Update and schedule the dynamic execution graph
+The adaptive batch scheduler scheduling is similar to the default scheduler, the only difference is that an empty dynamic execution graph will be generated initially and vertices will be attached later. Before handling any scheduling event, the scheduler will try deciding the parallelisms for job vertices, and then initialize them to generate execution vertices, connecting execution edges, and update the execution graph.
+
+The scheduler will try to decide the parallelism for all job vertices before each scheduling, and the parallelism decision will be made for each job vertex in topological order:

Review Comment:
   I change it to `before handling each scheduling event`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] wanglijie95 commented on a diff in pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r896687126


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the vertices during job execution.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png" width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size of input results, so the scheduler needs to know the sizes of result partitions produced by tasks. We introduced a numBytesProduced counter to record the size of each produced result partition, the accumulated result of the counter will be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results. The computation algorithm is as follows:
+Suppose:
+
+- ***V*** is the bytes of data the user expects to be processed by each task.
+- ***totalBytes<sub>non-broadcast</sub>*** is the sum of the non-broadcast result sizes consumed by this job vertex.
+- ***totalBytes<sub>broadcast</sub>*** is the sum of the broadcast result sizes consumed by this job vertex.
+- ***broadcastCapRatio*** is the cap ratio of broadcast bytes that affects the parallelism calculation.

Review Comment:
   I changed it to `maximum ratio`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] wanglijie95 commented on a diff in pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r896699847


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the vertices during job execution.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png" width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size of input results, so the scheduler needs to know the sizes of result partitions produced by tasks. We introduced a numBytesProduced counter to record the size of each produced result partition, the accumulated result of the counter will be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results. The computation algorithm is as follows:
+Suppose:
+
+- ***V*** is the bytes of data the user expects to be processed by each task.
+- ***totalBytes<sub>non-broadcast</sub>*** is the sum of the non-broadcast result sizes consumed by this job vertex.
+- ***totalBytes<sub>broadcast</sub>*** is the sum of the broadcast result sizes consumed by this job vertex.
+- ***broadcastCapRatio*** is the cap ratio of broadcast bytes that affects the parallelism calculation.
+- ***normalize(***x***)*** is a function that round ***x*** to the closest power of 2.
+
+then the parallelism of this job vertex ***P*** will be:
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/parallelism-formula.png" width="60%"/>
+</center>
+
+Note that we introduced two special treatment in the above formula :
+
+- [Limit the cap ratio of broadcast bytes](#limit-the-cap-ratio-of-broadcast-bytes)
+- [Normalize the parallelism to the closest power of 2](#normalize-the-parallelism-to-the-closest-power-of-2)
+
+However, the above formula cannot be used to decide the parallelism of the source vertices, because the source vertices have no input. To solve it, we introduced the configuration option `jobmanager.adaptive-batch-scheduler.default-source-parallelism` to allow users to manually configure the parallelism of source vertices. Note that not all data sources need this option, because some data sources can automatically infer parallelism (For example, HiveTableSource, see [HiveParallelismInference](https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java) for more detail). For these sources, it is recommended to decide parallelism by themselves.
+
+### Limit the cap ratio of broadcast bytes
+As you can see, we limit the cap ratio of broadcast bytes that affects the parallelism calculation to ***broadcastCapRatio***. That is, the non-broadcast bytes processed by each task is at least ***(1-broadcastCapRatio) * V***. If not so,when the total broadcast bytes is close to ***V***, even if the total non-broadcast bytes is very small, it may cause a large parallelism, which is unnecessary and may lead to resource waste and large task deployment overhead.
+
+Generally, the broadcast dataset is usually relatively small against the other co-processed datasets. Therefore, we set the cap ratio to 0.5 by default because we usually expect the broadcast bytes to be smaller than non-broadcast bytes. The value is hard coded in the first version, and we may make it configurable later.
+
+
+### Normalize the parallelism to the closest power of 2
+The normalize is to avoid introducing data skew. To better understand this section, we suggest you read the [Flexible subpartition mapping](#flexible-subpartition-mapping) section first.
+
+Taking Fig. 4 (b) as example, A1/A2 produces 4 subpartitions, and the decided parallelism of B is 3. In this case, B1 will consume 1 subpartition, B2 will consume 1 subpartition, and B3 will consume 2 subpartitions. We assume that subpartitions have the same amount of data, which means B3 will consume twice the data of other tasks, data skew is introduced due to the subpartition mapping.
+
+To solve this problem, we need to make the subpartitions evenly consumed by downstream tasks, which means the number of subpartitions should be a multiple of the number of downstream tasks. For simplicity, we require the user-specified max parallelism to be 2<sup>N</sup>, and then adjust the calculated parallelism to a closest 2<sup>M</sup> (M <= N), so that we can guarantee that subpartitions will be evenly consumed by downstream tasks.
+
+Note that this is a temporary solution, the ultimate solution would be the [Auto-rebalancing of workloads](#auto-rebalancing-of-workloads), which may come soon.
+
+## Build up execution graph dynamically
+Before Flink 1.15, the execution graph was fully built in a static way before starting scheduling. To allow parallelisms of job vertices to be decided lazily, the execution graph must be able to be built up dynamically.
+
+### Create execution vertices and execution edges lazily
+A dynamic execution graph means that a Flink job starts with an empty execution topology, and then gradually attaches vertices during job execution, as shown in Fig. 2.
+
+The execution topology consists of execution vertices and execution edges. The execution vertices will be created and attached to the execution topology only when:
+
+- The parallelism of the corresponding job vertex is decided.
+- All upstream execution vertices are already attached.
+
+A decided parallelism of the job vertex is needed so that Flink knows how many execution vertices should be created. Upstream execution vertices need to be attached first so that Flink can connect the newly created execution vertices to the upstream vertices with execution edges. 
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/2-dynamic-graph.png" width="90%"/>
+<br/>
+Fig. 2 - Build up execution graph dynamically
+</center>
+
+<br/>
+
+### Flexible subpartition mapping
+Before Flink 1.15, when deploying a task, Flink needs to know the parallelism of its consumer job vertex. This is because consumer vertex parallelism is used to decide the number of subpartitions produced by each upstream task. The reason behind that is, for one result partition, different subpartitions serve different consumer execution vertices. More specifically, one consumer execution vertex only consumes data from subpartition with the same index. 
+
+Taking Fig. 3 as example, parallelism of the consumer B is 2, so the result partition produced by A1/A2 should contain 2 subpartitions, the subpartition with index 0 serves B1, and the subpartition with index 1 serves B2.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/3-static-graph-subpartition-mapping.png" width="30%"/>
+<br/>
+Fig. 3 - How subpartitions serve consumer execution vertices with static execution graph
+</center>
+
+<br/>
+
+But obviously, this doesn't work for dynamic graphs, because when a job vertex is deployed, the parallelism of its consumer job vertices may not have been decided yet. To enable Flink to work in this case, we need a way to allow a job vertex to run without knowing the parallelism of its consumer job vertices(or rather, we need a way to allow execution vertices to run without knowing the number of their consumer execution vertices). 
+
+To achieve this goal, we can set the number of subpartitions to be the max parallelism of the consumer job vertex. Then when the consumer execution vertices are deployed, they should be assigned with a subpartition range to consume. Suppose N is the number of consumer execution vertices and P is the number of subpartitions. For the kth consumer execution vertex, the consumed subpartition range should be:
+
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/range-formula.png" width="55%"/>
+</center>
+
+Taking Fig. 4 as example, the max parallelism of B is 4, so A1/A2 have 4 subpartitions. And then if the decided parallelism of B is 2, then the subpartitions mapping will be Fig. 4 (a), if the decided parallelism of B is 3, then the subpartitions mapping will be  Fig. 4 (b).
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/4-dynamic-graph-subpartition-mapping.png" width="75%"/>
+<br/>
+Fig. 4 - How subpartitions serve consumer execution vertices with dynamic graph
+</center>
+
+<br/>
+
+## Update and schedule the dynamic execution graph
+The adaptive batch scheduler scheduling is similar to the default scheduler, the only difference is that an empty dynamic execution graph will be generated initially and vertices will be attached later. Before handling any scheduling event, the scheduler will try deciding the parallelisms for job vertices, and then initialize them to generate execution vertices, connecting execution edges, and update the execution graph.
+
+The scheduler will try to decide the parallelism for all job vertices before each scheduling, and the parallelism decision will be made for each job vertex in topological order:

Review Comment:
   I change it to `before handling scheduling event`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] gaoyunhaii commented on pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on PR #546:
URL: https://github.com/apache/flink-web/pull/546#issuecomment-1156033959

   Thanks @wanglijie95 for the update! I have only one more comment. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] wanglijie95 commented on a diff in pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r896686761


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the vertices during job execution.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png" width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size of input results, so the scheduler needs to know the sizes of result partitions produced by tasks. We introduced a numBytesProduced counter to record the size of each produced result partition, the accumulated result of the counter will be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results. The computation algorithm is as follows:
+Suppose:
+
+- ***V*** is the bytes of data the user expects to be processed by each task.
+- ***totalBytes<sub>non-broadcast</sub>*** is the sum of the non-broadcast result sizes consumed by this job vertex.
+- ***totalBytes<sub>broadcast</sub>*** is the sum of the broadcast result sizes consumed by this job vertex.
+- ***broadcastCapRatio*** is the cap ratio of broadcast bytes that affects the parallelism calculation.
+- ***normalize(***x***)*** is a function that round ***x*** to the closest power of 2.
+
+then the parallelism of this job vertex ***P*** will be:
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/parallelism-formula.png" width="60%"/>
+</center>
+
+Note that we introduced two special treatment in the above formula :
+
+- [Limit the cap ratio of broadcast bytes](#limit-the-cap-ratio-of-broadcast-bytes)
+- [Normalize the parallelism to the closest power of 2](#normalize-the-parallelism-to-the-closest-power-of-2)
+
+However, the above formula cannot be used to decide the parallelism of the source vertices, because the source vertices have no input. To solve it, we introduced the configuration option `jobmanager.adaptive-batch-scheduler.default-source-parallelism` to allow users to manually configure the parallelism of source vertices. Note that not all data sources need this option, because some data sources can automatically infer parallelism (For example, HiveTableSource, see [HiveParallelismInference](https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java) for more detail). For these sources, it is recommended to decide parallelism by themselves.
+
+### Limit the cap ratio of broadcast bytes
+As you can see, we limit the cap ratio of broadcast bytes that affects the parallelism calculation to ***broadcastCapRatio***. That is, the non-broadcast bytes processed by each task is at least ***(1-broadcastCapRatio) * V***. If not so,when the total broadcast bytes is close to ***V***, even if the total non-broadcast bytes is very small, it may cause a large parallelism, which is unnecessary and may lead to resource waste and large task deployment overhead.
+
+Generally, the broadcast dataset is usually relatively small against the other co-processed datasets. Therefore, we set the cap ratio to 0.5 by default because we usually expect the broadcast bytes to be smaller than non-broadcast bytes. The value is hard coded in the first version, and we may make it configurable later.
+
+
+### Normalize the parallelism to the closest power of 2
+The normalize is to avoid introducing data skew. To better understand this section, we suggest you read the [Flexible subpartition mapping](#flexible-subpartition-mapping) section first.
+
+Taking Fig. 4 (b) as example, A1/A2 produces 4 subpartitions, and the decided parallelism of B is 3. In this case, B1 will consume 1 subpartition, B2 will consume 1 subpartition, and B3 will consume 2 subpartitions. We assume that subpartitions have the same amount of data, which means B3 will consume twice the data of other tasks, data skew is introduced due to the subpartition mapping.
+
+To solve this problem, we need to make the subpartitions evenly consumed by downstream tasks, which means the number of subpartitions should be a multiple of the number of downstream tasks. For simplicity, we require the user-specified max parallelism to be 2<sup>N</sup>, and then adjust the calculated parallelism to a closest 2<sup>M</sup> (M <= N), so that we can guarantee that subpartitions will be evenly consumed by downstream tasks.
+
+Note that this is a temporary solution, the ultimate solution would be the [Auto-rebalancing of workloads](#auto-rebalancing-of-workloads), which may come soon.
+
+## Build up execution graph dynamically
+Before Flink 1.15, the execution graph was fully built in a static way before starting scheduling. To allow parallelisms of job vertices to be decided lazily, the execution graph must be able to be built up dynamically.
+
+### Create execution vertices and execution edges lazily
+A dynamic execution graph means that a Flink job starts with an empty execution topology, and then gradually attaches vertices during job execution, as shown in Fig. 2.
+
+The execution topology consists of execution vertices and execution edges. The execution vertices will be created and attached to the execution topology only when:
+
+- The parallelism of the corresponding job vertex is decided.
+- All upstream execution vertices are already attached.
+
+A decided parallelism of the job vertex is needed so that Flink knows how many execution vertices should be created. Upstream execution vertices need to be attached first so that Flink can connect the newly created execution vertices to the upstream vertices with execution edges. 
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/2-dynamic-graph.png" width="90%"/>
+<br/>
+Fig. 2 - Build up execution graph dynamically
+</center>
+
+<br/>
+
+### Flexible subpartition mapping
+Before Flink 1.15, when deploying a task, Flink needs to know the parallelism of its consumer job vertex. This is because consumer vertex parallelism is used to decide the number of subpartitions produced by each upstream task. The reason behind that is, for one result partition, different subpartitions serve different consumer execution vertices. More specifically, one consumer execution vertex only consumes data from subpartition with the same index. 
+
+Taking Fig. 3 as example, parallelism of the consumer B is 2, so the result partition produced by A1/A2 should contain 2 subpartitions, the subpartition with index 0 serves B1, and the subpartition with index 1 serves B2.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/3-static-graph-subpartition-mapping.png" width="30%"/>
+<br/>
+Fig. 3 - How subpartitions serve consumer execution vertices with static execution graph
+</center>
+
+<br/>
+
+But obviously, this doesn't work for dynamic graphs, because when a job vertex is deployed, the parallelism of its consumer job vertices may not have been decided yet. To enable Flink to work in this case, we need a way to allow a job vertex to run without knowing the parallelism of its consumer job vertices(or rather, we need a way to allow execution vertices to run without knowing the number of their consumer execution vertices). 

Review Comment:
   I removed `(or rather ...)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] zhuzhurk commented on a diff in pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r899697425


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -1,6 +1,6 @@
 ---
 layout: post
-title: "Automatically decide parallelism for Flink batch jobs"
+title: "Adaptive Batch Scheduler: Automatically Decide Parallelism for Flink Batch Jobs"

Review Comment:
   for -> of



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -1,6 +1,6 @@
 ---
 layout: post
-title: "Automatically decide parallelism for Flink batch jobs"
+title: "Adaptive Batch Scheduler: Automatically Decide Parallelism for Flink Batch Jobs"

Review Comment:
   you can search for "parallelism for" and "parallelisms for" for other occurrences.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] wanglijie95 commented on a diff in pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r896636772


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the vertices during job execution.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png" width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size of input results, so the scheduler needs to know the sizes of result partitions produced by tasks. We introduced a numBytesProduced counter to record the size of each produced result partition, the accumulated result of the counter will be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results. The computation algorithm is as follows:
+Suppose:

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] gaoyunhaii commented on pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on PR #546:
URL: https://github.com/apache/flink-web/pull/546#issuecomment-1157174790

   Thanks @wanglijie95 for the update! LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] gaoyunhaii commented on a diff in pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r894264806


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 

Review Comment:
   `is not an easy work` ?



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:

Review Comment:
   `will only automatically decide` -> `only automatically decides`?
   Add a space after `.` 



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.

Review Comment:
   Should here be `Enabled` ? Also for the following items.



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the vertices during job execution.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png" width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size of input results, so the scheduler needs to know the sizes of result partitions produced by tasks. We introduced a numBytesProduced counter to record the size of each produced result partition, the accumulated result of the counter will be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results. The computation algorithm is as follows:
+Suppose:
+
+- ***V*** is the bytes of data the user expects to be processed by each task.
+- ***totalBytes<sub>non-broadcast</sub>*** is the sum of the non-broadcast result sizes consumed by this job vertex.
+- ***totalBytes<sub>broadcast</sub>*** is the sum of the broadcast result sizes consumed by this job vertex.
+- ***broadcastCapRatio*** is the cap ratio of broadcast bytes that affects the parallelism calculation.
+- ***normalize(***x***)*** is a function that round ***x*** to the closest power of 2.
+
+then the parallelism of this job vertex ***P*** will be:
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/parallelism-formula.png" width="60%"/>
+</center>
+
+Note that we introduced two special treatment in the above formula :
+
+- [Limit the cap ratio of broadcast bytes](#limit-the-cap-ratio-of-broadcast-bytes)
+- [Normalize the parallelism to the closest power of 2](#normalize-the-parallelism-to-the-closest-power-of-2)
+
+However, the above formula cannot be used to decide the parallelism of the source vertices, because the source vertices have no input. To solve it, we introduced the configuration option `jobmanager.adaptive-batch-scheduler.default-source-parallelism` to allow users to manually configure the parallelism of source vertices. Note that not all data sources need this option, because some data sources can automatically infer parallelism (For example, HiveTableSource, see [HiveParallelismInference](https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java) for more detail). For these sources, it is recommended to decide parallelism by themselves.
+
+### Limit the cap ratio of broadcast bytes
+As you can see, we limit the cap ratio of broadcast bytes that affects the parallelism calculation to ***broadcastCapRatio***. That is, the non-broadcast bytes processed by each task is at least ***(1-broadcastCapRatio) * V***. If not so,when the total broadcast bytes is close to ***V***, even if the total non-broadcast bytes is very small, it may cause a large parallelism, which is unnecessary and may lead to resource waste and large task deployment overhead.
+
+Generally, the broadcast dataset is usually relatively small against the other co-processed datasets. Therefore, we set the cap ratio to 0.5 by default because we usually expect the broadcast bytes to be smaller than non-broadcast bytes. The value is hard coded in the first version, and we may make it configurable later.
+
+
+### Normalize the parallelism to the closest power of 2
+The normalize is to avoid introducing data skew. To better understand this section, we suggest you read the [Flexible subpartition mapping](#flexible-subpartition-mapping) section first.
+
+Taking Fig. 4 (b) as example, A1/A2 produces 4 subpartitions, and the decided parallelism of B is 3. In this case, B1 will consume 1 subpartition, B2 will consume 1 subpartition, and B3 will consume 2 subpartitions. We assume that subpartitions have the same amount of data, which means B3 will consume twice the data of other tasks, data skew is introduced due to the subpartition mapping.
+
+To solve this problem, we need to make the subpartitions evenly consumed by downstream tasks, which means the number of subpartitions should be a multiple of the number of downstream tasks. For simplicity, we require the user-specified max parallelism to be 2<sup>N</sup>, and then adjust the calculated parallelism to a closest 2<sup>M</sup> (M <= N), so that we can guarantee that subpartitions will be evenly consumed by downstream tasks.
+
+Note that this is a temporary solution, the ultimate solution would be the [Auto-rebalancing of workloads](#auto-rebalancing-of-workloads), which may come soon.
+
+## Build up execution graph dynamically
+Before Flink 1.15, the execution graph was fully built in a static way before starting scheduling. To allow parallelisms of job vertices to be decided lazily, the execution graph must be able to be built up dynamically.
+
+### Create execution vertices and execution edges lazily
+A dynamic execution graph means that a Flink job starts with an empty execution topology, and then gradually attaches vertices during job execution, as shown in Fig. 2.
+
+The execution topology consists of execution vertices and execution edges. The execution vertices will be created and attached to the execution topology only when:
+
+- The parallelism of the corresponding job vertex is decided.
+- All upstream execution vertices are already attached.
+
+A decided parallelism of the job vertex is needed so that Flink knows how many execution vertices should be created. Upstream execution vertices need to be attached first so that Flink can connect the newly created execution vertices to the upstream vertices with execution edges. 
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/2-dynamic-graph.png" width="90%"/>
+<br/>
+Fig. 2 - Build up execution graph dynamically
+</center>
+
+<br/>
+
+### Flexible subpartition mapping
+Before Flink 1.15, when deploying a task, Flink needs to know the parallelism of its consumer job vertex. This is because consumer vertex parallelism is used to decide the number of subpartitions produced by each upstream task. The reason behind that is, for one result partition, different subpartitions serve different consumer execution vertices. More specifically, one consumer execution vertex only consumes data from subpartition with the same index. 
+
+Taking Fig. 3 as example, parallelism of the consumer B is 2, so the result partition produced by A1/A2 should contain 2 subpartitions, the subpartition with index 0 serves B1, and the subpartition with index 1 serves B2.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/3-static-graph-subpartition-mapping.png" width="30%"/>
+<br/>
+Fig. 3 - How subpartitions serve consumer execution vertices with static execution graph
+</center>
+
+<br/>
+
+But obviously, this doesn't work for dynamic graphs, because when a job vertex is deployed, the parallelism of its consumer job vertices may not have been decided yet. To enable Flink to work in this case, we need a way to allow a job vertex to run without knowing the parallelism of its consumer job vertices(or rather, we need a way to allow execution vertices to run without knowing the number of their consumer execution vertices). 

Review Comment:
   Add space before `(`?
   `the number of their consumer execution vertices` -> `the parallelism of their consumer execution vertices`?



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the vertices during job execution.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png" width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size of input results, so the scheduler needs to know the sizes of result partitions produced by tasks. We introduced a numBytesProduced counter to record the size of each produced result partition, the accumulated result of the counter will be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results. The computation algorithm is as follows:
+Suppose:
+
+- ***V*** is the bytes of data the user expects to be processed by each task.
+- ***totalBytes<sub>non-broadcast</sub>*** is the sum of the non-broadcast result sizes consumed by this job vertex.
+- ***totalBytes<sub>broadcast</sub>*** is the sum of the broadcast result sizes consumed by this job vertex.
+- ***broadcastCapRatio*** is the cap ratio of broadcast bytes that affects the parallelism calculation.
+- ***normalize(***x***)*** is a function that round ***x*** to the closest power of 2.
+
+then the parallelism of this job vertex ***P*** will be:
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/parallelism-formula.png" width="60%"/>
+</center>
+
+Note that we introduced two special treatment in the above formula :
+
+- [Limit the cap ratio of broadcast bytes](#limit-the-cap-ratio-of-broadcast-bytes)
+- [Normalize the parallelism to the closest power of 2](#normalize-the-parallelism-to-the-closest-power-of-2)
+
+However, the above formula cannot be used to decide the parallelism of the source vertices, because the source vertices have no input. To solve it, we introduced the configuration option `jobmanager.adaptive-batch-scheduler.default-source-parallelism` to allow users to manually configure the parallelism of source vertices. Note that not all data sources need this option, because some data sources can automatically infer parallelism (For example, HiveTableSource, see [HiveParallelismInference](https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java) for more detail). For these sources, it is recommended to decide parallelism by themselves.
+
+### Limit the cap ratio of broadcast bytes
+As you can see, we limit the cap ratio of broadcast bytes that affects the parallelism calculation to ***broadcastCapRatio***. That is, the non-broadcast bytes processed by each task is at least ***(1-broadcastCapRatio) * V***. If not so,when the total broadcast bytes is close to ***V***, even if the total non-broadcast bytes is very small, it may cause a large parallelism, which is unnecessary and may lead to resource waste and large task deployment overhead.
+
+Generally, the broadcast dataset is usually relatively small against the other co-processed datasets. Therefore, we set the cap ratio to 0.5 by default because we usually expect the broadcast bytes to be smaller than non-broadcast bytes. The value is hard coded in the first version, and we may make it configurable later.
+
+
+### Normalize the parallelism to the closest power of 2
+The normalize is to avoid introducing data skew. To better understand this section, we suggest you read the [Flexible subpartition mapping](#flexible-subpartition-mapping) section first.
+
+Taking Fig. 4 (b) as example, A1/A2 produces 4 subpartitions, and the decided parallelism of B is 3. In this case, B1 will consume 1 subpartition, B2 will consume 1 subpartition, and B3 will consume 2 subpartitions. We assume that subpartitions have the same amount of data, which means B3 will consume twice the data of other tasks, data skew is introduced due to the subpartition mapping.
+
+To solve this problem, we need to make the subpartitions evenly consumed by downstream tasks, which means the number of subpartitions should be a multiple of the number of downstream tasks. For simplicity, we require the user-specified max parallelism to be 2<sup>N</sup>, and then adjust the calculated parallelism to a closest 2<sup>M</sup> (M <= N), so that we can guarantee that subpartitions will be evenly consumed by downstream tasks.
+
+Note that this is a temporary solution, the ultimate solution would be the [Auto-rebalancing of workloads](#auto-rebalancing-of-workloads), which may come soon.
+
+## Build up execution graph dynamically
+Before Flink 1.15, the execution graph was fully built in a static way before starting scheduling. To allow parallelisms of job vertices to be decided lazily, the execution graph must be able to be built up dynamically.
+
+### Create execution vertices and execution edges lazily
+A dynamic execution graph means that a Flink job starts with an empty execution topology, and then gradually attaches vertices during job execution, as shown in Fig. 2.
+
+The execution topology consists of execution vertices and execution edges. The execution vertices will be created and attached to the execution topology only when:
+
+- The parallelism of the corresponding job vertex is decided.
+- All upstream execution vertices are already attached.
+
+A decided parallelism of the job vertex is needed so that Flink knows how many execution vertices should be created. Upstream execution vertices need to be attached first so that Flink can connect the newly created execution vertices to the upstream vertices with execution edges. 
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/2-dynamic-graph.png" width="90%"/>
+<br/>
+Fig. 2 - Build up execution graph dynamically
+</center>
+
+<br/>
+
+### Flexible subpartition mapping
+Before Flink 1.15, when deploying a task, Flink needs to know the parallelism of its consumer job vertex. This is because consumer vertex parallelism is used to decide the number of subpartitions produced by each upstream task. The reason behind that is, for one result partition, different subpartitions serve different consumer execution vertices. More specifically, one consumer execution vertex only consumes data from subpartition with the same index. 
+
+Taking Fig. 3 as example, parallelism of the consumer B is 2, so the result partition produced by A1/A2 should contain 2 subpartitions, the subpartition with index 0 serves B1, and the subpartition with index 1 serves B2.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/3-static-graph-subpartition-mapping.png" width="30%"/>
+<br/>
+Fig. 3 - How subpartitions serve consumer execution vertices with static execution graph
+</center>
+
+<br/>
+
+But obviously, this doesn't work for dynamic graphs, because when a job vertex is deployed, the parallelism of its consumer job vertices may not have been decided yet. To enable Flink to work in this case, we need a way to allow a job vertex to run without knowing the parallelism of its consumer job vertices(or rather, we need a way to allow execution vertices to run without knowing the number of their consumer execution vertices). 
+
+To achieve this goal, we can set the number of subpartitions to be the max parallelism of the consumer job vertex. Then when the consumer execution vertices are deployed, they should be assigned with a subpartition range to consume. Suppose N is the number of consumer execution vertices and P is the number of subpartitions. For the kth consumer execution vertex, the consumed subpartition range should be:
+
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/range-formula.png" width="55%"/>
+</center>
+
+Taking Fig. 4 as example, the max parallelism of B is 4, so A1/A2 have 4 subpartitions. And then if the decided parallelism of B is 2, then the subpartitions mapping will be Fig. 4 (a), if the decided parallelism of B is 3, then the subpartitions mapping will be  Fig. 4 (b).
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/4-dynamic-graph-subpartition-mapping.png" width="75%"/>
+<br/>
+Fig. 4 - How subpartitions serve consumer execution vertices with dynamic graph
+</center>
+
+<br/>
+
+## Update and schedule the dynamic execution graph
+The adaptive batch scheduler scheduling is similar to the default scheduler, the only difference is that an empty dynamic execution graph will be generated initially and vertices will be attached later. Before handling any scheduling event, the scheduler will try deciding the parallelisms for job vertices, and then initialize them to generate execution vertices, connecting execution edges, and update the execution graph.
+
+The scheduler will try to decide the parallelism for all job vertices before each scheduling, and the parallelism decision will be made for each job vertex in topological order:
+
+- For source vertices, the parallelism should have been decided before starting scheduling. 
+- For non-source vertices, the parallelism can be decided only when all its consumed results are fully finished.

Review Comment:
   nit: when all its precedent tasks are fully finished or when all its consumed results are fully produced?



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`

Review Comment:
   Also, is this configuration required for both DataStream and SQL jobs or it only required for DataStream Jobs? 



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`

Review Comment:
   Add `.` ?



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the vertices during job execution.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png" width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size of input results, so the scheduler needs to know the sizes of result partitions produced by tasks. We introduced a numBytesProduced counter to record the size of each produced result partition, the accumulated result of the counter will be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results. The computation algorithm is as follows:
+Suppose:
+
+- ***V*** is the bytes of data the user expects to be processed by each task.
+- ***totalBytes<sub>non-broadcast</sub>*** is the sum of the non-broadcast result sizes consumed by this job vertex.
+- ***totalBytes<sub>broadcast</sub>*** is the sum of the broadcast result sizes consumed by this job vertex.
+- ***broadcastCapRatio*** is the cap ratio of broadcast bytes that affects the parallelism calculation.
+- ***normalize(***x***)*** is a function that round ***x*** to the closest power of 2.
+
+then the parallelism of this job vertex ***P*** will be:
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/parallelism-formula.png" width="60%"/>
+</center>
+
+Note that we introduced two special treatment in the above formula :
+
+- [Limit the cap ratio of broadcast bytes](#limit-the-cap-ratio-of-broadcast-bytes)
+- [Normalize the parallelism to the closest power of 2](#normalize-the-parallelism-to-the-closest-power-of-2)
+
+However, the above formula cannot be used to decide the parallelism of the source vertices, because the source vertices have no input. To solve it, we introduced the configuration option `jobmanager.adaptive-batch-scheduler.default-source-parallelism` to allow users to manually configure the parallelism of source vertices. Note that not all data sources need this option, because some data sources can automatically infer parallelism (For example, HiveTableSource, see [HiveParallelismInference](https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java) for more detail). For these sources, it is recommended to decide parallelism by themselves.
+
+### Limit the cap ratio of broadcast bytes
+As you can see, we limit the cap ratio of broadcast bytes that affects the parallelism calculation to ***broadcastCapRatio***. That is, the non-broadcast bytes processed by each task is at least ***(1-broadcastCapRatio) * V***. If not so,when the total broadcast bytes is close to ***V***, even if the total non-broadcast bytes is very small, it may cause a large parallelism, which is unnecessary and may lead to resource waste and large task deployment overhead.
+
+Generally, the broadcast dataset is usually relatively small against the other co-processed datasets. Therefore, we set the cap ratio to 0.5 by default because we usually expect the broadcast bytes to be smaller than non-broadcast bytes. The value is hard coded in the first version, and we may make it configurable later.
+
+
+### Normalize the parallelism to the closest power of 2
+The normalize is to avoid introducing data skew. To better understand this section, we suggest you read the [Flexible subpartition mapping](#flexible-subpartition-mapping) section first.
+
+Taking Fig. 4 (b) as example, A1/A2 produces 4 subpartitions, and the decided parallelism of B is 3. In this case, B1 will consume 1 subpartition, B2 will consume 1 subpartition, and B3 will consume 2 subpartitions. We assume that subpartitions have the same amount of data, which means B3 will consume twice the data of other tasks, data skew is introduced due to the subpartition mapping.
+
+To solve this problem, we need to make the subpartitions evenly consumed by downstream tasks, which means the number of subpartitions should be a multiple of the number of downstream tasks. For simplicity, we require the user-specified max parallelism to be 2<sup>N</sup>, and then adjust the calculated parallelism to a closest 2<sup>M</sup> (M <= N), so that we can guarantee that subpartitions will be evenly consumed by downstream tasks.
+
+Note that this is a temporary solution, the ultimate solution would be the [Auto-rebalancing of workloads](#auto-rebalancing-of-workloads), which may come soon.
+
+## Build up execution graph dynamically
+Before Flink 1.15, the execution graph was fully built in a static way before starting scheduling. To allow parallelisms of job vertices to be decided lazily, the execution graph must be able to be built up dynamically.
+
+### Create execution vertices and execution edges lazily
+A dynamic execution graph means that a Flink job starts with an empty execution topology, and then gradually attaches vertices during job execution, as shown in Fig. 2.
+
+The execution topology consists of execution vertices and execution edges. The execution vertices will be created and attached to the execution topology only when:
+
+- The parallelism of the corresponding job vertex is decided.
+- All upstream execution vertices are already attached.
+
+A decided parallelism of the job vertex is needed so that Flink knows how many execution vertices should be created. Upstream execution vertices need to be attached first so that Flink can connect the newly created execution vertices to the upstream vertices with execution edges. 

Review Comment:
   The decided parallelism?



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the vertices during job execution.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png" width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size of input results, so the scheduler needs to know the sizes of result partitions produced by tasks. We introduced a numBytesProduced counter to record the size of each produced result partition, the accumulated result of the counter will be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results. The computation algorithm is as follows:

Review Comment:
   Perhaps simplified as 
   ```
   a new component `VertexParallelismDecider`
   ```



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the vertices during job execution.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png" width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size of input results, so the scheduler needs to know the sizes of result partitions produced by tasks. We introduced a numBytesProduced counter to record the size of each produced result partition, the accumulated result of the counter will be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results. The computation algorithm is as follows:
+Suppose:
+
+- ***V*** is the bytes of data the user expects to be processed by each task.
+- ***totalBytes<sub>non-broadcast</sub>*** is the sum of the non-broadcast result sizes consumed by this job vertex.
+- ***totalBytes<sub>broadcast</sub>*** is the sum of the broadcast result sizes consumed by this job vertex.
+- ***broadcastCapRatio*** is the cap ratio of broadcast bytes that affects the parallelism calculation.

Review Comment:
   Might have some explaination for `cap ratio`? 



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.

Review Comment:
   Perhaps simplified as
   ```
   a new component `VertexParallelismDecider`
   ```



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the vertices during job execution.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png" width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size of input results, so the scheduler needs to know the sizes of result partitions produced by tasks. We introduced a numBytesProduced counter to record the size of each produced result partition, the accumulated result of the counter will be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results. The computation algorithm is as follows:
+Suppose:
+
+- ***V*** is the bytes of data the user expects to be processed by each task.
+- ***totalBytes<sub>non-broadcast</sub>*** is the sum of the non-broadcast result sizes consumed by this job vertex.
+- ***totalBytes<sub>broadcast</sub>*** is the sum of the broadcast result sizes consumed by this job vertex.
+- ***broadcastCapRatio*** is the cap ratio of broadcast bytes that affects the parallelism calculation.
+- ***normalize(***x***)*** is a function that round ***x*** to the closest power of 2.
+
+then the parallelism of this job vertex ***P*** will be:
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/parallelism-formula.png" width="60%"/>
+</center>
+
+Note that we introduced two special treatment in the above formula :
+
+- [Limit the cap ratio of broadcast bytes](#limit-the-cap-ratio-of-broadcast-bytes)
+- [Normalize the parallelism to the closest power of 2](#normalize-the-parallelism-to-the-closest-power-of-2)
+
+However, the above formula cannot be used to decide the parallelism of the source vertices, because the source vertices have no input. To solve it, we introduced the configuration option `jobmanager.adaptive-batch-scheduler.default-source-parallelism` to allow users to manually configure the parallelism of source vertices. Note that not all data sources need this option, because some data sources can automatically infer parallelism (For example, HiveTableSource, see [HiveParallelismInference](https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java) for more detail). For these sources, it is recommended to decide parallelism by themselves.
+
+### Limit the cap ratio of broadcast bytes
+As you can see, we limit the cap ratio of broadcast bytes that affects the parallelism calculation to ***broadcastCapRatio***. That is, the non-broadcast bytes processed by each task is at least ***(1-broadcastCapRatio) * V***. If not so,when the total broadcast bytes is close to ***V***, even if the total non-broadcast bytes is very small, it may cause a large parallelism, which is unnecessary and may lead to resource waste and large task deployment overhead.
+
+Generally, the broadcast dataset is usually relatively small against the other co-processed datasets. Therefore, we set the cap ratio to 0.5 by default because we usually expect the broadcast bytes to be smaller than non-broadcast bytes. The value is hard coded in the first version, and we may make it configurable later.
+
+
+### Normalize the parallelism to the closest power of 2
+The normalize is to avoid introducing data skew. To better understand this section, we suggest you read the [Flexible subpartition mapping](#flexible-subpartition-mapping) section first.
+
+Taking Fig. 4 (b) as example, A1/A2 produces 4 subpartitions, and the decided parallelism of B is 3. In this case, B1 will consume 1 subpartition, B2 will consume 1 subpartition, and B3 will consume 2 subpartitions. We assume that subpartitions have the same amount of data, which means B3 will consume twice the data of other tasks, data skew is introduced due to the subpartition mapping.
+
+To solve this problem, we need to make the subpartitions evenly consumed by downstream tasks, which means the number of subpartitions should be a multiple of the number of downstream tasks. For simplicity, we require the user-specified max parallelism to be 2<sup>N</sup>, and then adjust the calculated parallelism to a closest 2<sup>M</sup> (M <= N), so that we can guarantee that subpartitions will be evenly consumed by downstream tasks.
+
+Note that this is a temporary solution, the ultimate solution would be the [Auto-rebalancing of workloads](#auto-rebalancing-of-workloads), which may come soon.
+
+## Build up execution graph dynamically
+Before Flink 1.15, the execution graph was fully built in a static way before starting scheduling. To allow parallelisms of job vertices to be decided lazily, the execution graph must be able to be built up dynamically.
+
+### Create execution vertices and execution edges lazily
+A dynamic execution graph means that a Flink job starts with an empty execution topology, and then gradually attaches vertices during job execution, as shown in Fig. 2.
+
+The execution topology consists of execution vertices and execution edges. The execution vertices will be created and attached to the execution topology only when:
+
+- The parallelism of the corresponding job vertex is decided.
+- All upstream execution vertices are already attached.
+
+A decided parallelism of the job vertex is needed so that Flink knows how many execution vertices should be created. Upstream execution vertices need to be attached first so that Flink can connect the newly created execution vertices to the upstream vertices with execution edges. 
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/2-dynamic-graph.png" width="90%"/>
+<br/>
+Fig. 2 - Build up execution graph dynamically
+</center>
+
+<br/>
+
+### Flexible subpartition mapping
+Before Flink 1.15, when deploying a task, Flink needs to know the parallelism of its consumer job vertex. This is because consumer vertex parallelism is used to decide the number of subpartitions produced by each upstream task. The reason behind that is, for one result partition, different subpartitions serve different consumer execution vertices. More specifically, one consumer execution vertex only consumes data from subpartition with the same index. 
+
+Taking Fig. 3 as example, parallelism of the consumer B is 2, so the result partition produced by A1/A2 should contain 2 subpartitions, the subpartition with index 0 serves B1, and the subpartition with index 1 serves B2.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/3-static-graph-subpartition-mapping.png" width="30%"/>
+<br/>
+Fig. 3 - How subpartitions serve consumer execution vertices with static execution graph
+</center>
+
+<br/>
+
+But obviously, this doesn't work for dynamic graphs, because when a job vertex is deployed, the parallelism of its consumer job vertices may not have been decided yet. To enable Flink to work in this case, we need a way to allow a job vertex to run without knowing the parallelism of its consumer job vertices(or rather, we need a way to allow execution vertices to run without knowing the number of their consumer execution vertices). 
+
+To achieve this goal, we can set the number of subpartitions to be the max parallelism of the consumer job vertex. Then when the consumer execution vertices are deployed, they should be assigned with a subpartition range to consume. Suppose N is the number of consumer execution vertices and P is the number of subpartitions. For the kth consumer execution vertex, the consumed subpartition range should be:
+
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/range-formula.png" width="55%"/>
+</center>
+
+Taking Fig. 4 as example, the max parallelism of B is 4, so A1/A2 have 4 subpartitions. And then if the decided parallelism of B is 2, then the subpartitions mapping will be Fig. 4 (a), if the decided parallelism of B is 3, then the subpartitions mapping will be  Fig. 4 (b).
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/4-dynamic-graph-subpartition-mapping.png" width="75%"/>
+<br/>
+Fig. 4 - How subpartitions serve consumer execution vertices with dynamic graph
+</center>
+
+<br/>
+
+## Update and schedule the dynamic execution graph
+The adaptive batch scheduler scheduling is similar to the default scheduler, the only difference is that an empty dynamic execution graph will be generated initially and vertices will be attached later. Before handling any scheduling event, the scheduler will try deciding the parallelisms for job vertices, and then initialize them to generate execution vertices, connecting execution edges, and update the execution graph.
+
+The scheduler will try to decide the parallelism for all job vertices before each scheduling, and the parallelism decision will be made for each job vertex in topological order:

Review Comment:
   `before each scheduling` -> `before scheduling`?



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the vertices during job execution.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png" width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size of input results, so the scheduler needs to know the sizes of result partitions produced by tasks. We introduced a numBytesProduced counter to record the size of each produced result partition, the accumulated result of the counter will be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results. The computation algorithm is as follows:
+Suppose:
+
+- ***V*** is the bytes of data the user expects to be processed by each task.
+- ***totalBytes<sub>non-broadcast</sub>*** is the sum of the non-broadcast result sizes consumed by this job vertex.
+- ***totalBytes<sub>broadcast</sub>*** is the sum of the broadcast result sizes consumed by this job vertex.
+- ***broadcastCapRatio*** is the cap ratio of broadcast bytes that affects the parallelism calculation.
+- ***normalize(***x***)*** is a function that round ***x*** to the closest power of 2.
+
+then the parallelism of this job vertex ***P*** will be:
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/parallelism-formula.png" width="60%"/>
+</center>
+
+Note that we introduced two special treatment in the above formula :
+
+- [Limit the cap ratio of broadcast bytes](#limit-the-cap-ratio-of-broadcast-bytes)
+- [Normalize the parallelism to the closest power of 2](#normalize-the-parallelism-to-the-closest-power-of-2)
+
+However, the above formula cannot be used to decide the parallelism of the source vertices, because the source vertices have no input. To solve it, we introduced the configuration option `jobmanager.adaptive-batch-scheduler.default-source-parallelism` to allow users to manually configure the parallelism of source vertices. Note that not all data sources need this option, because some data sources can automatically infer parallelism (For example, HiveTableSource, see [HiveParallelismInference](https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java) for more detail). For these sources, it is recommended to decide parallelism by themselves.
+
+### Limit the cap ratio of broadcast bytes
+As you can see, we limit the cap ratio of broadcast bytes that affects the parallelism calculation to ***broadcastCapRatio***. That is, the non-broadcast bytes processed by each task is at least ***(1-broadcastCapRatio) * V***. If not so,when the total broadcast bytes is close to ***V***, even if the total non-broadcast bytes is very small, it may cause a large parallelism, which is unnecessary and may lead to resource waste and large task deployment overhead.
+
+Generally, the broadcast dataset is usually relatively small against the other co-processed datasets. Therefore, we set the cap ratio to 0.5 by default because we usually expect the broadcast bytes to be smaller than non-broadcast bytes. The value is hard coded in the first version, and we may make it configurable later.
+
+
+### Normalize the parallelism to the closest power of 2
+The normalize is to avoid introducing data skew. To better understand this section, we suggest you read the [Flexible subpartition mapping](#flexible-subpartition-mapping) section first.
+
+Taking Fig. 4 (b) as example, A1/A2 produces 4 subpartitions, and the decided parallelism of B is 3. In this case, B1 will consume 1 subpartition, B2 will consume 1 subpartition, and B3 will consume 2 subpartitions. We assume that subpartitions have the same amount of data, which means B3 will consume twice the data of other tasks, data skew is introduced due to the subpartition mapping.
+
+To solve this problem, we need to make the subpartitions evenly consumed by downstream tasks, which means the number of subpartitions should be a multiple of the number of downstream tasks. For simplicity, we require the user-specified max parallelism to be 2<sup>N</sup>, and then adjust the calculated parallelism to a closest 2<sup>M</sup> (M <= N), so that we can guarantee that subpartitions will be evenly consumed by downstream tasks.
+
+Note that this is a temporary solution, the ultimate solution would be the [Auto-rebalancing of workloads](#auto-rebalancing-of-workloads), which may come soon.
+
+## Build up execution graph dynamically
+Before Flink 1.15, the execution graph was fully built in a static way before starting scheduling. To allow parallelisms of job vertices to be decided lazily, the execution graph must be able to be built up dynamically.
+
+### Create execution vertices and execution edges lazily
+A dynamic execution graph means that a Flink job starts with an empty execution topology, and then gradually attaches vertices during job execution, as shown in Fig. 2.
+
+The execution topology consists of execution vertices and execution edges. The execution vertices will be created and attached to the execution topology only when:
+
+- The parallelism of the corresponding job vertex is decided.
+- All upstream execution vertices are already attached.
+
+A decided parallelism of the job vertex is needed so that Flink knows how many execution vertices should be created. Upstream execution vertices need to be attached first so that Flink can connect the newly created execution vertices to the upstream vertices with execution edges. 
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/2-dynamic-graph.png" width="90%"/>
+<br/>
+Fig. 2 - Build up execution graph dynamically
+</center>
+
+<br/>
+
+### Flexible subpartition mapping
+Before Flink 1.15, when deploying a task, Flink needs to know the parallelism of its consumer job vertex. This is because consumer vertex parallelism is used to decide the number of subpartitions produced by each upstream task. The reason behind that is, for one result partition, different subpartitions serve different consumer execution vertices. More specifically, one consumer execution vertex only consumes data from subpartition with the same index. 
+
+Taking Fig. 3 as example, parallelism of the consumer B is 2, so the result partition produced by A1/A2 should contain 2 subpartitions, the subpartition with index 0 serves B1, and the subpartition with index 1 serves B2.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/3-static-graph-subpartition-mapping.png" width="30%"/>
+<br/>
+Fig. 3 - How subpartitions serve consumer execution vertices with static execution graph
+</center>
+
+<br/>
+
+But obviously, this doesn't work for dynamic graphs, because when a job vertex is deployed, the parallelism of its consumer job vertices may not have been decided yet. To enable Flink to work in this case, we need a way to allow a job vertex to run without knowing the parallelism of its consumer job vertices(or rather, we need a way to allow execution vertices to run without knowing the number of their consumer execution vertices). 
+
+To achieve this goal, we can set the number of subpartitions to be the max parallelism of the consumer job vertex. Then when the consumer execution vertices are deployed, they should be assigned with a subpartition range to consume. Suppose N is the number of consumer execution vertices and P is the number of subpartitions. For the kth consumer execution vertex, the consumed subpartition range should be:
+
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/range-formula.png" width="55%"/>
+</center>
+
+Taking Fig. 4 as example, the max parallelism of B is 4, so A1/A2 have 4 subpartitions. And then if the decided parallelism of B is 2, then the subpartitions mapping will be Fig. 4 (a), if the decided parallelism of B is 3, then the subpartitions mapping will be  Fig. 4 (b).
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/4-dynamic-graph-subpartition-mapping.png" width="75%"/>
+<br/>
+Fig. 4 - How subpartitions serve consumer execution vertices with dynamic graph
+</center>
+
+<br/>
+
+## Update and schedule the dynamic execution graph
+The adaptive batch scheduler scheduling is similar to the default scheduler, the only difference is that an empty dynamic execution graph will be generated initially and vertices will be attached later. Before handling any scheduling event, the scheduler will try deciding the parallelisms for job vertices, and then initialize them to generate execution vertices, connecting execution edges, and update the execution graph.
+
+The scheduler will try to decide the parallelism for all job vertices before each scheduling, and the parallelism decision will be made for each job vertex in topological order:
+
+- For source vertices, the parallelism should have been decided before starting scheduling. 
+- For non-source vertices, the parallelism can be decided only when all its consumed results are fully finished.
+
+After trying to decide the parallelism for all job vertices, the scheduler will try to initialize the job vertices in topological order. A job vertex that can be initialized should meet the following conditions:

Review Comment:
   After decided the parallelism for ... ?
   will try to initialize  -> will initialize?



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the vertices during job execution.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png" width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size of input results, so the scheduler needs to know the sizes of result partitions produced by tasks. We introduced a numBytesProduced counter to record the size of each produced result partition, the accumulated result of the counter will be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results. The computation algorithm is as follows:
+Suppose:

Review Comment:
   Add an empty line before `Suppose: `.
   
   Also might not use two adjacent `:`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] wanglijie95 commented on a diff in pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r896616269


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the vertices during job execution.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png" width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size of input results, so the scheduler needs to know the sizes of result partitions produced by tasks. We introduced a numBytesProduced counter to record the size of each produced result partition, the accumulated result of the counter will be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results. The computation algorithm is as follows:

Review Comment:
   I add a link for it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] wanglijie95 commented on a diff in pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r896636195


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`

Review Comment:
   Fixed.
   It's required for both DataStream and SQL jobs. For SQL jobs, when `table.exec.resource.default-parallelism` is `-1`, it will use `parallelism.default`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] wanglijie95 commented on a diff in pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r896615871


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the vertices during job execution.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png" width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size of input results, so the scheduler needs to know the sizes of result partitions produced by tasks. We introduced a numBytesProduced counter to record the size of each produced result partition, the accumulated result of the counter will be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results. The computation algorithm is as follows:
+Suppose:
+
+- ***V*** is the bytes of data the user expects to be processed by each task.
+- ***totalBytes<sub>non-broadcast</sub>*** is the sum of the non-broadcast result sizes consumed by this job vertex.
+- ***totalBytes<sub>broadcast</sub>*** is the sum of the broadcast result sizes consumed by this job vertex.
+- ***broadcastCapRatio*** is the cap ratio of broadcast bytes that affects the parallelism calculation.
+- ***normalize(***x***)*** is a function that round ***x*** to the closest power of 2.
+
+then the parallelism of this job vertex ***P*** will be:
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/parallelism-formula.png" width="60%"/>
+</center>
+
+Note that we introduced two special treatment in the above formula :
+
+- [Limit the cap ratio of broadcast bytes](#limit-the-cap-ratio-of-broadcast-bytes)
+- [Normalize the parallelism to the closest power of 2](#normalize-the-parallelism-to-the-closest-power-of-2)
+
+However, the above formula cannot be used to decide the parallelism of the source vertices, because the source vertices have no input. To solve it, we introduced the configuration option `jobmanager.adaptive-batch-scheduler.default-source-parallelism` to allow users to manually configure the parallelism of source vertices. Note that not all data sources need this option, because some data sources can automatically infer parallelism (For example, HiveTableSource, see [HiveParallelismInference](https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java) for more detail). For these sources, it is recommended to decide parallelism by themselves.
+
+### Limit the cap ratio of broadcast bytes
+As you can see, we limit the cap ratio of broadcast bytes that affects the parallelism calculation to ***broadcastCapRatio***. That is, the non-broadcast bytes processed by each task is at least ***(1-broadcastCapRatio) * V***. If not so,when the total broadcast bytes is close to ***V***, even if the total non-broadcast bytes is very small, it may cause a large parallelism, which is unnecessary and may lead to resource waste and large task deployment overhead.
+
+Generally, the broadcast dataset is usually relatively small against the other co-processed datasets. Therefore, we set the cap ratio to 0.5 by default because we usually expect the broadcast bytes to be smaller than non-broadcast bytes. The value is hard coded in the first version, and we may make it configurable later.
+
+
+### Normalize the parallelism to the closest power of 2
+The normalize is to avoid introducing data skew. To better understand this section, we suggest you read the [Flexible subpartition mapping](#flexible-subpartition-mapping) section first.
+
+Taking Fig. 4 (b) as example, A1/A2 produces 4 subpartitions, and the decided parallelism of B is 3. In this case, B1 will consume 1 subpartition, B2 will consume 1 subpartition, and B3 will consume 2 subpartitions. We assume that subpartitions have the same amount of data, which means B3 will consume twice the data of other tasks, data skew is introduced due to the subpartition mapping.
+
+To solve this problem, we need to make the subpartitions evenly consumed by downstream tasks, which means the number of subpartitions should be a multiple of the number of downstream tasks. For simplicity, we require the user-specified max parallelism to be 2<sup>N</sup>, and then adjust the calculated parallelism to a closest 2<sup>M</sup> (M <= N), so that we can guarantee that subpartitions will be evenly consumed by downstream tasks.
+
+Note that this is a temporary solution, the ultimate solution would be the [Auto-rebalancing of workloads](#auto-rebalancing-of-workloads), which may come soon.
+
+## Build up execution graph dynamically
+Before Flink 1.15, the execution graph was fully built in a static way before starting scheduling. To allow parallelisms of job vertices to be decided lazily, the execution graph must be able to be built up dynamically.
+
+### Create execution vertices and execution edges lazily
+A dynamic execution graph means that a Flink job starts with an empty execution topology, and then gradually attaches vertices during job execution, as shown in Fig. 2.
+
+The execution topology consists of execution vertices and execution edges. The execution vertices will be created and attached to the execution topology only when:
+
+- The parallelism of the corresponding job vertex is decided.
+- All upstream execution vertices are already attached.
+
+A decided parallelism of the job vertex is needed so that Flink knows how many execution vertices should be created. Upstream execution vertices need to be attached first so that Flink can connect the newly created execution vertices to the upstream vertices with execution edges. 

Review Comment:
   I changed it to "The parallelism of the job vertex needs to be decided first so that Flink knows how many execution vertices should be created"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] wanglijie95 commented on a diff in pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r896699847


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the vertices during job execution.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png" width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size of input results, so the scheduler needs to know the sizes of result partitions produced by tasks. We introduced a numBytesProduced counter to record the size of each produced result partition, the accumulated result of the counter will be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results. The computation algorithm is as follows:
+Suppose:
+
+- ***V*** is the bytes of data the user expects to be processed by each task.
+- ***totalBytes<sub>non-broadcast</sub>*** is the sum of the non-broadcast result sizes consumed by this job vertex.
+- ***totalBytes<sub>broadcast</sub>*** is the sum of the broadcast result sizes consumed by this job vertex.
+- ***broadcastCapRatio*** is the cap ratio of broadcast bytes that affects the parallelism calculation.
+- ***normalize(***x***)*** is a function that round ***x*** to the closest power of 2.
+
+then the parallelism of this job vertex ***P*** will be:
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/parallelism-formula.png" width="60%"/>
+</center>
+
+Note that we introduced two special treatment in the above formula :
+
+- [Limit the cap ratio of broadcast bytes](#limit-the-cap-ratio-of-broadcast-bytes)
+- [Normalize the parallelism to the closest power of 2](#normalize-the-parallelism-to-the-closest-power-of-2)
+
+However, the above formula cannot be used to decide the parallelism of the source vertices, because the source vertices have no input. To solve it, we introduced the configuration option `jobmanager.adaptive-batch-scheduler.default-source-parallelism` to allow users to manually configure the parallelism of source vertices. Note that not all data sources need this option, because some data sources can automatically infer parallelism (For example, HiveTableSource, see [HiveParallelismInference](https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java) for more detail). For these sources, it is recommended to decide parallelism by themselves.
+
+### Limit the cap ratio of broadcast bytes
+As you can see, we limit the cap ratio of broadcast bytes that affects the parallelism calculation to ***broadcastCapRatio***. That is, the non-broadcast bytes processed by each task is at least ***(1-broadcastCapRatio) * V***. If not so,when the total broadcast bytes is close to ***V***, even if the total non-broadcast bytes is very small, it may cause a large parallelism, which is unnecessary and may lead to resource waste and large task deployment overhead.
+
+Generally, the broadcast dataset is usually relatively small against the other co-processed datasets. Therefore, we set the cap ratio to 0.5 by default because we usually expect the broadcast bytes to be smaller than non-broadcast bytes. The value is hard coded in the first version, and we may make it configurable later.
+
+
+### Normalize the parallelism to the closest power of 2
+The normalize is to avoid introducing data skew. To better understand this section, we suggest you read the [Flexible subpartition mapping](#flexible-subpartition-mapping) section first.
+
+Taking Fig. 4 (b) as example, A1/A2 produces 4 subpartitions, and the decided parallelism of B is 3. In this case, B1 will consume 1 subpartition, B2 will consume 1 subpartition, and B3 will consume 2 subpartitions. We assume that subpartitions have the same amount of data, which means B3 will consume twice the data of other tasks, data skew is introduced due to the subpartition mapping.
+
+To solve this problem, we need to make the subpartitions evenly consumed by downstream tasks, which means the number of subpartitions should be a multiple of the number of downstream tasks. For simplicity, we require the user-specified max parallelism to be 2<sup>N</sup>, and then adjust the calculated parallelism to a closest 2<sup>M</sup> (M <= N), so that we can guarantee that subpartitions will be evenly consumed by downstream tasks.
+
+Note that this is a temporary solution, the ultimate solution would be the [Auto-rebalancing of workloads](#auto-rebalancing-of-workloads), which may come soon.
+
+## Build up execution graph dynamically
+Before Flink 1.15, the execution graph was fully built in a static way before starting scheduling. To allow parallelisms of job vertices to be decided lazily, the execution graph must be able to be built up dynamically.
+
+### Create execution vertices and execution edges lazily
+A dynamic execution graph means that a Flink job starts with an empty execution topology, and then gradually attaches vertices during job execution, as shown in Fig. 2.
+
+The execution topology consists of execution vertices and execution edges. The execution vertices will be created and attached to the execution topology only when:
+
+- The parallelism of the corresponding job vertex is decided.
+- All upstream execution vertices are already attached.
+
+A decided parallelism of the job vertex is needed so that Flink knows how many execution vertices should be created. Upstream execution vertices need to be attached first so that Flink can connect the newly created execution vertices to the upstream vertices with execution edges. 
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/2-dynamic-graph.png" width="90%"/>
+<br/>
+Fig. 2 - Build up execution graph dynamically
+</center>
+
+<br/>
+
+### Flexible subpartition mapping
+Before Flink 1.15, when deploying a task, Flink needs to know the parallelism of its consumer job vertex. This is because consumer vertex parallelism is used to decide the number of subpartitions produced by each upstream task. The reason behind that is, for one result partition, different subpartitions serve different consumer execution vertices. More specifically, one consumer execution vertex only consumes data from subpartition with the same index. 
+
+Taking Fig. 3 as example, parallelism of the consumer B is 2, so the result partition produced by A1/A2 should contain 2 subpartitions, the subpartition with index 0 serves B1, and the subpartition with index 1 serves B2.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/3-static-graph-subpartition-mapping.png" width="30%"/>
+<br/>
+Fig. 3 - How subpartitions serve consumer execution vertices with static execution graph
+</center>
+
+<br/>
+
+But obviously, this doesn't work for dynamic graphs, because when a job vertex is deployed, the parallelism of its consumer job vertices may not have been decided yet. To enable Flink to work in this case, we need a way to allow a job vertex to run without knowing the parallelism of its consumer job vertices(or rather, we need a way to allow execution vertices to run without knowing the number of their consumer execution vertices). 
+
+To achieve this goal, we can set the number of subpartitions to be the max parallelism of the consumer job vertex. Then when the consumer execution vertices are deployed, they should be assigned with a subpartition range to consume. Suppose N is the number of consumer execution vertices and P is the number of subpartitions. For the kth consumer execution vertex, the consumed subpartition range should be:
+
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/range-formula.png" width="55%"/>
+</center>
+
+Taking Fig. 4 as example, the max parallelism of B is 4, so A1/A2 have 4 subpartitions. And then if the decided parallelism of B is 2, then the subpartitions mapping will be Fig. 4 (a), if the decided parallelism of B is 3, then the subpartitions mapping will be  Fig. 4 (b).
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/4-dynamic-graph-subpartition-mapping.png" width="75%"/>
+<br/>
+Fig. 4 - How subpartitions serve consumer execution vertices with dynamic graph
+</center>
+
+<br/>
+
+## Update and schedule the dynamic execution graph
+The adaptive batch scheduler scheduling is similar to the default scheduler, the only difference is that an empty dynamic execution graph will be generated initially and vertices will be attached later. Before handling any scheduling event, the scheduler will try deciding the parallelisms for job vertices, and then initialize them to generate execution vertices, connecting execution edges, and update the execution graph.
+
+The scheduler will try to decide the parallelism for all job vertices before each scheduling, and the parallelism decision will be made for each job vertex in topological order:

Review Comment:
   I change it to `before handling any scheduling event`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] zhuzhurk commented on a diff in pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r899708483


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,204 @@
+---
+layout: post
+title: "Adaptive Batch Scheduler: Automatically Decide Parallelism for Flink Batch Jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not an easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler only automatically decides parallelism for operators whose parallelism is not set (which means the parallelism is -1). To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1` for all jobs.
+- Set `table.exec.resource.default-parallelism: -1` for SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. Before that, we need to briefly introduce some concepts involved:
+
+- [JobVertex](https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java) and [JobGraph](https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java): A job vertex is an operator chain formed by chaining several operators together for better performance. The job graph is a data flow consisting of job vertices.
+- [ExecutionVertex](https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java) and [ExecutionGraph](https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java): An execution vertex represents a parallel subtask of a job vertex, which will eventually be instantiated as a physical task. For example, a job vertex with a parallelism of 100 will generate 100 execution vertices. The execution graph is the physical execution topology consisting of all execution vertices.
+
+More details about the above concepts can be found in the [Flink documentation](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/internals/job_scheduling/#jobmanager-data-structures). Note that the adaptive batch scheduler decides the parallelism of operators by deciding the parallelism of the job vertices to which they belong. To automatically decide parallelism of job vertices, we introduced the following changes:

Review Comment:
   to which they belong -> which consist of these operators



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] zhuzhurk commented on a diff in pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r893201499


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,206 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Automatically tuned parallelisms can be fine grained and can better fit consumed datasets which have a varying volume size every day.
+3. Currently, operators of SQL batch jobs can only be assigned with the same parallelism.The adaptive batch scheduler allows SQL operators to be tuned with different parallelisms which are automatically tuned.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options that may need adjustment when using adaptive batch scheduler:

Review Comment:
   I feel the configs listed below are too detailed and not of the first importance.
   
   I would remove them and change the above statement as: 
   In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,206 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Automatically tuned parallelisms can be fine grained and can better fit consumed datasets which have a varying volume size every day.

Review Comment:
   maybe:
   2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
   3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,206 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Automatically tuned parallelisms can be fine grained and can better fit consumed datasets which have a varying volume size every day.
+3. Currently, operators of SQL batch jobs can only be assigned with the same parallelism.The adaptive batch scheduler allows SQL operators to be tuned with different parallelisms which are automatically tuned.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options that may need adjustment when using adaptive batch scheduler:
+
+- [jobmanager.adaptive-batch-scheduler.min-parallelism]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-min-parallelism): The lower bound of allowed parallelism to set adaptively. Currently, this option should be configured as a power of 2, otherwise it will be rounded up to a power of 2 automatically.
+- [jobmanager.adaptive-batch-scheduler.max-parallelism]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism): The upper bound of allowed parallelism to set adaptively. Currently, this option should be configured as a power of 2, otherwise it will be rounded down to a power of 2 automatically.
+- [jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-avg-data-volume-per-task): The average size of data volume to expect each task instance to process. Note that since the parallelism of the vertices is adjusted to a power of 2, the actual average size will be 0.75~1.5 times this value. It is also important to note that when data skew occurs, or the decided parallelism reaches the max parallelism (due to too much data), the data actually processed by some tasks may far exceed this value.
+- [jobmanager.adaptive-batch-scheduler.default-source-parallelism]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism): The default parallelism of data source.
+
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+We will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the remainder as it is scheduled.

Review Comment:
   the remainder as it is scheduled -> vertices during job execution



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,206 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Automatically tuned parallelisms can be fine grained and can better fit consumed datasets which have a varying volume size every day.
+3. Currently, operators of SQL batch jobs can only be assigned with the same parallelism.The adaptive batch scheduler allows SQL operators to be tuned with different parallelisms which are automatically tuned.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options that may need adjustment when using adaptive batch scheduler:
+
+- [jobmanager.adaptive-batch-scheduler.min-parallelism]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-min-parallelism): The lower bound of allowed parallelism to set adaptively. Currently, this option should be configured as a power of 2, otherwise it will be rounded up to a power of 2 automatically.
+- [jobmanager.adaptive-batch-scheduler.max-parallelism]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism): The upper bound of allowed parallelism to set adaptively. Currently, this option should be configured as a power of 2, otherwise it will be rounded down to a power of 2 automatically.
+- [jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-avg-data-volume-per-task): The average size of data volume to expect each task instance to process. Note that since the parallelism of the vertices is adjusted to a power of 2, the actual average size will be 0.75~1.5 times this value. It is also important to note that when data skew occurs, or the decided parallelism reaches the max parallelism (due to too much data), the data actually processed by some tasks may far exceed this value.
+- [jobmanager.adaptive-batch-scheduler.default-source-parallelism]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism): The default parallelism of data source.
+
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+We will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the remainder as it is scheduled.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png" width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size of input results, so the scheduler needs to know the sizes of result partitions produced by tasks. We introduced a numBytesProduced counter to record the size of each produced result partition, the accumulated result of the counter will be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results. The computation algorithm is as follows:
+Suppose:
+
+- ***V*** is the bytes of data to be processed by each task.
+- ***totalBytes<sub>non-broadcast</sub>*** is the sum of the non-broadcast result sizes consumed by this job vertex.
+- ***totalBytes<sub>broadcast</sub>*** is the sum of the broadcast result sizes consumed by this job vertex.
+- ***broadcastCapRatio*** is the cap ratio of broadcast bytes that affects the parallelism calculation.
+- ***normalize(***x***)*** is a function that round ***x*** to the closest power of 2.
+
+then the parallelism of this job vertex ***P*** will be:
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/parallelism-formula.png" width="60%"/>
+</center>
+
+Note that we introduced two special treatment in the above formula :
+
+- [Limit the cap ratio of broadcast bytes](#limit-the-cap-ratio-of-broadcast-bytes)
+- [Normalize the parallelism to the closest power of 2](#normalize-the-parallelism-to-the-closest-power-of-2)
+
+However, the above formula cannot be used to decide the parallelism of the source vertices, because the source vertices have no input. To solve it, we introduced the configuration option "jobmanager.adaptive-batch-scheduler.default-source-parallelism"  to allow users to manually configure the parallelism of source vertices. Note that not all data sources need this option, because some data sources can automatically infer parallelism (For example, HiveTableSource, see [HiveParallelismInference](https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java) for more detail). For these sources, it is recommended to decide parallelism by themselves.
+
+### Limit the cap ratio of broadcast bytes
+As you can see, we limit the cap ratio of broadcast bytes that affects the parallelism calculation to ***broadcastCapRatio***. That is, the non-broadcast bytes processed by each task is at least ***(1-broadcastCapRatio) * V***. If not so,when the total broadcast bytes is close to ***V***, even if the total non-broadcast bytes is very small, it may cause a large parallelism, which is unnecessary and may lead to resource waste and large task deployment overhead.
+
+Generally, the broadcast dataset is usually relatively small against the other co-processed datasets. Therefore, we set the cap ratio to 0.5 by default because we usually expect the broadcast bytes to be smaller than non-broadcast bytes. The value is hard coded in the first version, and we may make it configurable later.
+
+
+### Normalize the parallelism to the closest power of 2
+The normalize is to avoid introducing data skew. To better understand this section, we suggest you read the [Flexible subpartition mapping](#flexible-subpartition-mapping) section first.
+
+Taking Fig. 4 (b) as example, A1/A2 produces 4 subpartitions, and the decided parallelism of B is 3. In this case, B1 will consume 1 subpartition, B2 will consume 1 subpartition, and B3 will consume 2 subpartitions. We assume that subpartitions have the same amount of data, which means B3 will consume twice the data of other tasks, data skew is introduced due to the subpartition mapping.
+
+To solve this problem, we need to make the subpartitions evenly consumed by downstream tasks, which means the number of subpartitions should be a multiple of the number of downstream tasks. For simplicity, we require the user-specified max parallelism to be 2<sup>N</sup>, and then adjust the calculated parallelism to a closest 2<sup>M</sup> (M <= N), so that we can guarantee that subpartitions will be evenly consumed by downstream tasks.
+
+Note that this is a temporary solution, the ultimate solution would be the [Auto-rebalancing of workloads](#auto-rebalancing-of-workloads), which may come soon.
+
+## Build up execution graph dynamically
+Before Flink 1.15, the execution graph was fully built in a static way before starting scheduling. To allow parallelisms of job vertices to be decided lazily, the execution graph must be able to be built up dynamically.
+
+### Create execution vertices and execution edges lazily
+A dynamic execution graph means that a Flink job starts with an empty execution topology, and then gradually attachs the remainder as it is scheduled, as shown in Fig. 2.

Review Comment:
   > , 
   
   Wrong type comma



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,206 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Automatically tuned parallelisms can be fine grained and can better fit consumed datasets which have a varying volume size every day.
+3. Currently, operators of SQL batch jobs can only be assigned with the same parallelism.The adaptive batch scheduler allows SQL operators to be tuned with different parallelisms which are automatically tuned.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options that may need adjustment when using adaptive batch scheduler:
+
+- [jobmanager.adaptive-batch-scheduler.min-parallelism]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-min-parallelism): The lower bound of allowed parallelism to set adaptively. Currently, this option should be configured as a power of 2, otherwise it will be rounded up to a power of 2 automatically.
+- [jobmanager.adaptive-batch-scheduler.max-parallelism]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism): The upper bound of allowed parallelism to set adaptively. Currently, this option should be configured as a power of 2, otherwise it will be rounded down to a power of 2 automatically.
+- [jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-avg-data-volume-per-task): The average size of data volume to expect each task instance to process. Note that since the parallelism of the vertices is adjusted to a power of 2, the actual average size will be 0.75~1.5 times this value. It is also important to note that when data skew occurs, or the decided parallelism reaches the max parallelism (due to too much data), the data actually processed by some tasks may far exceed this value.
+- [jobmanager.adaptive-batch-scheduler.default-source-parallelism]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism): The default parallelism of data source.
+
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+We will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the remainder as it is scheduled.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png" width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size of input results, so the scheduler needs to know the sizes of result partitions produced by tasks. We introduced a numBytesProduced counter to record the size of each produced result partition, the accumulated result of the counter will be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results. The computation algorithm is as follows:
+Suppose:
+
+- ***V*** is the bytes of data to be processed by each task.
+- ***totalBytes<sub>non-broadcast</sub>*** is the sum of the non-broadcast result sizes consumed by this job vertex.
+- ***totalBytes<sub>broadcast</sub>*** is the sum of the broadcast result sizes consumed by this job vertex.
+- ***broadcastCapRatio*** is the cap ratio of broadcast bytes that affects the parallelism calculation.
+- ***normalize(***x***)*** is a function that round ***x*** to the closest power of 2.
+
+then the parallelism of this job vertex ***P*** will be:
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/parallelism-formula.png" width="60%"/>
+</center>
+
+Note that we introduced two special treatment in the above formula :
+
+- [Limit the cap ratio of broadcast bytes](#limit-the-cap-ratio-of-broadcast-bytes)
+- [Normalize the parallelism to the closest power of 2](#normalize-the-parallelism-to-the-closest-power-of-2)
+
+However, the above formula cannot be used to decide the parallelism of the source vertices, because the source vertices have no input. To solve it, we introduced the configuration option "jobmanager.adaptive-batch-scheduler.default-source-parallelism"  to allow users to manually configure the parallelism of source vertices. Note that not all data sources need this option, because some data sources can automatically infer parallelism (For example, HiveTableSource, see [HiveParallelismInference](https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java) for more detail). For these sources, it is recommended to decide parallelism by themselves.

Review Comment:
   > parallelism"  to
   
   duplicated spaces



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,206 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Automatically tuned parallelisms can be fine grained and can better fit consumed datasets which have a varying volume size every day.
+3. Currently, operators of SQL batch jobs can only be assigned with the same parallelism.The adaptive batch scheduler allows SQL operators to be tuned with different parallelisms which are automatically tuned.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options that may need adjustment when using adaptive batch scheduler:
+
+- [jobmanager.adaptive-batch-scheduler.min-parallelism]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-min-parallelism): The lower bound of allowed parallelism to set adaptively. Currently, this option should be configured as a power of 2, otherwise it will be rounded up to a power of 2 automatically.
+- [jobmanager.adaptive-batch-scheduler.max-parallelism]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism): The upper bound of allowed parallelism to set adaptively. Currently, this option should be configured as a power of 2, otherwise it will be rounded down to a power of 2 automatically.
+- [jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-avg-data-volume-per-task): The average size of data volume to expect each task instance to process. Note that since the parallelism of the vertices is adjusted to a power of 2, the actual average size will be 0.75~1.5 times this value. It is also important to note that when data skew occurs, or the decided parallelism reaches the max parallelism (due to too much data), the data actually processed by some tasks may far exceed this value.
+- [jobmanager.adaptive-batch-scheduler.default-source-parallelism]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism): The default parallelism of data source.
+
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+We will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the remainder as it is scheduled.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png" width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size of input results, so the scheduler needs to know the sizes of result partitions produced by tasks. We introduced a numBytesProduced counter to record the size of each produced result partition, the accumulated result of the counter will be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results. The computation algorithm is as follows:
+Suppose:
+
+- ***V*** is the bytes of data to be processed by each task.
+- ***totalBytes<sub>non-broadcast</sub>*** is the sum of the non-broadcast result sizes consumed by this job vertex.
+- ***totalBytes<sub>broadcast</sub>*** is the sum of the broadcast result sizes consumed by this job vertex.
+- ***broadcastCapRatio*** is the cap ratio of broadcast bytes that affects the parallelism calculation.
+- ***normalize(***x***)*** is a function that round ***x*** to the closest power of 2.
+
+then the parallelism of this job vertex ***P*** will be:
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/parallelism-formula.png" width="60%"/>
+</center>
+
+Note that we introduced two special treatment in the above formula :
+
+- [Limit the cap ratio of broadcast bytes](#limit-the-cap-ratio-of-broadcast-bytes)
+- [Normalize the parallelism to the closest power of 2](#normalize-the-parallelism-to-the-closest-power-of-2)
+
+However, the above formula cannot be used to decide the parallelism of the source vertices, because the source vertices have no input. To solve it, we introduced the configuration option "jobmanager.adaptive-batch-scheduler.default-source-parallelism"  to allow users to manually configure the parallelism of source vertices. Note that not all data sources need this option, because some data sources can automatically infer parallelism (For example, HiveTableSource, see [HiveParallelismInference](https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java) for more detail). For these sources, it is recommended to decide parallelism by themselves.
+
+### Limit the cap ratio of broadcast bytes
+As you can see, we limit the cap ratio of broadcast bytes that affects the parallelism calculation to ***broadcastCapRatio***. That is, the non-broadcast bytes processed by each task is at least ***(1-broadcastCapRatio) * V***. If not so,when the total broadcast bytes is close to ***V***, even if the total non-broadcast bytes is very small, it may cause a large parallelism, which is unnecessary and may lead to resource waste and large task deployment overhead.
+
+Generally, the broadcast dataset is usually relatively small against the other co-processed datasets. Therefore, we set the cap ratio to 0.5 by default because we usually expect the broadcast bytes to be smaller than non-broadcast bytes. The value is hard coded in the first version, and we may make it configurable later.
+
+
+### Normalize the parallelism to the closest power of 2
+The normalize is to avoid introducing data skew. To better understand this section, we suggest you read the [Flexible subpartition mapping](#flexible-subpartition-mapping) section first.
+
+Taking Fig. 4 (b) as example, A1/A2 produces 4 subpartitions, and the decided parallelism of B is 3. In this case, B1 will consume 1 subpartition, B2 will consume 1 subpartition, and B3 will consume 2 subpartitions. We assume that subpartitions have the same amount of data, which means B3 will consume twice the data of other tasks, data skew is introduced due to the subpartition mapping.
+
+To solve this problem, we need to make the subpartitions evenly consumed by downstream tasks, which means the number of subpartitions should be a multiple of the number of downstream tasks. For simplicity, we require the user-specified max parallelism to be 2<sup>N</sup>, and then adjust the calculated parallelism to a closest 2<sup>M</sup> (M <= N), so that we can guarantee that subpartitions will be evenly consumed by downstream tasks.
+
+Note that this is a temporary solution, the ultimate solution would be the [Auto-rebalancing of workloads](#auto-rebalancing-of-workloads), which may come soon.
+
+## Build up execution graph dynamically
+Before Flink 1.15, the execution graph was fully built in a static way before starting scheduling. To allow parallelisms of job vertices to be decided lazily, the execution graph must be able to be built up dynamically.
+
+### Create execution vertices and execution edges lazily
+A dynamic execution graph means that a Flink job starts with an empty execution topology, and then gradually attachs the remainder as it is scheduled, as shown in Fig. 2.

Review Comment:
   attachs the remainder as it is scheduled -> attaches  vertices during job execution



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,206 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Automatically tuned parallelisms can be fine grained and can better fit consumed datasets which have a varying volume size every day.
+3. Currently, operators of SQL batch jobs can only be assigned with the same parallelism.The adaptive batch scheduler allows SQL operators to be tuned with different parallelisms which are automatically tuned.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options that may need adjustment when using adaptive batch scheduler:
+
+- [jobmanager.adaptive-batch-scheduler.min-parallelism]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-min-parallelism): The lower bound of allowed parallelism to set adaptively. Currently, this option should be configured as a power of 2, otherwise it will be rounded up to a power of 2 automatically.
+- [jobmanager.adaptive-batch-scheduler.max-parallelism]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism): The upper bound of allowed parallelism to set adaptively. Currently, this option should be configured as a power of 2, otherwise it will be rounded down to a power of 2 automatically.
+- [jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-avg-data-volume-per-task): The average size of data volume to expect each task instance to process. Note that since the parallelism of the vertices is adjusted to a power of 2, the actual average size will be 0.75~1.5 times this value. It is also important to note that when data skew occurs, or the decided parallelism reaches the max parallelism (due to too much data), the data actually processed by some tasks may far exceed this value.
+- [jobmanager.adaptive-batch-scheduler.default-source-parallelism]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism): The default parallelism of data source.
+
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+We will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the remainder as it is scheduled.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png" width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size of input results, so the scheduler needs to know the sizes of result partitions produced by tasks. We introduced a numBytesProduced counter to record the size of each produced result partition, the accumulated result of the counter will be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results. The computation algorithm is as follows:
+Suppose:
+
+- ***V*** is the bytes of data to be processed by each task.
+- ***totalBytes<sub>non-broadcast</sub>*** is the sum of the non-broadcast result sizes consumed by this job vertex.
+- ***totalBytes<sub>broadcast</sub>*** is the sum of the broadcast result sizes consumed by this job vertex.
+- ***broadcastCapRatio*** is the cap ratio of broadcast bytes that affects the parallelism calculation.
+- ***normalize(***x***)*** is a function that round ***x*** to the closest power of 2.
+
+then the parallelism of this job vertex ***P*** will be:
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/parallelism-formula.png" width="60%"/>
+</center>
+
+Note that we introduced two special treatment in the above formula :
+
+- [Limit the cap ratio of broadcast bytes](#limit-the-cap-ratio-of-broadcast-bytes)
+- [Normalize the parallelism to the closest power of 2](#normalize-the-parallelism-to-the-closest-power-of-2)
+
+However, the above formula cannot be used to decide the parallelism of the source vertices, because the source vertices have no input. To solve it, we introduced the configuration option "jobmanager.adaptive-batch-scheduler.default-source-parallelism"  to allow users to manually configure the parallelism of source vertices. Note that not all data sources need this option, because some data sources can automatically infer parallelism (For example, HiveTableSource, see [HiveParallelismInference](https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java) for more detail). For these sources, it is recommended to decide parallelism by themselves.
+
+### Limit the cap ratio of broadcast bytes
+As you can see, we limit the cap ratio of broadcast bytes that affects the parallelism calculation to ***broadcastCapRatio***. That is, the non-broadcast bytes processed by each task is at least ***(1-broadcastCapRatio) * V***. If not so,when the total broadcast bytes is close to ***V***, even if the total non-broadcast bytes is very small, it may cause a large parallelism, which is unnecessary and may lead to resource waste and large task deployment overhead.
+
+Generally, the broadcast dataset is usually relatively small against the other co-processed datasets. Therefore, we set the cap ratio to 0.5 by default because we usually expect the broadcast bytes to be smaller than non-broadcast bytes. The value is hard coded in the first version, and we may make it configurable later.
+
+
+### Normalize the parallelism to the closest power of 2
+The normalize is to avoid introducing data skew. To better understand this section, we suggest you read the [Flexible subpartition mapping](#flexible-subpartition-mapping) section first.
+
+Taking Fig. 4 (b) as example, A1/A2 produces 4 subpartitions, and the decided parallelism of B is 3. In this case, B1 will consume 1 subpartition, B2 will consume 1 subpartition, and B3 will consume 2 subpartitions. We assume that subpartitions have the same amount of data, which means B3 will consume twice the data of other tasks, data skew is introduced due to the subpartition mapping.
+
+To solve this problem, we need to make the subpartitions evenly consumed by downstream tasks, which means the number of subpartitions should be a multiple of the number of downstream tasks. For simplicity, we require the user-specified max parallelism to be 2<sup>N</sup>, and then adjust the calculated parallelism to a closest 2<sup>M</sup> (M <= N), so that we can guarantee that subpartitions will be evenly consumed by downstream tasks.
+
+Note that this is a temporary solution, the ultimate solution would be the [Auto-rebalancing of workloads](#auto-rebalancing-of-workloads), which may come soon.
+
+## Build up execution graph dynamically
+Before Flink 1.15, the execution graph was fully built in a static way before starting scheduling. To allow parallelisms of job vertices to be decided lazily, the execution graph must be able to be built up dynamically.
+
+### Create execution vertices and execution edges lazily
+A dynamic execution graph means that a Flink job starts with an empty execution topology, and then gradually attachs the remainder as it is scheduled, as shown in Fig. 2.
+
+The execution topology consists of execution vertices and execution edges. The execution vertices will be created and attached to the execution topology only when:
+
+- The parallelism of the corresponding job vertex is decided.
+- All upstream execution vertices are already attached.
+
+A decided parallelism of the job vertex is needed so that Flink knows how many execution vertices should be created. Upstream execution vertices need to be attached first so that Flink can connect the newly created execution vertices to the upstream vertices with execution edges. 
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/2-dynamic-graph.png" width="90%"/>
+<br/>
+Fig. 2 - Build up execution graph dynamically
+</center>
+
+<br/>
+
+### Flexible subpartition mapping
+Before Flink 1.15, when deploying a task, Flink needs to know the parallelism of its consumer job vertex. This is because consumer vertex parallelism is used to decide the number of subpartitions produced by each upstream task. The reason behind that is, for one result partition, different subpartitions serve different consumer execution vertices. More specifically, one consumer execution vertex only consumes data from subpartition with the same index. 
+
+Taking Fig. 3 as example, parallelism of the consumer B is 2, so the result partition produced by A1/A2 should contain 2 subpartitions, the subpartition with index 0 serves B1, and the subpartition with index 1 serves B2.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/3-static-graph-subpartition-mapping.png" width="30%"/>
+<br/>
+Fig. 3 - How subpartitions serve consumer execution vertices with static execution graph
+</center>
+
+<br/>
+
+But obviously, this doesn't work for dynamic graphs, because when a job vertex is deployed, the parallelism of its consumer job vertices may not have been decided yet. To enable Flink to work in this case, we need a way to allow a job vertex to run without knowing the parallelism of its consumer job vertices(or rather, we need a way to allow execution vertices to run without knowing the number of their consumer execution vertices). 
+
+To achieve this goal, we can set the number of subpartitions to be the max parallelism of the consumer job vertex. Then when the consumer execution vertices are deployed, they should be assigned with a subpartition range to consume. Suppose N is the number of consumer execution vertices and P is the number of subpartitions. For the kth consumer execution vertex, the consumed subpartition range should be:
+
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/range-formula.png" width="55%"/>
+</center>
+
+Taking Fig. 4 as example, the max parallelism of B is 4, so A1/A2 have 4 subpartitions. And then if the decided parallelism of B is 2, then the subpartitions mapping will be Fig. 4 (a), if the decided parallelism of B is 3, then the subpartitions mapping will be  Fig. 4 (b).
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/4-dynamic-graph-subpartition-mapping.png" width="75%"/>
+<br/>
+Fig. 4 - How subpartitions serve consumer execution vertices with dynamic graph
+</center>
+
+<br/>
+
+For broadcast results, we can't directly use the above formula, but need to do some special processing. Because for the broadcast results, each subpartition contains all the data to broadcast in the result partition, consuming multiple subpartitions means consuming a record multiple times, which is not correct. Therefore, we propose that the number of subpartitions for broadcast partitions should always be 1, and then all downstream tasks consume this single subpartition(The partition range of the broadcast partition is always set to 0).
+
+## Update and schedule the dynamic execution graph
+The adaptive batch scheduler scheduling is similar to the default scheduler, the only difference is that an empty dynamic execution graph will be generated initially and vertices will be attached later. Before handling any scheduling event, the scheduler will try deciding the parallelisms for job vertices, and then initialize them to generate execution vertices, connecting execution edges, and update the execution graph.
+
+The scheduler will try to decide the parallelism for all job vertices before each scheduling, and the parallelism decision will be made for each job vertex in topological order:
+
+- For source vertices, the parallelism should have been decided before starting scheduling. 
+- For non-source vertices, the parallelism can be decided only when all its consumed results are fully finished.
+
+After trying to decide the parallelism for all job vertices, the scheduler will try to initialize the job vertices according to the topological order. A job vertex that can be initialized should meet the following conditions:

Review Comment:
   according to the -> in



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,206 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Automatically tuned parallelisms can be fine grained and can better fit consumed datasets which have a varying volume size every day.
+3. Currently, operators of SQL batch jobs can only be assigned with the same parallelism.The adaptive batch scheduler allows SQL operators to be tuned with different parallelisms which are automatically tuned.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options that may need adjustment when using adaptive batch scheduler:
+
+- [jobmanager.adaptive-batch-scheduler.min-parallelism]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-min-parallelism): The lower bound of allowed parallelism to set adaptively. Currently, this option should be configured as a power of 2, otherwise it will be rounded up to a power of 2 automatically.
+- [jobmanager.adaptive-batch-scheduler.max-parallelism]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism): The upper bound of allowed parallelism to set adaptively. Currently, this option should be configured as a power of 2, otherwise it will be rounded down to a power of 2 automatically.
+- [jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-avg-data-volume-per-task): The average size of data volume to expect each task instance to process. Note that since the parallelism of the vertices is adjusted to a power of 2, the actual average size will be 0.75~1.5 times this value. It is also important to note that when data skew occurs, or the decided parallelism reaches the max parallelism (due to too much data), the data actually processed by some tasks may far exceed this value.
+- [jobmanager.adaptive-batch-scheduler.default-source-parallelism]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism): The default parallelism of data source.
+
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+We will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:

Review Comment:
   Maybe add "In this section, we will ..."



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,206 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Automatically tuned parallelisms can be fine grained and can better fit consumed datasets which have a varying volume size every day.
+3. Currently, operators of SQL batch jobs can only be assigned with the same parallelism.The adaptive batch scheduler allows SQL operators to be tuned with different parallelisms which are automatically tuned.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options that may need adjustment when using adaptive batch scheduler:
+
+- [jobmanager.adaptive-batch-scheduler.min-parallelism]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-min-parallelism): The lower bound of allowed parallelism to set adaptively. Currently, this option should be configured as a power of 2, otherwise it will be rounded up to a power of 2 automatically.
+- [jobmanager.adaptive-batch-scheduler.max-parallelism]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism): The upper bound of allowed parallelism to set adaptively. Currently, this option should be configured as a power of 2, otherwise it will be rounded down to a power of 2 automatically.
+- [jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-avg-data-volume-per-task): The average size of data volume to expect each task instance to process. Note that since the parallelism of the vertices is adjusted to a power of 2, the actual average size will be 0.75~1.5 times this value. It is also important to note that when data skew occurs, or the decided parallelism reaches the max parallelism (due to too much data), the data actually processed by some tasks may far exceed this value.
+- [jobmanager.adaptive-batch-scheduler.default-source-parallelism]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism): The default parallelism of data source.
+
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+We will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the remainder as it is scheduled.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png" width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size of input results, so the scheduler needs to know the sizes of result partitions produced by tasks. We introduced a numBytesProduced counter to record the size of each produced result partition, the accumulated result of the counter will be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results. The computation algorithm is as follows:
+Suppose:
+
+- ***V*** is the bytes of data to be processed by each task.
+- ***totalBytes<sub>non-broadcast</sub>*** is the sum of the non-broadcast result sizes consumed by this job vertex.
+- ***totalBytes<sub>broadcast</sub>*** is the sum of the broadcast result sizes consumed by this job vertex.
+- ***broadcastCapRatio*** is the cap ratio of broadcast bytes that affects the parallelism calculation.
+- ***normalize(***x***)*** is a function that round ***x*** to the closest power of 2.
+
+then the parallelism of this job vertex ***P*** will be:
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/parallelism-formula.png" width="60%"/>
+</center>
+
+Note that we introduced two special treatment in the above formula :
+
+- [Limit the cap ratio of broadcast bytes](#limit-the-cap-ratio-of-broadcast-bytes)
+- [Normalize the parallelism to the closest power of 2](#normalize-the-parallelism-to-the-closest-power-of-2)
+
+However, the above formula cannot be used to decide the parallelism of the source vertices, because the source vertices have no input. To solve it, we introduced the configuration option "jobmanager.adaptive-batch-scheduler.default-source-parallelism"  to allow users to manually configure the parallelism of source vertices. Note that not all data sources need this option, because some data sources can automatically infer parallelism (For example, HiveTableSource, see [HiveParallelismInference](https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java) for more detail). For these sources, it is recommended to decide parallelism by themselves.
+
+### Limit the cap ratio of broadcast bytes
+As you can see, we limit the cap ratio of broadcast bytes that affects the parallelism calculation to ***broadcastCapRatio***. That is, the non-broadcast bytes processed by each task is at least ***(1-broadcastCapRatio) * V***. If not so,when the total broadcast bytes is close to ***V***, even if the total non-broadcast bytes is very small, it may cause a large parallelism, which is unnecessary and may lead to resource waste and large task deployment overhead.
+
+Generally, the broadcast dataset is usually relatively small against the other co-processed datasets. Therefore, we set the cap ratio to 0.5 by default because we usually expect the broadcast bytes to be smaller than non-broadcast bytes. The value is hard coded in the first version, and we may make it configurable later.
+
+
+### Normalize the parallelism to the closest power of 2
+The normalize is to avoid introducing data skew. To better understand this section, we suggest you read the [Flexible subpartition mapping](#flexible-subpartition-mapping) section first.
+
+Taking Fig. 4 (b) as example, A1/A2 produces 4 subpartitions, and the decided parallelism of B is 3. In this case, B1 will consume 1 subpartition, B2 will consume 1 subpartition, and B3 will consume 2 subpartitions. We assume that subpartitions have the same amount of data, which means B3 will consume twice the data of other tasks, data skew is introduced due to the subpartition mapping.
+
+To solve this problem, we need to make the subpartitions evenly consumed by downstream tasks, which means the number of subpartitions should be a multiple of the number of downstream tasks. For simplicity, we require the user-specified max parallelism to be 2<sup>N</sup>, and then adjust the calculated parallelism to a closest 2<sup>M</sup> (M <= N), so that we can guarantee that subpartitions will be evenly consumed by downstream tasks.
+
+Note that this is a temporary solution, the ultimate solution would be the [Auto-rebalancing of workloads](#auto-rebalancing-of-workloads), which may come soon.
+
+## Build up execution graph dynamically
+Before Flink 1.15, the execution graph was fully built in a static way before starting scheduling. To allow parallelisms of job vertices to be decided lazily, the execution graph must be able to be built up dynamically.
+
+### Create execution vertices and execution edges lazily
+A dynamic execution graph means that a Flink job starts with an empty execution topology, and then gradually attachs the remainder as it is scheduled, as shown in Fig. 2.
+
+The execution topology consists of execution vertices and execution edges. The execution vertices will be created and attached to the execution topology only when:
+
+- The parallelism of the corresponding job vertex is decided.
+- All upstream execution vertices are already attached.
+
+A decided parallelism of the job vertex is needed so that Flink knows how many execution vertices should be created. Upstream execution vertices need to be attached first so that Flink can connect the newly created execution vertices to the upstream vertices with execution edges. 
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/2-dynamic-graph.png" width="90%"/>
+<br/>
+Fig. 2 - Build up execution graph dynamically
+</center>
+
+<br/>
+
+### Flexible subpartition mapping
+Before Flink 1.15, when deploying a task, Flink needs to know the parallelism of its consumer job vertex. This is because consumer vertex parallelism is used to decide the number of subpartitions produced by each upstream task. The reason behind that is, for one result partition, different subpartitions serve different consumer execution vertices. More specifically, one consumer execution vertex only consumes data from subpartition with the same index. 
+
+Taking Fig. 3 as example, parallelism of the consumer B is 2, so the result partition produced by A1/A2 should contain 2 subpartitions, the subpartition with index 0 serves B1, and the subpartition with index 1 serves B2.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/3-static-graph-subpartition-mapping.png" width="30%"/>
+<br/>
+Fig. 3 - How subpartitions serve consumer execution vertices with static execution graph
+</center>
+
+<br/>
+
+But obviously, this doesn't work for dynamic graphs, because when a job vertex is deployed, the parallelism of its consumer job vertices may not have been decided yet. To enable Flink to work in this case, we need a way to allow a job vertex to run without knowing the parallelism of its consumer job vertices(or rather, we need a way to allow execution vertices to run without knowing the number of their consumer execution vertices). 
+
+To achieve this goal, we can set the number of subpartitions to be the max parallelism of the consumer job vertex. Then when the consumer execution vertices are deployed, they should be assigned with a subpartition range to consume. Suppose N is the number of consumer execution vertices and P is the number of subpartitions. For the kth consumer execution vertex, the consumed subpartition range should be:
+
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/range-formula.png" width="55%"/>
+</center>
+
+Taking Fig. 4 as example, the max parallelism of B is 4, so A1/A2 have 4 subpartitions. And then if the decided parallelism of B is 2, then the subpartitions mapping will be Fig. 4 (a), if the decided parallelism of B is 3, then the subpartitions mapping will be  Fig. 4 (b).
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/4-dynamic-graph-subpartition-mapping.png" width="75%"/>
+<br/>
+Fig. 4 - How subpartitions serve consumer execution vertices with dynamic graph
+</center>
+
+<br/>
+
+For broadcast results, we can't directly use the above formula, but need to do some special processing. Because for the broadcast results, each subpartition contains all the data to broadcast in the result partition, consuming multiple subpartitions means consuming a record multiple times, which is not correct. Therefore, we propose that the number of subpartitions for broadcast partitions should always be 1, and then all downstream tasks consume this single subpartition(The partition range of the broadcast partition is always set to 0).

Review Comment:
   to do some special processing -> a special process



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] wanglijie95 commented on pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on PR #546:
URL: https://github.com/apache/flink-web/pull/546#issuecomment-1151839659

   @zhuzhurk Thanks for review. I have updated the pull request according to your comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] wanglijie95 commented on a diff in pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r896635172


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 

Review Comment:
   Fixed



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] wanglijie95 closed pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
wanglijie95 closed pull request #546: Add blogs for FLIP-187 adaptive batch scheduler
URL: https://github.com/apache/flink-web/pull/546


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] wanglijie95 commented on a diff in pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r898969850


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -54,11 +54,16 @@ The adaptive batch scheduler only automatically decides parallelism for operator
 
 # Implementation Details
 
-In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+In this section, we will elaborate the details of the implementation. Before that, we need to briefly introduce some concepts involved:
+
+- [JobVertex](https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java) and [JobGraph](https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java): A job vertex is an operator chain formed by chaining several operators together for better performance. The job graph is a data flow consisting of job vertices.
+- [ExecutionVertex](https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java) and [ExecutionGraph](https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java): An execution vertex represents a parallel subtask of a job vertex, which will eventually be instantiated as a physical task. For example, a job vertex with a parallelism of 100 will generate 100 execution vertices. The execution graph is the physical execution topology consisting of all execution vertices.
+
+More details about the above concepts can be found in the [Flink documentation](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/internals/job_scheduling/#jobmanager-data-structures). To be precise, the adaptive batch scheduler actually automatically decides the parallelism of job vertices (in the previous sections, in order not to introduce more concepts, **operator** was used to refer to **job vertex**, but they are actually slightly different). We introduced the following changes to automatically decide parallelism of job vertices:

Review Comment:
   Fixed



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -116,7 +121,7 @@ To solve this problem, we need to make the subpartitions evenly consumed by down
 Note that this is a temporary solution, the ultimate solution would be the [Auto-rebalancing of workloads](#auto-rebalancing-of-workloads), which may come soon.
 
 ## Build up execution graph dynamically
-Before Flink 1.15, the execution graph was fully built in a static way before starting scheduling. To allow parallelisms of job vertices to be decided lazily, the execution graph must be able to be built up dynamically.
+Before the introduction of adaptive batch scheduler, the execution graph was fully built in a static way before starting scheduling. To allow parallelisms of job vertices to be decided lazily, the execution graph must be able to be built up dynamically.

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] wanglijie95 commented on a diff in pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r896639134


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the vertices during job execution.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png" width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size of input results, so the scheduler needs to know the sizes of result partitions produced by tasks. We introduced a numBytesProduced counter to record the size of each produced result partition, the accumulated result of the counter will be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results. The computation algorithm is as follows:
+Suppose:
+
+- ***V*** is the bytes of data the user expects to be processed by each task.
+- ***totalBytes<sub>non-broadcast</sub>*** is the sum of the non-broadcast result sizes consumed by this job vertex.
+- ***totalBytes<sub>broadcast</sub>*** is the sum of the broadcast result sizes consumed by this job vertex.
+- ***broadcastCapRatio*** is the cap ratio of broadcast bytes that affects the parallelism calculation.
+- ***normalize(***x***)*** is a function that round ***x*** to the closest power of 2.
+
+then the parallelism of this job vertex ***P*** will be:
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/parallelism-formula.png" width="60%"/>
+</center>
+
+Note that we introduced two special treatment in the above formula :
+
+- [Limit the cap ratio of broadcast bytes](#limit-the-cap-ratio-of-broadcast-bytes)
+- [Normalize the parallelism to the closest power of 2](#normalize-the-parallelism-to-the-closest-power-of-2)
+
+However, the above formula cannot be used to decide the parallelism of the source vertices, because the source vertices have no input. To solve it, we introduced the configuration option `jobmanager.adaptive-batch-scheduler.default-source-parallelism` to allow users to manually configure the parallelism of source vertices. Note that not all data sources need this option, because some data sources can automatically infer parallelism (For example, HiveTableSource, see [HiveParallelismInference](https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java) for more detail). For these sources, it is recommended to decide parallelism by themselves.
+
+### Limit the cap ratio of broadcast bytes
+As you can see, we limit the cap ratio of broadcast bytes that affects the parallelism calculation to ***broadcastCapRatio***. That is, the non-broadcast bytes processed by each task is at least ***(1-broadcastCapRatio) * V***. If not so,when the total broadcast bytes is close to ***V***, even if the total non-broadcast bytes is very small, it may cause a large parallelism, which is unnecessary and may lead to resource waste and large task deployment overhead.
+
+Generally, the broadcast dataset is usually relatively small against the other co-processed datasets. Therefore, we set the cap ratio to 0.5 by default because we usually expect the broadcast bytes to be smaller than non-broadcast bytes. The value is hard coded in the first version, and we may make it configurable later.
+
+
+### Normalize the parallelism to the closest power of 2
+The normalize is to avoid introducing data skew. To better understand this section, we suggest you read the [Flexible subpartition mapping](#flexible-subpartition-mapping) section first.
+
+Taking Fig. 4 (b) as example, A1/A2 produces 4 subpartitions, and the decided parallelism of B is 3. In this case, B1 will consume 1 subpartition, B2 will consume 1 subpartition, and B3 will consume 2 subpartitions. We assume that subpartitions have the same amount of data, which means B3 will consume twice the data of other tasks, data skew is introduced due to the subpartition mapping.
+
+To solve this problem, we need to make the subpartitions evenly consumed by downstream tasks, which means the number of subpartitions should be a multiple of the number of downstream tasks. For simplicity, we require the user-specified max parallelism to be 2<sup>N</sup>, and then adjust the calculated parallelism to a closest 2<sup>M</sup> (M <= N), so that we can guarantee that subpartitions will be evenly consumed by downstream tasks.
+
+Note that this is a temporary solution, the ultimate solution would be the [Auto-rebalancing of workloads](#auto-rebalancing-of-workloads), which may come soon.
+
+## Build up execution graph dynamically
+Before Flink 1.15, the execution graph was fully built in a static way before starting scheduling. To allow parallelisms of job vertices to be decided lazily, the execution graph must be able to be built up dynamically.
+
+### Create execution vertices and execution edges lazily
+A dynamic execution graph means that a Flink job starts with an empty execution topology, and then gradually attaches vertices during job execution, as shown in Fig. 2.
+
+The execution topology consists of execution vertices and execution edges. The execution vertices will be created and attached to the execution topology only when:
+
+- The parallelism of the corresponding job vertex is decided.
+- All upstream execution vertices are already attached.
+
+A decided parallelism of the job vertex is needed so that Flink knows how many execution vertices should be created. Upstream execution vertices need to be attached first so that Flink can connect the newly created execution vertices to the upstream vertices with execution edges. 
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/2-dynamic-graph.png" width="90%"/>
+<br/>
+Fig. 2 - Build up execution graph dynamically
+</center>
+
+<br/>
+
+### Flexible subpartition mapping
+Before Flink 1.15, when deploying a task, Flink needs to know the parallelism of its consumer job vertex. This is because consumer vertex parallelism is used to decide the number of subpartitions produced by each upstream task. The reason behind that is, for one result partition, different subpartitions serve different consumer execution vertices. More specifically, one consumer execution vertex only consumes data from subpartition with the same index. 
+
+Taking Fig. 3 as example, parallelism of the consumer B is 2, so the result partition produced by A1/A2 should contain 2 subpartitions, the subpartition with index 0 serves B1, and the subpartition with index 1 serves B2.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/3-static-graph-subpartition-mapping.png" width="30%"/>
+<br/>
+Fig. 3 - How subpartitions serve consumer execution vertices with static execution graph
+</center>
+
+<br/>
+
+But obviously, this doesn't work for dynamic graphs, because when a job vertex is deployed, the parallelism of its consumer job vertices may not have been decided yet. To enable Flink to work in this case, we need a way to allow a job vertex to run without knowing the parallelism of its consumer job vertices(or rather, we need a way to allow execution vertices to run without knowing the number of their consumer execution vertices). 
+
+To achieve this goal, we can set the number of subpartitions to be the max parallelism of the consumer job vertex. Then when the consumer execution vertices are deployed, they should be assigned with a subpartition range to consume. Suppose N is the number of consumer execution vertices and P is the number of subpartitions. For the kth consumer execution vertex, the consumed subpartition range should be:
+
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/range-formula.png" width="55%"/>
+</center>
+
+Taking Fig. 4 as example, the max parallelism of B is 4, so A1/A2 have 4 subpartitions. And then if the decided parallelism of B is 2, then the subpartitions mapping will be Fig. 4 (a), if the decided parallelism of B is 3, then the subpartitions mapping will be  Fig. 4 (b).
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/4-dynamic-graph-subpartition-mapping.png" width="75%"/>
+<br/>
+Fig. 4 - How subpartitions serve consumer execution vertices with dynamic graph
+</center>
+
+<br/>
+
+## Update and schedule the dynamic execution graph
+The adaptive batch scheduler scheduling is similar to the default scheduler, the only difference is that an empty dynamic execution graph will be generated initially and vertices will be attached later. Before handling any scheduling event, the scheduler will try deciding the parallelisms for job vertices, and then initialize them to generate execution vertices, connecting execution edges, and update the execution graph.
+
+The scheduler will try to decide the parallelism for all job vertices before each scheduling, and the parallelism decision will be made for each job vertex in topological order:
+
+- For source vertices, the parallelism should have been decided before starting scheduling. 
+- For non-source vertices, the parallelism can be decided only when all its consumed results are fully finished.

Review Comment:
   I change it to "when all its consumed results are fully produced"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] wanglijie95 commented on a diff in pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r896636348


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.

Review Comment:
   Fixed



##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.

Review Comment:
   I add a link for it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] wanglijie95 commented on a diff in pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r896615871


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of job vertices to be decided lazily. The execution graph starts with an empty execution topology and then gradually attaches the vertices during job execution.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png" width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size of input results, so the scheduler needs to know the sizes of result partitions produced by tasks. We introduced a numBytesProduced counter to record the size of each produced result partition, the accumulated result of the counter will be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper parallelisms for job vertices according to the sizes of their consumed results. The computation algorithm is as follows:
+Suppose:
+
+- ***V*** is the bytes of data the user expects to be processed by each task.
+- ***totalBytes<sub>non-broadcast</sub>*** is the sum of the non-broadcast result sizes consumed by this job vertex.
+- ***totalBytes<sub>broadcast</sub>*** is the sum of the broadcast result sizes consumed by this job vertex.
+- ***broadcastCapRatio*** is the cap ratio of broadcast bytes that affects the parallelism calculation.
+- ***normalize(***x***)*** is a function that round ***x*** to the closest power of 2.
+
+then the parallelism of this job vertex ***P*** will be:
+<center>
+<img src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/parallelism-formula.png" width="60%"/>
+</center>
+
+Note that we introduced two special treatment in the above formula :
+
+- [Limit the cap ratio of broadcast bytes](#limit-the-cap-ratio-of-broadcast-bytes)
+- [Normalize the parallelism to the closest power of 2](#normalize-the-parallelism-to-the-closest-power-of-2)
+
+However, the above formula cannot be used to decide the parallelism of the source vertices, because the source vertices have no input. To solve it, we introduced the configuration option `jobmanager.adaptive-batch-scheduler.default-source-parallelism` to allow users to manually configure the parallelism of source vertices. Note that not all data sources need this option, because some data sources can automatically infer parallelism (For example, HiveTableSource, see [HiveParallelismInference](https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java) for more detail). For these sources, it is recommended to decide parallelism by themselves.
+
+### Limit the cap ratio of broadcast bytes
+As you can see, we limit the cap ratio of broadcast bytes that affects the parallelism calculation to ***broadcastCapRatio***. That is, the non-broadcast bytes processed by each task is at least ***(1-broadcastCapRatio) * V***. If not so,when the total broadcast bytes is close to ***V***, even if the total non-broadcast bytes is very small, it may cause a large parallelism, which is unnecessary and may lead to resource waste and large task deployment overhead.
+
+Generally, the broadcast dataset is usually relatively small against the other co-processed datasets. Therefore, we set the cap ratio to 0.5 by default because we usually expect the broadcast bytes to be smaller than non-broadcast bytes. The value is hard coded in the first version, and we may make it configurable later.
+
+
+### Normalize the parallelism to the closest power of 2
+The normalize is to avoid introducing data skew. To better understand this section, we suggest you read the [Flexible subpartition mapping](#flexible-subpartition-mapping) section first.
+
+Taking Fig. 4 (b) as example, A1/A2 produces 4 subpartitions, and the decided parallelism of B is 3. In this case, B1 will consume 1 subpartition, B2 will consume 1 subpartition, and B3 will consume 2 subpartitions. We assume that subpartitions have the same amount of data, which means B3 will consume twice the data of other tasks, data skew is introduced due to the subpartition mapping.
+
+To solve this problem, we need to make the subpartitions evenly consumed by downstream tasks, which means the number of subpartitions should be a multiple of the number of downstream tasks. For simplicity, we require the user-specified max parallelism to be 2<sup>N</sup>, and then adjust the calculated parallelism to a closest 2<sup>M</sup> (M <= N), so that we can guarantee that subpartitions will be evenly consumed by downstream tasks.
+
+Note that this is a temporary solution, the ultimate solution would be the [Auto-rebalancing of workloads](#auto-rebalancing-of-workloads), which may come soon.
+
+## Build up execution graph dynamically
+Before Flink 1.15, the execution graph was fully built in a static way before starting scheduling. To allow parallelisms of job vertices to be decided lazily, the execution graph must be able to be built up dynamically.
+
+### Create execution vertices and execution edges lazily
+A dynamic execution graph means that a Flink job starts with an empty execution topology, and then gradually attaches vertices during job execution, as shown in Fig. 2.
+
+The execution topology consists of execution vertices and execution edges. The execution vertices will be created and attached to the execution topology only when:
+
+- The parallelism of the corresponding job vertex is decided.
+- All upstream execution vertices are already attached.
+
+A decided parallelism of the job vertex is needed so that Flink knows how many execution vertices should be created. Upstream execution vertices need to be attached first so that Flink can connect the newly created execution vertices to the upstream vertices with execution edges. 

Review Comment:
   I changed it to "The parallelism of the job vertex needs to be decided so that Flink knows how many execution vertices should be created"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] wanglijie95 commented on pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on PR #546:
URL: https://github.com/apache/flink-web/pull/546#issuecomment-1156505906

   > Thanks @wanglijie95 for the update! I have only one more comment.
   
   Thanks @gaoyunhaii. I have udpated the pull request.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] wanglijie95 commented on pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on PR #546:
URL: https://github.com/apache/flink-web/pull/546#issuecomment-1155070440

   Thanks for review @gaoyunhaii . I 've addressed all the comments by commit cd523977bc2d943a694feb68a13bdf79d45b51ca. Please task a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] wanglijie95 commented on a diff in pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r898007572


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] gaoyunhaii commented on a diff in pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r897575470


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. For batch jobs, a small parallelism may result in long execution time and big failover regression. While an unnecessary large parallelism may result in resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, It can be hard to predict data volume to be processed by a job because it can be different everyday. And it can be harder or even impossible (due to complex operators or UDFs) to predict data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 1.15. The adaptive batch scheduler can automatically decide parallelism for an operator according to the size of its consumed datasets. Here are the benefits the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is particularly beneficial for SQL jobs which can only be set with a global parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the upper bounds and lower bounds of tuned parallelisms, to specify expected data volume to process by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for operators whose parallelism is not set (which means the parallelism is -1).To leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`

Review Comment:
   I think we'd better explicitly state that? like "Set `parallelism.default: -1` for all jobs"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] zhuzhurk commented on a diff in pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r898917321


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -116,7 +121,7 @@ To solve this problem, we need to make the subpartitions evenly consumed by down
 Note that this is a temporary solution, the ultimate solution would be the [Auto-rebalancing of workloads](#auto-rebalancing-of-workloads), which may come soon.
 
 ## Build up execution graph dynamically
-Before Flink 1.15, the execution graph was fully built in a static way before starting scheduling. To allow parallelisms of job vertices to be decided lazily, the execution graph must be able to be built up dynamically.
+Before the introduction of adaptive batch scheduler, the execution graph was fully built in a static way before starting scheduling. To allow parallelisms of job vertices to be decided lazily, the execution graph must be able to be built up dynamically.

Review Comment:
   the introduction of adaptive batch scheduler -> adaptive batch scheduler was introduced to Flink
   
   `introduction` might be misleading
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-web] zhuzhurk commented on a diff in pull request #546: Add blogs for FLIP-187 adaptive batch scheduler

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r898915849


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -54,11 +54,16 @@ The adaptive batch scheduler only automatically decides parallelism for operator
 
 # Implementation Details
 
-In this section, we will elaborate the details of the implementation. To automatically decide parallelism of operators, we introduced the following changes:
+In this section, we will elaborate the details of the implementation. Before that, we need to briefly introduce some concepts involved:
+
+- [JobVertex](https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java) and [JobGraph](https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java): A job vertex is an operator chain formed by chaining several operators together for better performance. The job graph is a data flow consisting of job vertices.
+- [ExecutionVertex](https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java) and [ExecutionGraph](https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java): An execution vertex represents a parallel subtask of a job vertex, which will eventually be instantiated as a physical task. For example, a job vertex with a parallelism of 100 will generate 100 execution vertices. The execution graph is the physical execution topology consisting of all execution vertices.
+
+More details about the above concepts can be found in the [Flink documentation](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/internals/job_scheduling/#jobmanager-data-structures). To be precise, the adaptive batch scheduler actually automatically decides the parallelism of job vertices (in the previous sections, in order not to introduce more concepts, **operator** was used to refer to **job vertex**, but they are actually slightly different). We introduced the following changes to automatically decide parallelism of job vertices:

Review Comment:
   > (in the previous sections, in order not to introduce more concepts, **operator** was used to refer to **job vertex**, but they are actually slightly different)
   
   I think the correct logic is "the adaptive batch scheduler automatically decides the parallelism of job vertices. In this way, it decides the parallelism of the operators within that vertex."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org