You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@flink.apache.org by yi...@apache.org on 2022/01/18 11:58:44 UTC

[flink] branch master updated (eeec246 -> ed699b6)

This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from eeec246  [FLINK-20286][connector-files] Table source now supports monitor continuously
     new e1878fb  [FLINK-25639][network] Increase the default read buffer size of sort-shuffle to 64M
     new 4275525  [FLINK-25638][network] Increase the default write buffer size of sort-shuffle to 16M
     new ed699b6  [FLINK-25637][network] Make sort-shuffle the default shuffle implementation for batch jobs

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/content.zh/docs/ops/batch/blocking_shuffle.md |  6 ++--
 docs/content/docs/ops/batch/blocking_shuffle.md    |  6 ++--
 .../generated/all_taskmanager_network_section.html |  8 +++---
 .../generated/common_memory_section.html           |  4 +--
 .../netty_shuffle_environment_configuration.html   |  8 +++---
 .../task_manager_memory_configuration.html         |  4 +--
 .../NettyShuffleEnvironmentOptions.java            | 32 ++++++++++++----------
 .../flink/configuration/TaskManagerOptions.java    |  6 ++--
 .../test_high_parallelism_iterations.sh            |  1 +
 flink-end-to-end-tests/test-scripts/test_tpcds.sh  |  1 +
 .../minicluster/MiniClusterConfiguration.java      |  8 ++++++
 ...tractTaskManagerProcessFailureRecoveryTest.java |  2 ++
 .../JobManagerHAProcessFailureRecoveryITCase.java  |  2 ++
 .../flink/test/runtime/BlockingShuffleITCase.java  | 12 ++++++--
 .../test/runtime/ShuffleCompressionITCase.java     |  5 ++--
 15 files changed, 65 insertions(+), 40 deletions(-)

[flink] 01/03: [FLINK-25639][network] Increase the default read buffer size of sort-shuffle to 64M

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e1878fb899d4598c36fbb0740cefebf31de8c3d9
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Thu Jan 13 17:24:04 2022 +0800

    [FLINK-25639][network] Increase the default read buffer size of sort-shuffle to 64M
    
    This closes #18350.
---
 docs/layouts/shortcodes/generated/common_memory_section.html        | 4 ++--
 .../shortcodes/generated/task_manager_memory_configuration.html     | 4 ++--
 .../java/org/apache/flink/configuration/TaskManagerOptions.java     | 6 +++---
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/common_memory_section.html b/docs/layouts/shortcodes/generated/common_memory_section.html
index 1d224e8..6e520e2 100644
--- a/docs/layouts/shortcodes/generated/common_memory_section.html
+++ b/docs/layouts/shortcodes/generated/common_memory_section.html
@@ -76,9 +76,9 @@
         </tr>
         <tr>
             <td><h5>taskmanager.memory.framework.off-heap.batch-shuffle.size</h5></td>
-            <td style="word-wrap: break-word;">32 mb</td>
+            <td style="word-wrap: break-word;">64 mb</td>
             <td>MemorySize</td>
