Posted to commits@flink.apache.org by zh...@apache.org on 2022/03/20 17:43:37 UTC

[flink] branch release-1.15 updated: [FLINK-25226][doc] Add documentation about the AdaptiveBatchScheduler

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 41099c4  [FLINK-25226][doc] Add documentation about the AdaptiveBatchScheduler
41099c4 is described below

commit 41099c475db6077ca5c7b75e6d68266fecdd44f6
Author: Lijie Wang <wa...@gmail.com>
AuthorDate: Mon Feb 14 16:59:25 2022 +0800

    [FLINK-25226][doc] Add documentation about the AdaptiveBatchScheduler
    
    This closes #18757.
---
 docs/content.zh/docs/deployment/elastic_scaling.md | 42 +++++++++++++++++++++
 docs/content/docs/deployment/elastic_scaling.md    | 43 ++++++++++++++++++++++
 2 files changed, 85 insertions(+)

diff --git a/docs/content.zh/docs/deployment/elastic_scaling.md b/docs/content.zh/docs/deployment/elastic_scaling.md
index c1effbd..f58b36c 100644
--- a/docs/content.zh/docs/deployment/elastic_scaling.md
+++ b/docs/content.zh/docs/deployment/elastic_scaling.md
@@ -149,5 +149,47 @@ The Adaptive Scheduler can be configured via [all configuration options whose names contain
 - **Unused slots**: If the max parallelism of slot sharing groups is not equal, slots offered to the Adaptive Scheduler may go unused.
 - Scaling events trigger job and task restarts, which also increases the number of task attempts.
 
+## Adaptive Batch Scheduler
+
+The Adaptive Batch Scheduler is a batch job scheduler that can automatically derive the parallelism of each operator. If an operator is not assigned a parallelism, the scheduler derives one for it based on the size of the data it consumes. This brings several benefits:
+- Batch job users can be relieved from parallelism tuning
+- Parallelisms derived from the data volume can better adapt to datasets whose volume varies from day to day
+- Operators in SQL jobs can also be assigned different parallelisms
+
+### Usage
+
+To let the Adaptive Batch Scheduler automatically derive the parallelism of operators, you need to:
+- Enable the Adaptive Batch Scheduler
+- Set the parallelism of operators to `-1`
+
+#### Enable the Adaptive Batch Scheduler
+To enable the Adaptive Batch Scheduler, you need to:
+- Set `jobmanager.scheduler: AdaptiveBatch`
+- Since ["only jobs whose data exchanges are all BLOCKING are supported"](#局限性-2), keep [`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) set to `ALL-EXCHANGES-BLOCKING` (the default value).
+
+In addition, the following related configuration options can also be adjusted when using the Adaptive Batch Scheduler:
+- [`jobmanager.adaptive-batch-scheduler.min-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-min-parallelism): The minimum parallelism allowed to be set automatically. It should be configured as a power of 2; otherwise it will be automatically adjusted to the closest power of 2 greater than it.
+- [`jobmanager.adaptive-batch-scheduler.max-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-max-parallelism): The maximum parallelism allowed to be set automatically. It should be configured as a power of 2; otherwise it will be automatically adjusted to the closest power of 2 smaller than it.
+- [`jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-avg-data-volume-per-task): The average amount of data each task is expected to process. Since the parallelism of a vertex is adjusted to 2^N, the actual average amount of data processed per task will be 0.75~1.5 times this value. Also note that when data skew occurs, or when the derived parallelism reaches the maximum parallelism (because there is too much data), the data actually processed by some tasks may far exceed this value.
+- [`jobmanager.adaptive-batch-scheduler.default-source-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-default-source-parallelism): The default parallelism of source operators.
+
+#### Set the parallelism of operators to `-1`
+The Adaptive Batch Scheduler only derives parallelisms for operators whose parallelism is not specified by the user (i.e. parallelism is `-1`). So if you want operator parallelisms to be derived automatically, configure the following:
+- Set `parallelism.default: -1`
+- For SQL jobs, set `table.exec.resource.default-parallelism: -1`
+- For DataStream/DataSet jobs, do not specify parallelisms via the `setParallelism()` method of operators
+- For DataStream/DataSet jobs, do not specify a parallelism via the `setParallelism()` method of `StreamExecutionEnvironment`/`ExecutionEnvironment`
+
+### Performance tuning
+
+1. It is recommended to use [Sort Shuffle](https://flink.apache.org/2021/10/26/sort-shuffle-part1.html) and to set [`taskmanager.network.memory.buffers-per-channel`]({{< ref "docs/deployment/config" >}}#taskmanager-network-memory-buffers-per-channel) to `0`. This decouples the parallelism from the required network memory, and for large-scale jobs it reduces the chance of hitting "Insufficient number of network buffers" errors.
+2. It is recommended to set [`jobmanager.adaptive-batch-scheduler.max-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-max-parallelism) to the parallelism expected to be needed in the worst case. Configuring an overly large value is not recommended, as it may affect performance. This option affects the number of subpartitions produced by upstream tasks; too many subpartitions may degrade the performance of hash shuffle, or degrade network transmission performance due to small packets.
+
+### Limitations
+- **Batch jobs only**: The Adaptive Batch Scheduler only supports batch jobs. An exception will be thrown if a streaming job is submitted.
+- **ALL-EXCHANGES-BLOCKING jobs only**: At the moment, the Adaptive Batch Scheduler only supports jobs whose [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) is ALL-EXCHANGES-BLOCKING.
+- **The derived parallelism is a power of 2**: So that subpartitions can be evenly distributed to downstream tasks, [`jobmanager.adaptive-batch-scheduler.max-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-max-parallelism) should be configured as 2^N, and the derived parallelism will be 2^M with M <= N.
+- **FileInputFormat sources are not supported**: FileInputFormat sources are not supported, including `StreamExecutionEnvironment#readFile(...)`, `StreamExecutionEnvironment#readTextFile(...)` and `StreamExecutionEnvironment#createInput(FileInputFormat, ...)`. When using the Adaptive Batch Scheduler, users should use the new Source API ([FileSystem DataStream Connector]({{< ref "docs/connectors/datastream/filesystem.md" >}}) or [FileSystem SQL Connector]({{< ref "docs/connectors/table/filesystem.md" >}})) to read files.
+- **The data volume reported as produced upstream and received downstream on the Web UI may not match**: When using the Adaptive Batch Scheduler, for broadcast edges the amount of data sent by the upstream operator and the amount received by the downstream operator may differ, which can confuse users on the Web UI. See [FLIP-187](https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler) for details.
 
 {{< top >}}
diff --git a/docs/content/docs/deployment/elastic_scaling.md b/docs/content/docs/deployment/elastic_scaling.md
index 47e220d..386f083 100644
--- a/docs/content/docs/deployment/elastic_scaling.md
+++ b/docs/content/docs/deployment/elastic_scaling.md
@@ -151,5 +151,48 @@ The behavior of Adaptive Scheduler is configured by [all configuration options c
 - **Unused slots**: If the max parallelism for slot sharing groups is not equal, slots offered to Adaptive Scheduler might be unused.
 - Scaling events trigger job and task restarts, which will increase the number of Task attempts.
 
+## Adaptive Batch Scheduler
+
+The Adaptive Batch Scheduler can automatically decide the parallelism of operators for batch jobs. If an operator is not assigned a parallelism, the scheduler decides one for it according to the size of the data it consumes. This brings several benefits:
+- Batch job users can be relieved from parallelism tuning
+- Automatically tuned parallelisms can better fit consumed datasets whose volume varies from day to day
+- Operators from SQL batch jobs can be assigned different, automatically tuned parallelisms
+
+### Usage
+
+To automatically decide parallelisms for operators with Adaptive Batch Scheduler, you need to:
+- Configure to use Adaptive Batch Scheduler.
+- Set the parallelism of operators to `-1`.
+  
+#### Configure to use Adaptive Batch Scheduler
+To use Adaptive Batch Scheduler, you need to:
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the [`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value) due to ["ALL-EXCHANGES-BLOCKING jobs only"](#limitations-2).
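+
+As an illustration, the following is a minimal sketch of enabling the scheduler programmatically by passing a `Configuration` to the execution environment; setting the same keys in `flink-conf.yaml` works equally well. The option keys are the ones documented on this page; everything else (class name, job body) is just an assumed example:
+
+```java
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+public class EnableAdaptiveBatchSketch {
+    public static void main(String[] args) throws Exception {
+        Configuration conf = new Configuration();
+        // Use the adaptive batch scheduler.
+        conf.setString("jobmanager.scheduler", "AdaptiveBatch");
+        // Keep all data exchanges blocking (the default), as the scheduler requires.
+        conf.setString("execution.batch-shuffle-mode", "ALL-EXCHANGES-BLOCKING");
+        // Run the job in batch execution mode.
+        conf.setString("execution.runtime-mode", "BATCH");
+        // Let the scheduler decide operator parallelisms (see the next subsection).
+        conf.setString("parallelism.default", "-1");
+
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(conf);
+        // ... build and execute a batch job here ...
+    }
+}
+```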
+
+In addition, there are several related configuration options that may need adjustment when using Adaptive Batch Scheduler:
+- [`jobmanager.adaptive-batch-scheduler.min-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-min-parallelism): The lower bound of allowed parallelism to set adaptively. Currently, this option should be configured as a power of 2, otherwise it will be rounded up to a power of 2 automatically.
+- [`jobmanager.adaptive-batch-scheduler.max-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-max-parallelism): The upper bound of allowed parallelism to set adaptively. Currently, this option should be configured as a power of 2, otherwise it will be rounded down to a power of 2 automatically.
+- [`jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-avg-data-volume-per-task): The average size of data volume that each task instance is expected to process. Note that since the parallelism of a vertex is adjusted to a power of 2, the actual average size will be 0.75~1.5 times this value. It is also important to note that when data skew occurs, or when the decided parallelism reaches the max parallelism (because there is too much data), the data actually processed by some tasks may far exceed this value (see the sketch after this list).
+- [`jobmanager.adaptive-batch-scheduler.default-source-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-default-source-parallelism): The default parallelism of data source.
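+
+As a rough, hedged sketch of how these options interact (a simplified illustration of the documented behaviour, not the scheduler's actual code): the parallelism of a vertex can be thought of as its consumed data size divided by `avg-data-volume-per-task`, adjusted to a power of 2 and clamped to the configured bounds, so that the per-task average stays within roughly 0.75~1.5 times the configured value:
+
+```java
+public class ParallelismDerivationSketch {
+
+    // Simplified illustration only; assumes min/max are already powers of 2.
+    static int deriveParallelism(long consumedBytes, long avgBytesPerTask,
+                                 int minParallelism, int maxParallelism) {
+        double raw = Math.max(1.0, (double) consumedBytes / avgBytesPerTask);
+        // Pick a power of 2 that keeps the per-task average within ~0.75x-1.5x.
+        int powerOfTwo = Integer.highestOneBit((int) Math.round(raw));
+        if (powerOfTwo * 1.5 < raw) {
+            powerOfTwo <<= 1;
+        }
+        return Math.min(maxParallelism, Math.max(minParallelism, powerOfTwo));
+    }
+
+    public static void main(String[] args) {
+        long oneGiB = 1L << 30;
+        // 60 GiB consumed with 1 GiB expected per task -> 64 tasks (~0.94 GiB each).
+        System.out.println(deriveParallelism(60 * oneGiB, oneGiB, 1, 128));
+    }
+}
+```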
+
+#### Set the parallelism of operators to `-1`
+Adaptive Batch Scheduler will only decide parallelism for operators whose parallelism is not specified by users (parallelism is `-1`). So if you want the parallelism of operators to be decided automatically, you should configure as follows:
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on `StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
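+
+Put together, a hypothetical DataStream batch job that follows these rules could look like the sketch below. Note that no `setParallelism()` call appears anywhere; the word-count pipeline itself is just an assumed example:
+
+```java
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+public class NoExplicitParallelismSketch {
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+        // No setParallelism() anywhere: with parallelism.default set to -1 and the
+        // adaptive batch scheduler enabled, parallelisms are derived from data sizes.
+        env.fromElements("to be or not to be")
+            .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
+                for (String word : line.split(" ")) {
+                    out.collect(Tuple2.of(word, 1));
+                }
+            })
+            .returns(Types.TUPLE(Types.STRING, Types.INT))
+            .keyBy(t -> t.f0)
+            .sum(1)
+            .print();
+
+        env.execute("no-explicit-parallelism-sketch");
+    }
+}
+```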
+
+### Performance tuning
+
+1. It's recommended to use [Sort Shuffle](https://flink.apache.org/2021/10/26/sort-shuffle-part1.html) and set [`taskmanager.network.memory.buffers-per-channel`]({{< ref "docs/deployment/config" >}}#taskmanager-network-memory-buffers-per-channel) to `0`. This can decouple the required network memory from parallelism, so that for large scale jobs, the "Insufficient number of network buffers" errors are less likely to happen.
+2. It's recommended to set [`jobmanager.adaptive-batch-scheduler.max-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-max-parallelism) to the parallelism you expect to need in the worst case. Values larger than that are not recommended, because an excessive value may affect performance. This option affects the number of subpartitions produced by upstream tasks; too many subpartitions may degrade the performance of hash shuffle and the performance of network transmission due to small packets.
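+
+A compact, hedged sketch of these two recommendations as configuration. The `taskmanager.network.sort-shuffle.min-parallelism` key is how sort shuffle is commonly enabled for all exchanges; please verify the exact keys against the configuration reference of your Flink version, and treat the value `1024` as a placeholder for your own worst-case parallelism:
+
+```java
+import org.apache.flink.configuration.Configuration;
+
+public class BatchTuningSketch {
+    static Configuration tuningDefaults() {
+        Configuration conf = new Configuration();
+        // Use sort shuffle for every exchange (see the Sort Shuffle blog post above).
+        conf.setInteger("taskmanager.network.sort-shuffle.min-parallelism", 1);
+        // Decouple the required network memory from parallelism.
+        conf.setInteger("taskmanager.network.memory.buffers-per-channel", 0);
+        // Cap the adaptively decided parallelism at the expected worst case.
+        conf.setInteger("jobmanager.adaptive-batch-scheduler.max-parallelism", 1024);
+        return conf;
+    }
+}
+```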
+
+### Limitations
+
+- **Batch jobs only**: Adaptive Batch Scheduler only supports batch jobs. An exception will be thrown if a streaming job is submitted.
+- **ALL-EXCHANGES-BLOCKING jobs only**: At the moment, Adaptive Batch Scheduler only supports jobs whose [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) is `ALL-EXCHANGES-BLOCKING`.
+- **The decided parallelism will be a power of 2**: To ensure that downstream tasks consume the same number of subpartitions, the configuration option [`jobmanager.adaptive-batch-scheduler.max-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-max-parallelism) should be set to a power of 2 (2^N), and the decided parallelism will also be a power of 2 (2^M with M <= N).
+- **FileInputFormat sources are not supported**: FileInputFormat sources are not supported, including `StreamExecutionEnvironment#readFile(...)`, `StreamExecutionEnvironment#readTextFile(...)` and `StreamExecutionEnvironment#createInput(FileInputFormat, ...)`. Users should use the new sources ([FileSystem DataStream Connector]({{< ref "docs/connectors/datastream/filesystem.md" >}}) or [FileSystem SQL Connector]({{< ref "docs/connectors/table/filesystem.md" >}})) to read files when using the Adaptive Batch Scheduler (a sketch follows this list).
+- **Inconsistent broadcast results metrics on WebUI**: In Adaptive Batch Scheduler, for broadcast results, the number of bytes/records sent by the upstream task (as counted by metrics) is not equal to the number of bytes/records received by the downstream task, which may confuse users when displayed on the Web UI. See [FLIP-187](https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler) for details.
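+
+For the FileInputFormat limitation above, the following is a minimal sketch of reading text files through the new Source API instead of `readTextFile(...)`. The class names come from the `flink-connector-files` module as of recent Flink versions; treat them (and the input path) as assumptions and check the FileSystem connector documentation for your version:
+
+```java
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+public class NewFileSourceSketch {
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // Unified Source API instead of env.readTextFile(...), which the
+        // adaptive batch scheduler does not support.
+        FileSource<String> source =
+                FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("/path/to/input"))
+                        .build();
+
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source")
+                .print();
+
+        env.execute("new-file-source-sketch");
+    }
+}
+```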
 
 {{< top >}}