Posted to commits@flink.apache.org by wa...@apache.org on 2023/01/12 14:00:04 UTC

[flink] branch master updated: [FLINK-30604][runtime] Remove the limitation that parallelism decided by adaptive batch scheduler is always a power of two

This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e57e5ee19af [FLINK-30604][runtime] Remove the limitation that parallelism decided by adaptive batch scheduler is always a power of two
e57e5ee19af is described below

commit e57e5ee19af6313cdd48cdec61e2328338ebce86
Author: Lijie Wang <wa...@gmail.com>
AuthorDate: Mon Jan 9 17:17:46 2023 +0800

    [FLINK-30604][runtime] Remove the limitation that parallelism decided by adaptive batch scheduler is always a power of two
    
    This closes #21626
---
 docs/content.zh/docs/deployment/elastic_scaling.md |  7 ++-
 docs/content/docs/deployment/elastic_scaling.md    |  7 ++-
 .../generated/all_jobmanager_section.html          |  6 +--
 .../generated/expert_scheduling_section.html       |  6 +--
 .../generated/job_manager_configuration.html       |  6 +--
 .../flink/configuration/JobManagerOptions.java     | 12 ++---
 .../AdaptiveBatchSchedulerFactory.java             |  8 ++--
 ...faultVertexParallelismAndInputInfosDecider.java | 55 ++++------------------
 ...tVertexParallelismAndInputInfosDeciderTest.java | 33 ++-----------
 9 files changed, 35 insertions(+), 105 deletions(-)
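
Note for readers of this patch: the sketch below is an illustration only and is not part of the commit. It reproduces the arithmetic visible in the DefaultVertexParallelismAndInputInfosDecider hunk further down (ceil of non-broadcast bytes over the per-task volume minus broadcast bytes, then clamping to the configured bounds) and compares it with the power-of-two normalization that the commit removes. The class name ParallelismSketch and both helper methods are hypothetical. The bounds 1 and 128 are the documented defaults of min-parallelism and max-parallelism. The sketch assumes that broadcastBytes has already been capped, is smaller than dataVolumePerTask, and that min <= max.

```java
public class ParallelismSketch {

    /**
     * Hypothetical helper mirroring the post-commit arithmetic shown in the diff.
     * Assumes broadcastBytes < dataVolumePerTask and minParallelism <= maxParallelism.
     */
    static int decide(
            long nonBroadcastBytes,
            long broadcastBytes,
            long dataVolumePerTask,
            int minParallelism,
            int maxParallelism) {
        int parallelism =
                (int) Math.ceil((double) nonBroadcastBytes / (dataVolumePerTask - broadcastBytes));
        // Clamp to [minParallelism, maxParallelism]; no power-of-two rounding any more.
        return Math.max(minParallelism, Math.min(maxParallelism, parallelism));
    }

    /**
     * Hypothetical re-creation of the removed normalizeParallelism step, for comparison.
     * Uses Integer.highestOneBit in place of Flink's MathUtils; valid for positive input.
     */
    static int normalizeToPowerOfTwo(int parallelism) {
        int down = Integer.highestOneBit(parallelism);
        int up = (down == parallelism) ? parallelism : down << 1;
        return parallelism < (up + down) / 2 ? down : up;
    }

    public static void main(String[] args) {
        long gb = 1024L * 1024 * 1024;
        long mb256 = 256L * 1024 * 1024;

        // 8 GB of non-broadcast input, 256 MB of broadcast input, 1 GB per task.
        int decided = decide(8 * gb, mb256, gb, 1, 128);
        System.out.println("new behaviour: " + decided);                        // 11
        System.out.println("old behaviour: " + normalizeToPowerOfTwo(decided)); // 8

        // 9 GB of non-broadcast input with the same settings.
        int decided2 = decide(9 * gb, mb256, gb, 1, 128);
        System.out.println("new behaviour: " + decided2);                        // 12
        System.out.println("old behaviour: " + normalizeToPowerOfTwo(decided2)); // 16
    }
}
```

With these inputs the decided parallelism moves from 8 to 11 and from 16 to 12, which is consistent with the updated expectation of 11 and the removed expectations of 8 and 16 in DefaultVertexParallelismAndInputInfosDeciderTest.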

diff --git a/docs/content.zh/docs/deployment/elastic_scaling.md b/docs/content.zh/docs/deployment/elastic_scaling.md
index dc96e11e235..715b8c74ef0 100644
--- a/docs/content.zh/docs/deployment/elastic_scaling.md
+++ b/docs/content.zh/docs/deployment/elastic_scaling.md
@@ -167,9 +167,9 @@ Adaptive Batch Scheduler 是一种可以自动推导每个算子并行度的批
 - 由于 ["只支持所有数据交换都为 BLOCKING 模式的作业"](#局限性-2), 需要将 [`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) 配置为 `ALL-EXCHANGES-BLOCKING`(默认值) 。
 
 除此之外,使用 Adaptive Batch Scheduler 时,以下相关配置也可以调整:
-- [`jobmanager.adaptive-batch-scheduler.min-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-min-parallelism): 允许自动设置的并行度最小值。需要配置为 2 的幂,否则也会被自动调整为最接近且大于其的 2 的幂。
-- [`jobmanager.adaptive-batch-scheduler.max-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-max-parallelism): 允许自动设置的并行度最大值。需要配置为 2 的幂,否则也会被自动调整为最接近且小于其的 2 的幂。
-- [`jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-avg-data-volume-per-task): 期望每个任务平均处理的数据量大小。由于顶点的并行度会被调整为 2^N,因此实际每个任务平均处理的数据量大小将是该值的 0.75~1.5 倍。 另外需要注意的是,当出现数据倾斜,或者确定的并行度达到最大并行度(由于数据过多)时,一些任务实际处理的数据可能会远远超过这个值。
+- [`jobmanager.adaptive-batch-scheduler.min-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-min-parallelism): 允许自动设置的并行度最小值。
+- [`jobmanager.adaptive-batch-scheduler.max-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-max-parallelism): 允许自动设置的并行度最大值。
+- [`jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-avg-data-volume-per-task): 期望每个任务平均处理的数据量大小。请注意,当出现数据倾斜,或者确定的并行度达到最大并行度(由于数据过多)时,一些任务实际处理的数据可能会远远超过这个值。
 - [`jobmanager.adaptive-batch-scheduler.default-source-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-default-source-parallelism): source 算子的默认并行度
 
 #### 配置算子的并行度为 `-1`
@@ -187,7 +187,6 @@ Adaptive Batch Scheduler 只会为用户未指定并行度的算子(并行度
 ### 局限性
 - **只支持批作业**: Adaptive Batch Scheduler 只支持批作业。当提交的是一个流作业时,会抛出异常。
 - **只支持所有数据交换都为 BLOCKING 模式的作业**: 目前 Adaptive Batch Scheduler 只支持 [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) 为 ALL-EXCHANGES-BLOCKING 的作业。
-- **推导出的并行度是 2 的幂**: 为了使子分区可以均匀分配给下游任务,[`jobmanager.adaptive-batch-scheduler.max-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-max-parallelism) 应该被配置为 2^N, 推导出的并行度会是 2^M, 且满足 M <= N。
 - **不支持 FileInputFormat 类型的 source**: 不支持 FileInputFormat 类型的 source, 包括 `StreamExecutionEnvironment#readFile(...)` `StreamExecutionEnvironment#readTextFile(...)` 和 `StreamExecutionEnvironment#createInput(FileInputFormat, ...)`。 当使用 Adaptive Batch Scheduler 时,用户应该使用新版的 Source API ([FileSystem DataStream Connector]({{< ref "docs/connectors/datastream/filesystem.md" >}}) 或 [FileSystem SQL Connector]({{< ref "docs/connectors/table/filesystem.md" >}})) 来读取文件.
 - **Web UI 上展示的上游输出的数据量和下游收到的数据量可能不一致**: 在使用 Adaptive Batch Scheduler 时,对于 broadcast 边,上游算子发送的数据量和下游算子接收的数据量可能会不相等,这在 Web UI 的显示上可能会困扰用户。细节详见 [FLIP-187](https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler)。
 
diff --git a/docs/content/docs/deployment/elastic_scaling.md b/docs/content/docs/deployment/elastic_scaling.md
index 686df507ffd..8beacec8d5c 100644
--- a/docs/content/docs/deployment/elastic_scaling.md
+++ b/docs/content/docs/deployment/elastic_scaling.md
@@ -169,9 +169,9 @@ To use Adaptive Batch Scheduler, you need to:
 - Leave the [`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value) due to ["ALL-EXCHANGES-BLOCKING jobs only"](#limitations-2).
 
 In addition, there are several related configuration options that may need adjustment when using Adaptive Batch Scheduler:
-- [`jobmanager.adaptive-batch-scheduler.min-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-min-parallelism): The lower bound of allowed parallelism to set adaptively. Currently, this option should be configured as a power of 2, otherwise it will be rounded up to a power of 2 automatically.
-- [`jobmanager.adaptive-batch-scheduler.max-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-max-parallelism): The upper bound of allowed parallelism to set adaptively. Currently, this option should be configured as a power of 2, otherwise it will be rounded down to a power of 2 automatically.
-- [`jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-avg-data-volume-per-task): The average size of data volume to expect each task instance to process. Note that since the parallelism of the vertices is adjusted to a power of 2, the actual average size will be 0.75~1.5 times this value. It is also important to note that when data skew occurs, or the decided parallelism reaches the max parallelism (due [...]
+- [`jobmanager.adaptive-batch-scheduler.min-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-min-parallelism): The lower bound of allowed parallelism to set adaptively.
+- [`jobmanager.adaptive-batch-scheduler.max-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-max-parallelism): The upper bound of allowed parallelism to set adaptively.
+- [`jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-avg-data-volume-per-task): The average size of data volume to expect each task instance to process. Note that when data skew occurs, or the decided parallelism reaches the max parallelism (due to too much data), the data actually processed by some tasks may far exceed this value.
 - [`jobmanager.adaptive-batch-scheduler.default-source-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-default-source-parallelism): The default parallelism of data source.
 
 #### Set the parallelism of operators to `-1`
@@ -190,7 +190,6 @@ Adaptive Batch Scheduler will only decide parallelism for operators whose parall
 
 - **Batch jobs only**: Adaptive Batch Scheduler only supports batch jobs. Exception will be thrown if a streaming job is submitted.
 - **ALL-EXCHANGES-BLOCKING jobs only**: At the moment, Adaptive Batch Scheduler only supports jobs whose [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) is `ALL-EXCHANGES-BLOCKING`.
-- **The decided parallelism will be a power of 2**: In order to ensure downstream tasks to consume the same count of subpartitions, the configuration option [`jobmanager.adaptive-batch-scheduler.max-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-max-parallelism) should be set to be a power of 2 (2^N), and the decided parallelism will also be a power of 2 (2^M and M <= N).
 - **FileInputFormat sources are not supported**: FileInputFormat sources are not supported, including `StreamExecutionEnvironment#readFile(...)` `StreamExecutionEnvironment#readTextFile(...)` and `StreamExecutionEnvironment#createInput(FileInputFormat, ...)`. Users should use the new sources([FileSystem DataStream Connector]({{< ref "docs/connectors/datastream/filesystem.md" >}}) or [FileSystem SQL Connector]({{< ref "docs/connectors/table/filesystem.md" >}})) to read files when using th [...]
 - **Inconsistent broadcast results metrics on WebUI**: In Adaptive Batch Scheduler, for broadcast results, the number of bytes/records sent by the upstream task counted by metric is not equal to the number of bytes/records received by the downstream task, which may confuse users when displayed on the Web UI. See [FLIP-187](https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler) for details.
 
diff --git a/docs/layouts/shortcodes/generated/all_jobmanager_section.html b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
index 1e97f483e13..d994b54af67 100644
--- a/docs/layouts/shortcodes/generated/all_jobmanager_section.html
+++ b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
@@ -12,7 +12,7 @@
             <td><h5>jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task</h5></td>
             <td style="word-wrap: break-word;">1 gb</td>
             <td>MemorySize</td>
-            <td>The average size of data volume to expect each task instance to process if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>. Note that since the parallelism of the vertices is adjusted to a power of 2, the actual average size will be 0.75~1.5 times this value. It is also important to note that when data skew occurs or the decided parallelism reaches the <code class="highlighter-rouge">jobmanag [...]
+            <td>The average size of data volume to expect each task instance to process if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>. Note that when data skew occurs or the decided parallelism reaches the <code class="highlighter-rouge">jobmanager.adaptive-batch-scheduler.max-parallelism</code> (due to too much data), the data actually processed by some tasks may far exceed this value.</td>
         </tr>
         <tr>
             <td><h5>jobmanager.adaptive-batch-scheduler.default-source-parallelism</h5></td>
@@ -24,13 +24,13 @@
             <td><h5>jobmanager.adaptive-batch-scheduler.max-parallelism</h5></td>
             <td style="word-wrap: break-word;">128</td>
             <td>Integer</td>
-            <td>The upper bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>. Currently, this option should be configured as a power of 2, otherwise it will also be rounded down to a power of 2 automatically.</td>
+            <td>The upper bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code></td>
         </tr>
         <tr>
             <td><h5>jobmanager.adaptive-batch-scheduler.min-parallelism</h5></td>
             <td style="word-wrap: break-word;">1</td>
             <td>Integer</td>
-            <td>The lower bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>. Currently, this option should be configured as a power of 2, otherwise it will also be rounded up to a power of 2 automatically.</td>
+            <td>The lower bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code></td>
         </tr>
         <tr>
             <td><h5>jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration</h5></td>
diff --git a/docs/layouts/shortcodes/generated/expert_scheduling_section.html b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
index c8f4ef88c26..6f5cba9f520 100644
--- a/docs/layouts/shortcodes/generated/expert_scheduling_section.html
+++ b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
@@ -30,7 +30,7 @@
             <td><h5>jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task</h5></td>
             <td style="word-wrap: break-word;">1 gb</td>
             <td>MemorySize</td>
-            <td>The average size of data volume to expect each task instance to process if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>. Note that since the parallelism of the vertices is adjusted to a power of 2, the actual average size will be 0.75~1.5 times this value. It is also important to note that when data skew occurs or the decided parallelism reaches the <code class="highlighter-rouge">jobmanag [...]
+            <td>The average size of data volume to expect each task instance to process if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>. Note that when data skew occurs or the decided parallelism reaches the <code class="highlighter-rouge">jobmanager.adaptive-batch-scheduler.max-parallelism</code> (due to too much data), the data actually processed by some tasks may far exceed this value.</td>
         </tr>
         <tr>
             <td><h5>jobmanager.adaptive-batch-scheduler.default-source-parallelism</h5></td>
@@ -42,13 +42,13 @@
             <td><h5>jobmanager.adaptive-batch-scheduler.max-parallelism</h5></td>
             <td style="word-wrap: break-word;">128</td>
             <td>Integer</td>
-            <td>The upper bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>. Currently, this option should be configured as a power of 2, otherwise it will also be rounded down to a power of 2 automatically.</td>
+            <td>The upper bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code></td>
         </tr>
         <tr>
             <td><h5>jobmanager.adaptive-batch-scheduler.min-parallelism</h5></td>
             <td style="word-wrap: break-word;">1</td>
             <td>Integer</td>
-            <td>The lower bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>. Currently, this option should be configured as a power of 2, otherwise it will also be rounded up to a power of 2 automatically.</td>
+            <td>The lower bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code></td>
         </tr>
         <tr>
             <td><h5>jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration</h5></td>
diff --git a/docs/layouts/shortcodes/generated/job_manager_configuration.html b/docs/layouts/shortcodes/generated/job_manager_configuration.html
index ff0beb07963..12c8dab4316 100644
--- a/docs/layouts/shortcodes/generated/job_manager_configuration.html
+++ b/docs/layouts/shortcodes/generated/job_manager_configuration.html
@@ -12,7 +12,7 @@
             <td><h5>jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task</h5></td>
             <td style="word-wrap: break-word;">1 gb</td>
             <td>MemorySize</td>
-            <td>The average size of data volume to expect each task instance to process if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>. Note that since the parallelism of the vertices is adjusted to a power of 2, the actual average size will be 0.75~1.5 times this value. It is also important to note that when data skew occurs or the decided parallelism reaches the <code class="highlighter-rouge">jobmanag [...]
+            <td>The average size of data volume to expect each task instance to process if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>. Note that when data skew occurs or the decided parallelism reaches the <code class="highlighter-rouge">jobmanager.adaptive-batch-scheduler.max-parallelism</code> (due to too much data), the data actually processed by some tasks may far exceed this value.</td>
         </tr>
         <tr>
             <td><h5>jobmanager.adaptive-batch-scheduler.default-source-parallelism</h5></td>
@@ -24,13 +24,13 @@
             <td><h5>jobmanager.adaptive-batch-scheduler.max-parallelism</h5></td>
             <td style="word-wrap: break-word;">128</td>
             <td>Integer</td>
-            <td>The upper bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>. Currently, this option should be configured as a power of 2, otherwise it will also be rounded down to a power of 2 automatically.</td>
+            <td>The upper bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code></td>
         </tr>
         <tr>
             <td><h5>jobmanager.adaptive-batch-scheduler.min-parallelism</h5></td>
             <td style="word-wrap: break-word;">1</td>
             <td>Integer</td>
-            <td>The lower bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>. Currently, this option should be configured as a power of 2, otherwise it will also be rounded up to a power of 2 automatically.</td>
+            <td>The lower bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code></td>
         </tr>
         <tr>
             <td><h5>jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration</h5></td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 91afa73fb89..55c3898b057 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -546,9 +546,7 @@ public class JobManagerOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "The lower bound of allowed parallelism to set adaptively if %s has been set to %s. "
-                                                    + "Currently, this option should be configured as a power of 2, "
-                                                    + "otherwise it will also be rounded up to a power of 2 automatically.",
+                                            "The lower bound of allowed parallelism to set adaptively if %s has been set to %s",
                                             code(SCHEDULER.key()),
                                             code(SchedulerType.AdaptiveBatch.name()))
                                     .build());
@@ -564,9 +562,7 @@ public class JobManagerOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "The upper bound of allowed parallelism to set adaptively if %s has been set to %s. "
-                                                    + "Currently, this option should be configured as a power of 2, "
-                                                    + "otherwise it will also be rounded down to a power of 2 automatically.",
+                                            "The upper bound of allowed parallelism to set adaptively if %s has been set to %s",
                                             code(SCHEDULER.key()),
                                             code(SchedulerType.AdaptiveBatch.name()))
                                     .build());
@@ -583,9 +579,7 @@ public class JobManagerOptions {
                             Description.builder()
                                     .text(
                                             "The average size of data volume to expect each task instance to process if %s has been set to %s. "
-                                                    + "Note that since the parallelism of the vertices is adjusted to a power of 2, "
-                                                    + "the actual average size will be 0.75~1.5 times this value. "
-                                                    + "It is also important to note that when data skew occurs or the decided parallelism reaches the %s (due to too much data), "
+                                                    + "Note that when data skew occurs or the decided parallelism reaches the %s (due to too much data), "
                                                     + "the data actually processed by some tasks may far exceed this value.",
                                             code(SCHEDULER.key()),
                                             code(SchedulerType.AdaptiveBatch.name()),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
index 8bff47a9e2d..093efec1956 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java
@@ -194,8 +194,8 @@ public class AdaptiveBatchSchedulerFactory implements SchedulerNGFactory {
                     shuffleMaster,
                     rpcTimeout,
                     DefaultVertexParallelismAndInputInfosDecider.from(jobMasterConfiguration),
-                    DefaultVertexParallelismAndInputInfosDecider.getNormalizedMaxParallelism(
-                            jobMasterConfiguration),
+                    jobMasterConfiguration.getInteger(
+                            JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM),
                     blocklistOperations,
                     hybridPartitionDataConsumeConstraint);
         } else {
@@ -224,8 +224,8 @@ public class AdaptiveBatchSchedulerFactory implements SchedulerNGFactory {
                     shuffleMaster,
                     rpcTimeout,
                     DefaultVertexParallelismAndInputInfosDecider.from(jobMasterConfiguration),
-                    DefaultVertexParallelismAndInputInfosDecider.getNormalizedMaxParallelism(
-                            jobMasterConfiguration),
+                    jobMasterConfiguration.getInteger(
+                            JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM),
                     hybridPartitionDataConsumeConstraint);
         }
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java
index 047a7890020..589b2b78038 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.scheduler.adaptivebatch;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -30,7 +29,6 @@ import org.apache.flink.runtime.executiongraph.ParallelismAndInputInfos;
 import org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.MathUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -163,22 +161,20 @@ public class DefaultVertexParallelismAndInputInfosDecider
         long broadcastBytes = getReasonableBroadcastBytes(jobVertexId, consumedResults);
         long nonBroadcastBytes = getNonBroadcastBytes(consumedResults);
 
-        int initiallyDecidedParallelism =
+        int parallelism =
                 (int) Math.ceil((double) nonBroadcastBytes / (dataVolumePerTask - broadcastBytes));
-        int parallelism = normalizeParallelism(initiallyDecidedParallelism);
 
         LOG.debug(
                 "The size of broadcast data is {}, the size of non-broadcast data is {}, "
-                        + "the initially decided parallelism of job vertex {} is {}, after normalization is {}",
+                        + "the initially decided parallelism of job vertex {} is {}.",
                 new MemorySize(broadcastBytes),
                 new MemorySize(nonBroadcastBytes),
                 jobVertexId,
-                initiallyDecidedParallelism,
                 parallelism);
 
         if (parallelism < minParallelism) {
             LOG.info(
-                    "The initially normalized parallelism {} is smaller than the normalized minimum parallelism {}. "
+                    "The initially decided parallelism {} is smaller than the minimum parallelism {}. "
                             + "Use {} as the finally decided parallelism of job vertex {}.",
                     parallelism,
                     minParallelism,
@@ -187,7 +183,7 @@ public class DefaultVertexParallelismAndInputInfosDecider
             parallelism = minParallelism;
         } else if (parallelism > maxParallelism) {
             LOG.info(
-                    "The initially normalized parallelism {} is larger than the normalized maximum parallelism {}. "
+                    "The initially decided parallelism {} is larger than the maximum parallelism {}. "
                             + "Use {} as the finally decided parallelism of job vertex {}.",
                     parallelism,
                     maxParallelism,
@@ -450,50 +446,15 @@ public class DefaultVertexParallelismAndInputInfosDecider
                 .collect(Collectors.toList());
     }
 
-    @VisibleForTesting
-    int getMaxParallelism() {
-        return maxParallelism;
-    }
-
-    @VisibleForTesting
-    int getMinParallelism() {
-        return minParallelism;
-    }
-
     static DefaultVertexParallelismAndInputInfosDecider from(Configuration configuration) {
-        int maxParallelism = getNormalizedMaxParallelism(configuration);
-        int minParallelism = getNormalizedMinParallelism(configuration);
-        checkState(
-                maxParallelism >= minParallelism,
-                String.format(
-                        "Invalid configuration: '%s' should be greater than or equal to '%s' and the range must contain at least one power of 2.",
-                        JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM.key(),
-                        JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MIN_PARALLELISM.key()));
-
         return new DefaultVertexParallelismAndInputInfosDecider(
-                maxParallelism,
-                minParallelism,
+                configuration.getInteger(
+                        JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM),
+                configuration.getInteger(
+                        JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MIN_PARALLELISM),
                 configuration.get(
                         JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_AVG_DATA_VOLUME_PER_TASK),
                 configuration.get(
                         JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DEFAULT_SOURCE_PARALLELISM));
     }
-
-    static int getNormalizedMaxParallelism(Configuration configuration) {
-        return MathUtils.roundDownToPowerOf2(
-                configuration.getInteger(
-                        JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM));
-    }
-
-    static int getNormalizedMinParallelism(Configuration configuration) {
-        return MathUtils.roundUpToPowerOfTwo(
-                configuration.getInteger(
-                        JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MIN_PARALLELISM));
-    }
-
-    static int normalizeParallelism(int parallelism) {
-        int down = MathUtils.roundDownToPowerOf2(parallelism);
-        int up = MathUtils.roundUpToPowerOfTwo(parallelism);
-        return parallelism < (up + down) / 2 ? down : up;
-    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java
index 0b9cea85883..09c460ddec4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java
@@ -58,15 +58,7 @@ class DefaultVertexParallelismAndInputInfosDeciderTest {
     private static final long DATA_VOLUME_PER_TASK = 1024 * 1024 * 1024L;
 
     @Test
-    void testNormalizedMaxAndMinParallelism() {
-        final DefaultVertexParallelismAndInputInfosDecider decider =
-                createDefaultVertexParallelismAndInputInfosDecider();
-        assertThat(decider.getMaxParallelism()).isEqualTo(64);
-        assertThat(decider.getMinParallelism()).isEqualTo(4);
-    }
-
-    @Test
-    void testNormalizeParallelismDownToPowerOf2() {
+    void testDecideParallelism() {
         final DefaultVertexParallelismAndInputInfosDecider decider =
                 createDefaultVertexParallelismAndInputInfosDecider();
 
@@ -77,22 +69,7 @@ class DefaultVertexParallelismAndInputInfosDeciderTest {
                 decider.decideParallelism(
                         new JobVertexID(), Arrays.asList(resultInfo1, resultInfo2));
 
-        assertThat(parallelism).isEqualTo(8);
-    }
-
-    @Test
-    void testNormalizeParallelismUpToPowerOf2() {
-        final DefaultVertexParallelismAndInputInfosDecider decider =
-                createDefaultVertexParallelismAndInputInfosDecider();
-
-        BlockingResultInfo resultInfo1 = createFromBroadcastResult(BYTE_256_MB);
-        BlockingResultInfo resultInfo2 = createFromNonBroadcastResult(BYTE_1_GB + BYTE_8_GB);
-
-        int parallelism =
-                decider.decideParallelism(
-                        new JobVertexID(), Arrays.asList(resultInfo1, resultInfo2));
-
-        assertThat(parallelism).isEqualTo(16);
+        assertThat(parallelism).isEqualTo(11);
     }
 
     @Test
@@ -107,7 +84,7 @@ class DefaultVertexParallelismAndInputInfosDeciderTest {
                 decider.decideParallelism(
                         new JobVertexID(), Arrays.asList(resultInfo1, resultInfo2));
 
-        assertThat(parallelism).isEqualTo(64);
+        assertThat(parallelism).isEqualTo(MAX_PARALLELISM);
     }
 
     @Test
@@ -122,7 +99,7 @@ class DefaultVertexParallelismAndInputInfosDeciderTest {
                 decider.decideParallelism(
                         new JobVertexID(), Arrays.asList(resultInfo1, resultInfo2));
 
-        assertThat(parallelism).isEqualTo(4);
+        assertThat(parallelism).isEqualTo(MIN_PARALLELISM);
     }
 
     @Test
@@ -152,7 +129,7 @@ class DefaultVertexParallelismAndInputInfosDeciderTest {
                 decider.decideParallelism(
                         new JobVertexID(), Arrays.asList(resultInfo1, resultInfo2));
 
-        assertThat(parallelism).isEqualTo(16);
+        assertThat(parallelism).isEqualTo(17);
     }
 
     @Test