-            <td>Size of memory used by blocking shuffle for shuffle data read (currently only used by sort-merge shuffle). Notes: 1) The memory is cut from 'taskmanager.memory.framework.off-heap.size' so must be smaller than that, which means you may also need to increase 'taskmanager.memory.framework.off-heap.size' after you increase this config value; 2) This memory size can influence the shuffle performance and you can increase this config value for large-scale batch jobs (for example [...]
+            <td>Size of memory used by blocking shuffle for shuffle data read (currently only used by sort-shuffle). Notes: 1) The memory is cut from 'taskmanager.memory.framework.off-heap.size' so must be smaller than that, which means you may also need to increase 'taskmanager.memory.framework.off-heap.size' after you increase this config value; 2) This memory size can influence the shuffle performance and you can increase this config value for large-scale batch jobs (for example, to 1 [...]
         </tr>
         <tr>
             <td><h5>taskmanager.memory.framework.off-heap.size</h5></td>
diff --git a/docs/layouts/shortcodes/generated/task_manager_memory_configuration.html b/docs/layouts/shortcodes/generated/task_manager_memory_configuration.html
index bd9248e..d572e80 100644
--- a/docs/layouts/shortcodes/generated/task_manager_memory_configuration.html
+++ b/docs/layouts/shortcodes/generated/task_manager_memory_configuration.html
@@ -22,9 +22,9 @@
         </tr>
         <tr>
             <td><h5>taskmanager.memory.framework.off-heap.batch-shuffle.size</h5></td>
-            <td style="word-wrap: break-word;">32 mb</td>
+            <td style="word-wrap: break-word;">64 mb</td>
             <td>MemorySize</td>
-            <td>Size of memory used by blocking shuffle for shuffle data read (currently only used by sort-merge shuffle). Notes: 1) The memory is cut from 'taskmanager.memory.framework.off-heap.size' so must be smaller than that, which means you may also need to increase 'taskmanager.memory.framework.off-heap.size' after you increase this config value; 2) This memory size can influence the shuffle performance and you can increase this config value for large-scale batch jobs (for example [...]
+            <td>Size of memory used by blocking shuffle for shuffle data read (currently only used by sort-shuffle). Notes: 1) The memory is cut from 'taskmanager.memory.framework.off-heap.size' so must be smaller than that, which means you may also need to increase 'taskmanager.memory.framework.off-heap.size' after you increase this config value; 2) This memory size can influence the shuffle performance and you can increase this config value for large-scale batch jobs (for example, to 1 [...]
         </tr>
         <tr>
             <td><h5>taskmanager.memory.framework.off-heap.size</h5></td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 7fecad5..2ad7333 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -581,17 +581,17 @@ public class TaskManagerOptions {
 
     /**
      * Size of direct memory used by blocking shuffle for shuffle data read (currently only used by
-     * sort-merge shuffle).
+     * sort-shuffle).
      */
     @Documentation.Section(Documentation.Sections.COMMON_MEMORY)
     public static final ConfigOption<MemorySize> NETWORK_BATCH_SHUFFLE_READ_MEMORY =
             key("taskmanager.memory.framework.off-heap.batch-shuffle.size")
                     .memoryType()
-                    .defaultValue(MemorySize.parse("32m"))
+                    .defaultValue(MemorySize.parse("64m"))
                     .withDescription(
                             String.format(
                                     "Size of memory used by blocking shuffle for shuffle data read "
-                                            + "(currently only used by sort-merge shuffle). Notes: "
+                                            + "(currently only used by sort-shuffle). Notes: "
                                             + "1) The memory is cut from '%s' so must be smaller than"
                                             + " that, which means you may also need to increase '%s' "
                                             + "after you increase this config value; 2) This memory"

[flink] 02/03: [FLINK-25638][network] Increase the default write buffer size of sort-shuffle to 16M

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4275525fedd238a8b57edf46d22dc36ce19df846
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Thu Jan 13 18:00:51 2022 +0800

    [FLINK-25638][network] Increase the default write buffer size of sort-shuffle to 16M
    
    This closes #18350.
---
 .../shortcodes/generated/all_taskmanager_network_section.html  |  4 ++--
 .../generated/netty_shuffle_environment_configuration.html     |  4 ++--
 .../flink/configuration/NettyShuffleEnvironmentOptions.java    | 10 ++++++----
 .../org/apache/flink/test/runtime/BlockingShuffleITCase.java   |  4 ++++
 4 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
index 9952de2..33a0ce3 100644
--- a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
+++ b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
@@ -136,9 +136,9 @@
         </tr>
         <tr>
             <td><h5>taskmanager.network.sort-shuffle.min-buffers</h5></td>
-            <td style="word-wrap: break-word;">64</td>
+            <td style="word-wrap: break-word;">512</td>
             <td>Integer</td>
-            <td>Minimum number of network buffers required per sort-merge blocking result partition. For production usage, it is suggested to increase this config value to at least 2048 (64M memory if the default 32K memory segment size is used) to improve the data compression ratio and reduce the small network packets. Usually, several hundreds of megabytes memory is enough for large scale batch jobs. Note: you may also need to increase the size of total network memory to avoid the 'ins [...]
+            <td>Minimum number of network buffers required per blocking result partition for sort-shuffle. For production usage, it is suggested to increase this config value to at least 2048 (64M memory if the default 32K memory segment size is used) to improve the data compression ratio and reduce the small network packets. Usually, several hundreds of megabytes memory is enough for large scale batch jobs. Note: you may also need to increase the size of total network memory to avoid th [...]
         </tr>
         <tr>
             <td><h5>taskmanager.network.sort-shuffle.min-parallelism</h5></td>
diff --git a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
index 10023d8..602b015 100644
--- a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
+++ b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
@@ -124,9 +124,9 @@
         </tr>
         <tr>
             <td><h5>taskmanager.network.sort-shuffle.min-buffers</h5></td>
-            <td style="word-wrap: break-word;">64</td>
+            <td style="word-wrap: break-word;">512</td>
             <td>Integer</td>
-            <td>Minimum number of network buffers required per sort-merge blocking result partition. For production usage, it is suggested to increase this config value to at least 2048 (64M memory if the default 32K memory segment size is used) to improve the data compression ratio and reduce the small network packets. Usually, several hundreds of megabytes memory is enough for large scale batch jobs. Note: you may also need to increase the size of total network memory to avoid the 'ins [...]
+            <td>Minimum number of network buffers required per blocking result partition for sort-shuffle. For production usage, it is suggested to increase this config value to at least 2048 (64M memory if the default 32K memory segment size is used) to improve the data compression ratio and reduce the small network packets. Usually, several hundreds of megabytes memory is enough for large scale batch jobs. Note: you may also need to increase the size of total network memory to avoid th [...]
         </tr>
         <tr>
             <td><h5>taskmanager.network.sort-shuffle.min-parallelism</h5></td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index 67c3280..526b4bd 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -197,15 +197,17 @@ public class NettyShuffleEnvironmentOptions {
                                     + " help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be"
                                     + " increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.");
 
-    /** Minimum number of network buffers required per sort-merge blocking result partition. */
+    /**
+     * Minimum number of network buffers required per blocking result partition for sort-shuffle.
+     */
     @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
     public static final ConfigOption<Integer> NETWORK_SORT_SHUFFLE_MIN_BUFFERS =
             key("taskmanager.network.sort-shuffle.min-buffers")
                     .intType()
-                    .defaultValue(64)
+                    .defaultValue(512)
                     .withDescription(
-                            "Minimum number of network buffers required per sort-merge blocking "
-                                    + "result partition. For production usage, it is suggested to "
+                            "Minimum number of network buffers required per blocking result partition"
+                                    + " for sort-shuffle. For production usage, it is suggested to "
                                     + "increase this config value to at least 2048 (64M memory if "
                                     + "the default 32K memory segment size is used) to improve the "
                                     + "data compression ratio and reduce the small network packets."
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
index 4c50bc7..2991ba0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
@@ -65,6 +65,8 @@ public class BlockingShuffleITCase {
         Configuration configuration = new Configuration();
         configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
+        configuration.setInteger(
+                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);
 
         JobGraph jobGraph = createJobGraph(1000000);
         JobGraphRunningUtil.execute(
@@ -76,6 +78,8 @@ public class BlockingShuffleITCase {
         Configuration configuration = new Configuration();
         configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
+        configuration.setInteger(
+                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);
 
         JobGraph jobGraph = createJobGraph(0);
         JobGraphRunningUtil.execute(

[flink] 03/03: [FLINK-25637][network] Make sort-shuffle the default shuffle implementation for batch jobs

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ed699b6ee6b0539087632b68a444f79b95120d84
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Thu Jan 13 20:06:44 2022 +0800

    [FLINK-25637][network] Make sort-shuffle the default shuffle implementation for batch jobs
    
    This closes #18350.
---
 docs/content.zh/docs/ops/batch/blocking_shuffle.md |  6 +++---
 docs/content/docs/ops/batch/blocking_shuffle.md    |  6 +++---
 .../generated/all_taskmanager_network_section.html |  4 ++--
 .../netty_shuffle_environment_configuration.html   |  4 ++--
 .../NettyShuffleEnvironmentOptions.java            | 22 +++++++++++-----------
 .../test_high_parallelism_iterations.sh            |  1 +
 flink-end-to-end-tests/test-scripts/test_tpcds.sh  |  1 +
 .../minicluster/MiniClusterConfiguration.java      |  8 ++++++++
 ...tractTaskManagerProcessFailureRecoveryTest.java |  2 ++
 .../JobManagerHAProcessFailureRecoveryITCase.java  |  2 ++
 .../flink/test/runtime/BlockingShuffleITCase.java  | 12 ++++++++----
 .../test/runtime/ShuffleCompressionITCase.java     |  5 +++--
 12 files changed, 46 insertions(+), 27 deletions(-)

diff --git a/docs/content.zh/docs/ops/batch/blocking_shuffle.md b/docs/content.zh/docs/ops/batch/blocking_shuffle.md
index 74cc30c..da1384c 100644
--- a/docs/content.zh/docs/ops/batch/blocking_shuffle.md
+++ b/docs/content.zh/docs/ops/batch/blocking_shuffle.md
@@ -37,7 +37,7 @@ Flink [DataStream API]({{< ref "docs/dev/datastream/execution_mode" >}}) 和 [Ta
 
 ## Hash Shuffle
 
-`Hash Shuffle` 是 blocking shuffle 的默认实现,它为每个下游任务将每个上游任务的结果以单独文件的方式保存在 TaskManager 本地磁盘上。当下游任务运行时会向上游的 TaskManager 请求分片,TaskManager 读取文件之后通过网络传输(给下游任务)。
+对于 1.14 以及更低的版本,`Hash Shuffle` 是 blocking shuffle 的默认实现,它为每个下游任务将每个上游任务的结果以单独文件的方式保存在 TaskManager 本地磁盘上。当下游任务运行时会向上游的 TaskManager 请求分片,TaskManager 读取文件之后通过网络传输(给下游任务)。
 
 `Hash Shuffle` 为读写文件提供了不同的机制:
 
@@ -68,11 +68,11 @@ Flink [DataStream API]({{< ref "docs/dev/datastream/execution_mode" >}}) 和 [Ta
 
 ## Sort Shuffle
 
-`Sort Shuffle` 是 1.13 版中引入的另一种 blocking shuffle 实现。不同于 `Hash Shuffle`,sort shuffle 将每个分区结果写入到一个文件。当多个下游任务同时读取结果分片,数据文件只会被打开一次并共享给所有的读请求。因此,集群使用更少的资源。例如:节点和文件描述符以提升稳定性。此外,通过写更少的文件和尽可能线性的读取文件,尤其是在使用机械硬盘情况下 sort shuffle 可以获得比 hash shuffle 更好的性能。另外,`sort shuffle` 使用额外管理的内存作为读数据缓存并不依赖 `sendfile` 或 `mmap` 机制,因此也适用于 [SSL]({{< ref "docs/deployment/security/security-ssl" >}})。关于 sort shuffle 的更多细节请参考 [FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) 和 [FLINK-19614](https://issues.a [...]
+`Sort Shuffle` 是 1.13 版中引入的另一种 blocking shuffle 实现,它在 1.15 版本成为默认。不同于 `Hash Shuffle`,sort shuffle 将每个分区结果写入到一个文件。当多个下游任务同时读取结果分片,数据文件只会被打开一次并共享给所有的读请求。因此,集群使用更少的资源。例如:节点和文件描述符以提升稳定性。此外,通过写更少的文件和尽可能线性的读取文件,尤其是在使用机械硬盘情况下 sort shuffle 可以获得比 hash shuffle 更好的性能。另外,`sort shuffle` 使用额外管理的内存作为读数据缓存并不依赖 `sendfile` 或 `mmap` 机制,因此也适用于 [SSL]({{< ref "docs/deployment/security/security-ssl" >}})。关于 sort shuffle 的更多细节请参考 [FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) 和 [FLINK-19614](h [...]
 
 当使用sort blocking shuffle的时候有些配置需要适配:
 - [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref "docs/deployment/config" >}}#taskmanager-network-blocking-shuffle-compression-enabled): 配置该选项以启用 shuffle data 压缩,大部分任务建议开启除非你的数据压缩比率比较低。对于 1.14 以及更低的版本默认为 false,1.15 版本起默认为 true。
-- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism): 根据下游任务的并行度配置该选项以启用 sort shuffle。如果并行度低于设置的值,则使用 `hash shuffle`,否则 `sort shuffle`。
+- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism): 根据下游任务的并行度配置该选项以启用 sort shuffle。如果并行度低于设置的值,则使用 `hash shuffle`,否则 `sort shuffle`。对于 1.15 以下的版本,它的默认值是 `Integer.MAX_VALUE`,所以默认情况下总是会使用 `hash shuffle`。从 1.15 开始,它的默认值是 1, 所以默认情况下总是会使用 `sort shuffle`。
 - [taskmanager.network.sort-shuffle.min-buffers]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers): 配置该选项以控制数据写缓存大小。对于大规模的任务而言,你可能需要调大这个值,正常几百兆内存就足够了。
 - [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-batch-shuffle-size): 配置该选项以控制数据读取缓存大小。对于大规模的任务而言,你可能需要调大这个值,正常几百兆内存就足够了。
 
diff --git a/docs/content/docs/ops/batch/blocking_shuffle.md b/docs/content/docs/ops/batch/blocking_shuffle.md
index 7bf647b..c6654f4 100644
--- a/docs/content/docs/ops/batch/blocking_shuffle.md
+++ b/docs/content/docs/ops/batch/blocking_shuffle.md
@@ -37,7 +37,7 @@ They will be detailed in the following sections.
 
 ## Hash Shuffle
 
-The default blocking shuffle implementation, `Hash Shuffle`, has each upstream task persist its results in a separate file for each downstream task on the local disk of the TaskManager. When the downstream tasks run, they will request partitions from the upstream TaskManager's, which read the files and transmit data via the network.
+The default blocking shuffle implementation for 1.14 and lower, `Hash Shuffle`, has each upstream task persist its results in a separate file for each downstream task on the local disk of the TaskManager. When the downstream tasks run, they will request partitions from the upstream TaskManager's, which read the files and transmit data via the network.
 
 `Hash Shuffle` provides different mechanisms for writing and reading files:
 
@@ -68,11 +68,11 @@ To further improve the performance, for most jobs we also recommend [enabling co
 
 ## Sort Shuffle 
 
-`Sort Shuffle` is another blocking shuffle implementation introduced in version 1.13. Different from `Hash Shuffle`, sort shuffle writes only one file for each result partition. When the result partition is read by multiple downstream tasks concurrently, the data file is opened only once and shared by all readers. As a result, the cluster uses fewer resources like inode and file descriptors, which improves stability. Furthermore, by writing fewer files and making a best effort to read da [...]
+`Sort Shuffle` is another blocking shuffle implementation introduced in version 1.13 and it becomes the default blocking shuffle implementation in 1.15. Different from `Hash Shuffle`, sort shuffle writes only one file for each result partition. When the result partition is read by multiple downstream tasks concurrently, the data file is opened only once and shared by all readers. As a result, the cluster uses fewer resources like inode and file descriptors, which improves stability. Furt [...]
 
 There are several config options that might need adjustment when using sort blocking shuffle:
 - [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref "docs/deployment/config" >}}#taskmanager-network-blocking-shuffle-compression-enabled): Config option for shuffle data compression. it is suggested to enable it for most jobs except that the compression ratio of your data is low. Defaults to false for 1.14 and lower, and true for 1.15 and higher.
-- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism): Config option to enable sort shuffle depending on the parallelism of downstream tasks. If parallelism is lower than the configured value, `hash shuffle` will be used, otherwise `sort shuffle` will be used.
+- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism): Config option to enable sort shuffle depending on the parallelism of downstream tasks. If parallelism is lower than the configured value, `hash shuffle` will be used, otherwise `sort shuffle` will be used. For versions lower than 1.15, its default value is `Integer.MAX_VALUE`, so hash shuffle will be always used by default. Since 1.15, its default v [...]
 - [taskmanager.network.sort-shuffle.min-buffers]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers): Config option to control data writing buffer size. For large scale jobs, you may need to increase this value, usually, several hundreds of megabytes memory is enough.
 - [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-batch-shuffle-size): Config option to control data reading buffer size. For large scale jobs, you may need to increase this value, usually, several hundreds of megabytes memory is enough.
 
diff --git a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
index 33a0ce3..3501528 100644
--- a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
+++ b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
@@ -142,9 +142,9 @@
         </tr>
         <tr>
             <td><h5>taskmanager.network.sort-shuffle.min-parallelism</h5></td>
-            <td style="word-wrap: break-word;">2147483647</td>
+            <td style="word-wrap: break-word;">1</td>
             <td>Integer</td>
-            <td>Parallelism threshold to switch between sort-merge blocking shuffle and the default hash-based blocking shuffle, which means for batch jobs of small parallelism, the hash-based blocking shuffle will be used and for batch jobs of large parallelism, the sort-merge one will be used. Note: For production usage, if sort-merge blocking shuffle is enabled, you may also need to tune 'taskmanager.network.sort-shuffle.min-buffers' and 'taskmanager.memory.framework.off-heap.batch-sh [...]
+            <td>Parallelism threshold to switch between sort-based blocking shuffle and hash-based blocking shuffle, which means for batch jobs of smaller parallelism, hash-shuffle will be used and for batch jobs of larger or equal parallelism, sort-shuffle will be used. The value 1 means that sort-shuffle is the default option. Note: For production usage, you may also need to tune 'taskmanager.network.sort-shuffle.min-buffers' and 'taskmanager.memory.framework.off-heap.batch-shuffle.siz [...]
         </tr>
     </tbody>
 </table>
diff --git a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
index 602b015..e64e6b8 100644
--- a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
+++ b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
@@ -130,9 +130,9 @@
         </tr>
         <tr>
             <td><h5>taskmanager.network.sort-shuffle.min-parallelism</h5></td>
-            <td style="word-wrap: break-word;">2147483647</td>
+            <td style="word-wrap: break-word;">1</td>
             <td>Integer</td>
-            <td>Parallelism threshold to switch between sort-merge blocking shuffle and the default hash-based blocking shuffle, which means for batch jobs of small parallelism, the hash-based blocking shuffle will be used and for batch jobs of large parallelism, the sort-merge one will be used. Note: For production usage, if sort-merge blocking shuffle is enabled, you may also need to tune 'taskmanager.network.sort-shuffle.min-buffers' and 'taskmanager.memory.framework.off-heap.batch-sh [...]
+            <td>Parallelism threshold to switch between sort-based blocking shuffle and hash-based blocking shuffle, which means for batch jobs of smaller parallelism, hash-shuffle will be used and for batch jobs of larger or equal parallelism, sort-shuffle will be used. The value 1 means that sort-shuffle is the default option. Note: For production usage, you may also need to tune 'taskmanager.network.sort-shuffle.min-buffers' and 'taskmanager.memory.framework.off-heap.batch-shuffle.siz [...]
         </tr>
     </tbody>
 </table>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index 526b4bd..cafca62 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -218,24 +218,24 @@ public class NettyShuffleEnvironmentOptions {
                                     + " config value.");
 
     /**
-     * Parallelism threshold to switch between sort-merge based blocking shuffle and the default
-     * hash-based blocking shuffle.
+     * Parallelism threshold to switch between sort-based blocking shuffle and hash-based blocking
+     * shuffle.
      */
     @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
     public static final ConfigOption<Integer> NETWORK_SORT_SHUFFLE_MIN_PARALLELISM =
             key("taskmanager.network.sort-shuffle.min-parallelism")
                     .intType()
-                    .defaultValue(Integer.MAX_VALUE)
+                    .defaultValue(1)
                     .withDescription(
                             String.format(
-                                    "Parallelism threshold to switch between sort-merge blocking "
-                                            + "shuffle and the default hash-based blocking shuffle,"
-                                            + " which means for batch jobs of small parallelism, "
-                                            + "the hash-based blocking shuffle will be used and for"
-                                            + " batch jobs of large parallelism, the sort-merge one"
-                                            + " will be used. Note: For production usage, if sort-"
-                                            + "merge blocking shuffle is enabled, you may also need"
-                                            + " to tune '%s' and '%s' for better performance.",
+                                    "Parallelism threshold to switch between sort-based blocking "
+                                            + "shuffle and hash-based blocking shuffle, which means"
+                                            + " for batch jobs of smaller parallelism, hash-shuffle"
+                                            + " will be used and for batch jobs of larger or equal "
+                                            + "parallelism, sort-shuffle will be used. The value 1 "
+                                            + "means that sort-shuffle is the default option. Note:"
+                                            + " For production usage, you may also need to tune "
+                                            + "'%s' and '%s' for better performance.",
                                     NETWORK_SORT_SHUFFLE_MIN_BUFFERS.key(),
                                     // raw string key is used here to avoid interdependence, a test
                                     // is implemented to guard that when the target key is modified,
diff --git a/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh b/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh
index cc1fb13..310f2bc 100755
--- a/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh
+++ b/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh
@@ -33,6 +33,7 @@ set_config_key "taskmanager.numberOfTaskSlots" "$SLOTS_PER_TM"
 set_config_key "taskmanager.memory.network.min" "160m"
 set_config_key "taskmanager.memory.network.max" "160m"
 set_config_key "taskmanager.memory.framework.off-heap.size" "300m"
+set_config_key "taskmanager.network.sort-shuffle.min-buffers" "64"
 
 print_mem_use
 start_cluster
diff --git a/flink-end-to-end-tests/test-scripts/test_tpcds.sh b/flink-end-to-end-tests/test-scripts/test_tpcds.sh
index 8eaf51d..4f88c1f 100755
--- a/flink-end-to-end-tests/test-scripts/test_tpcds.sh
+++ b/flink-end-to-end-tests/test-scripts/test_tpcds.sh
@@ -57,6 +57,7 @@ echo "[INFO]Preparing Flink cluster..."
 set_config_key "taskmanager.memory.process.size" "4096m"
 set_config_key "taskmanager.numberOfTaskSlots" "4"
 set_config_key "parallelism.default" "4"
+set_config_key "taskmanager.memory.network.fraction" "0.2"
 start_cluster
 
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 1deb221..980c0a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
@@ -71,6 +72,13 @@ public class MiniClusterConfiguration {
 
         TaskExecutorResourceUtils.adjustForLocalExecution(modifiedConfig);
 
+        // reduce the default number of network buffers used by sort-shuffle to avoid the
+        // "Insufficient number of network buffers" error.
+        if (!modifiedConfig.contains(
+                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS)) {
+            modifiedConfig.set(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 16);
+        }
+
         // set default io pool size.
         if (!modifiedConfig.contains(ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE)) {
             modifiedConfig.set(ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE, DEFAULT_IO_POOL_SIZE);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 4010b41..c9900ac 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.plugin.PluginManager;
@@ -105,6 +106,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
         config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m"));
         config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("3200k"));
         config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("3200k"));
+        config.set(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 16);
         config.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.parse("128m"));
         config.set(TaskManagerOptions.CPU_CORES, 1.0);
         config.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "full");
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
index aafd8c6..00c977f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.core.plugin.PluginUtils;
@@ -267,6 +268,7 @@ public class JobManagerHAProcessFailureRecoveryITCase extends TestLogger {
         config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m"));
         config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("3200k"));
         config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("3200k"));
+        config.set(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 16);
         config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
         config.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.parse("128m"));
         config.set(TaskManagerOptions.CPU_CORES, 1.0);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
index 2991ba0..ce788a8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
@@ -48,6 +48,10 @@ public class BlockingShuffleITCase {
     public void testBoundedBlockingShuffle() throws Exception {
         JobGraph jobGraph = createJobGraph(1000000);
         Configuration configuration = new Configuration();
+        configuration.setInteger(
+                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
+                Integer.MAX_VALUE);
+
         JobGraphRunningUtil.execute(
                 jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
     }
@@ -56,6 +60,10 @@ public class BlockingShuffleITCase {
     public void testBoundedBlockingShuffleWithoutData() throws Exception {
         JobGraph jobGraph = createJobGraph(0);
         Configuration configuration = new Configuration();
+        configuration.setInteger(
+                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
+                Integer.MAX_VALUE);
+
         JobGraphRunningUtil.execute(
                 jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
     }
@@ -64,8 +72,6 @@ public class BlockingShuffleITCase {
     public void testSortMergeBlockingShuffle() throws Exception {
         Configuration configuration = new Configuration();
         configuration.setInteger(
-                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
-        configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);
 
         JobGraph jobGraph = createJobGraph(1000000);
@@ -77,8 +83,6 @@ public class BlockingShuffleITCase {
     public void testSortMergeBlockingShuffleWithoutData() throws Exception {
         Configuration configuration = new Configuration();
         configuration.setInteger(
-                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
-        configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);
 
         JobGraph jobGraph = createJobGraph(0);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
index d458136..b32db9d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
@@ -87,6 +87,9 @@ public class ShuffleCompressionITCase {
         configuration.setBoolean(
                 NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED, false);
         configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1));
+        configuration.setInteger(
+                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
+                Integer.MAX_VALUE);
 
         JobGraph jobGraph = createJobGraph(ResultPartitionType.BLOCKING, ExecutionMode.BATCH);
         JobGraphRunningUtil.execute(jobGraph, configuration, NUM_TASKMANAGERS, NUM_SLOTS);
@@ -97,8 +100,6 @@ public class ShuffleCompressionITCase {
         Configuration configuration = new Configuration();
         configuration.setBoolean(
                 NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED, false);
-        configuration.setInteger(
-                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
         configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1));
 
         JobGraph jobGraph = createJobGraph(ResultPartitionType.BLOCKING, ExecutionMode.BATCH);