You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by yi...@apache.org on 2022/01/18 07:06:04 UTC

[flink] branch master updated: [FLINK-24899][network] Enable data compression for blocking shuffle by default

This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a14c554  [FLINK-24899][network] Enable data compression for blocking shuffle by default
a14c554 is described below

commit a14c55482f60b73d3894a86a81c2ac6725acbb00
Author: SteNicholas <pr...@163.com>
AuthorDate: Wed Nov 17 10:47:58 2021 +0800

    [FLINK-24899][network] Enable data compression for blocking shuffle by default
    
    This closes #17814.
---
 docs/content.zh/docs/ops/batch/blocking_shuffle.md                | 2 +-
 docs/content/docs/ops/batch/blocking_shuffle.md                   | 2 +-
 .../shortcodes/generated/all_taskmanager_network_section.html     | 4 ++--
 .../generated/netty_shuffle_environment_configuration.html        | 4 ++--
 .../flink/configuration/NettyShuffleEnvironmentOptions.java       | 7 +++----
 .../taskexecutor/NettyShuffleEnvironmentConfigurationTest.java    | 7 ++++---
 .../org/apache/flink/test/runtime/ShuffleCompressionITCase.java   | 8 ++++----
 7 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/docs/content.zh/docs/ops/batch/blocking_shuffle.md b/docs/content.zh/docs/ops/batch/blocking_shuffle.md
index 4ff1dc8..74cc30c 100644
--- a/docs/content.zh/docs/ops/batch/blocking_shuffle.md
+++ b/docs/content.zh/docs/ops/batch/blocking_shuffle.md
@@ -71,7 +71,7 @@ Flink [DataStream API]({{< ref "docs/dev/datastream/execution_mode" >}}) 和 [Ta
 `Sort Shuffle` 是 1.13 版中引入的另一种 blocking shuffle 实现。不同于 `Hash Shuffle`,sort shuffle 将每个分区结果写入到一个文件。当多个下游任务同时读取结果分片,数据文件只会被打开一次并共享给所有的读请求。因此,集群使用更少的资源。例如:节点和文件描述符以提升稳定性。此外,通过写更少的文件和尽可能线性的读取文件,尤其是在使用机械硬盘情况下 sort shuffle 可以获得比 hash shuffle 更好的性能。另外,`sort shuffle` 使用额外管理的内存作为读数据缓存并不依赖 `sendfile` 或 `mmap` 机制,因此也适用于 [SSL]({{< ref "docs/deployment/security/security-ssl" >}})。关于 sort shuffle 的更多细节请参考 [FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) 和 [FLINK-19614](https://issues.a [...]
 
 当使用sort blocking shuffle的时候有些配置需要适配:
-- [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref "docs/deployment/config" >}}#taskmanager-network-blocking-shuffle-compression-enabled): 配置该选项以启用 shuffle data 压缩,大部分任务建议开启除非你的数据压缩比率比较低。
+- [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref "docs/deployment/config" >}}#taskmanager-network-blocking-shuffle-compression-enabled): 配置该选项以启用 shuffle data 压缩,大部分任务建议开启除非你的数据压缩比率比较低。对于 1.14 以及更低的版本默认为 false,1.15 版本起默认为 true。
 - [taskmanager.network.sort-shuffle.min-parallelism]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism): 根据下游任务的并行度配置该选项以启用 sort shuffle。如果并行度低于设置的值,则使用 `hash shuffle`,否则 `sort shuffle`。
 - [taskmanager.network.sort-shuffle.min-buffers]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers): 配置该选项以控制数据写缓存大小。对于大规模的任务而言,你可能需要调大这个值,正常几百兆内存就足够了。
 - [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-batch-shuffle-size): 配置该选项以控制数据读取缓存大小。对于大规模的任务而言,你可能需要调大这个值,正常几百兆内存就足够了。
diff --git a/docs/content/docs/ops/batch/blocking_shuffle.md b/docs/content/docs/ops/batch/blocking_shuffle.md
index 7fdf13a..7bf647b 100644
--- a/docs/content/docs/ops/batch/blocking_shuffle.md
+++ b/docs/content/docs/ops/batch/blocking_shuffle.md
@@ -71,7 +71,7 @@ To further improve the performance, for most jobs we also recommend [enabling co
 `Sort Shuffle` is another blocking shuffle implementation introduced in version 1.13. Different from `Hash Shuffle`, sort shuffle writes only one file for each result partition. When the result partition is read by multiple downstream tasks concurrently, the data file is opened only once and shared by all readers. As a result, the cluster uses fewer resources like inode and file descriptors, which improves stability. Furthermore, by writing fewer files and making a best effort to read da [...]
 
 There are several config options that might need adjustment when using sort blocking shuffle:
-- [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref "docs/deployment/config" >}}#taskmanager-network-blocking-shuffle-compression-enabled): Config option for shuffle data compression. it is suggested to enable it for most jobs except that the compression ratio of your data is low.
+- [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref "docs/deployment/config" >}}#taskmanager-network-blocking-shuffle-compression-enabled): Config option for shuffle data compression. it is suggested to enable it for most jobs except that the compression ratio of your data is low. Defaults to false for 1.14 and lower, and true for 1.15 and higher.
 - [taskmanager.network.sort-shuffle.min-parallelism]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism): Config option to enable sort shuffle depending on the parallelism of downstream tasks. If parallelism is lower than the configured value, `hash shuffle` will be used, otherwise `sort shuffle` will be used.
 - [taskmanager.network.sort-shuffle.min-buffers]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers): Config option to control data writing buffer size. For large scale jobs, you may need to increase this value, usually, several hundreds of megabytes memory is enough.
 - [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-batch-shuffle-size): Config option to control data reading buffer size. For large scale jobs, you may need to increase this value, usually, several hundreds of megabytes memory is enough.
diff --git a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
index 4865f35..9952de2 100644
--- a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
+++ b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
@@ -10,7 +10,7 @@
     <tbody>
         <tr>
             <td><h5>taskmanager.network.blocking-shuffle.compression.enabled</h5></td>
-            <td style="word-wrap: break-word;">false</td>
+            <td style="word-wrap: break-word;">true</td>
             <td>Boolean</td>
             <td>Boolean flag indicating whether the shuffle data will be compressed for blocking shuffle mode. Note that data is compressed per buffer and compression can incur extra CPU overhead, so it is more effective for IO bounded scenario when compression ratio is high.</td>
         </tr>
@@ -144,7 +144,7 @@
             <td><h5>taskmanager.network.sort-shuffle.min-parallelism</h5></td>
             <td style="word-wrap: break-word;">2147483647</td>
             <td>Integer</td>
-            <td>Parallelism threshold to switch between sort-merge blocking shuffle and the default hash-based blocking shuffle, which means for batch jobs of small parallelism, the hash-based blocking shuffle will be used and for batch jobs of large parallelism, the sort-merge one will be used. Note: For production usage, if sort-merge blocking shuffle is enabled, you may also need to enable data compression by setting 'taskmanager.network.blocking-shuffle.compression.enabled' to true a [...]
+            <td>Parallelism threshold to switch between sort-merge blocking shuffle and the default hash-based blocking shuffle, which means for batch jobs of small parallelism, the hash-based blocking shuffle will be used and for batch jobs of large parallelism, the sort-merge one will be used. Note: For production usage, if sort-merge blocking shuffle is enabled, you may also need to tune 'taskmanager.network.sort-shuffle.min-buffers' and 'taskmanager.memory.framework.off-heap.batch-sh [...]
         </tr>
     </tbody>
 </table>
diff --git a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
index a304de9..10023d8 100644
--- a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
+++ b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
@@ -28,7 +28,7 @@
         </tr>
         <tr>
             <td><h5>taskmanager.network.blocking-shuffle.compression.enabled</h5></td>
-            <td style="word-wrap: break-word;">false</td>
+            <td style="word-wrap: break-word;">true</td>
             <td>Boolean</td>
             <td>Boolean flag indicating whether the shuffle data will be compressed for blocking shuffle mode. Note that data is compressed per buffer and compression can incur extra CPU overhead, so it is more effective for IO bounded scenario when compression ratio is high.</td>
         </tr>
@@ -132,7 +132,7 @@
             <td><h5>taskmanager.network.sort-shuffle.min-parallelism</h5></td>
             <td style="word-wrap: break-word;">2147483647</td>
             <td>Integer</td>
-            <td>Parallelism threshold to switch between sort-merge blocking shuffle and the default hash-based blocking shuffle, which means for batch jobs of small parallelism, the hash-based blocking shuffle will be used and for batch jobs of large parallelism, the sort-merge one will be used. Note: For production usage, if sort-merge blocking shuffle is enabled, you may also need to enable data compression by setting 'taskmanager.network.blocking-shuffle.compression.enabled' to true a [...]
+            <td>Parallelism threshold to switch between sort-merge blocking shuffle and the default hash-based blocking shuffle, which means for batch jobs of small parallelism, the hash-based blocking shuffle will be used and for batch jobs of large parallelism, the sort-merge one will be used. Note: For production usage, if sort-merge blocking shuffle is enabled, you may also need to tune 'taskmanager.network.sort-shuffle.min-buffers' and 'taskmanager.memory.framework.off-heap.batch-sh [...]
         </tr>
     </tbody>
 </table>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index e18f39c..67c3280 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -76,7 +76,8 @@ public class NettyShuffleEnvironmentOptions {
     @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
     public static final ConfigOption<Boolean> BLOCKING_SHUFFLE_COMPRESSION_ENABLED =
             key("taskmanager.network.blocking-shuffle.compression.enabled")
-                    .defaultValue(false)
+                    .booleanType()
+                    .defaultValue(true)
                     .withDescription(
                             "Boolean flag indicating whether the shuffle data will be compressed "
                                     + "for blocking shuffle mode. Note that data is compressed per "
@@ -232,9 +233,7 @@ public class NettyShuffleEnvironmentOptions {
                                             + " batch jobs of large parallelism, the sort-merge one"
                                             + " will be used. Note: For production usage, if sort-"
                                             + "merge blocking shuffle is enabled, you may also need"
-                                            + " to enable data compression by setting '%s' to true "
-                                            + "and tune '%s' and '%s' for better performance.",
-                                    BLOCKING_SHUFFLE_COMPRESSION_ENABLED.key(),
+                                            + " to tune '%s' and '%s' for better performance.",
                                     NETWORK_SORT_SHUFFLE_MIN_BUFFERS.key(),
                                     // raw string key is used here to avoid interdependence, a test
                                     // is implemented to guard that when the target key is modified,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
index 3a69f29..2ba3441 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
@@ -90,12 +90,13 @@ public class NettyShuffleEnvironmentConfigurationTest extends TestLogger {
         String description = formatter.format(configOption.description());
 
         String configKey =
-                getConfigKey(NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED);
-        assertTrue(description.contains(configKey));
-        configKey = getConfigKey(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS);
+                getConfigKey(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS);
         assertTrue(description.contains(configKey));
         configKey = getConfigKey(TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY);
         assertTrue(description.contains(configKey));
+
+        assertTrue(
+                NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED.defaultValue());
     }
 
     private static String getConfigKey(ConfigOption<?> configOption) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
index c14344c..d458136 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
@@ -82,10 +82,10 @@ public class ShuffleCompressionITCase {
     }
 
     @Test
-    public void testDataCompressionForBoundedBlockingShuffle() throws Exception {
+    public void testNoDataCompressionForBoundedBlockingShuffle() throws Exception {
         Configuration configuration = new Configuration();
         configuration.setBoolean(
-                NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED, true);
+                NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED, false);
         configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1));
 
         JobGraph jobGraph = createJobGraph(ResultPartitionType.BLOCKING, ExecutionMode.BATCH);
@@ -93,10 +93,10 @@ public class ShuffleCompressionITCase {
     }
 
     @Test
-    public void testDataCompressionForSortMergeBlockingShuffle() throws Exception {
+    public void testNoDataCompressionForSortMergeBlockingShuffle() throws Exception {
         Configuration configuration = new Configuration();
         configuration.setBoolean(
-                NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED, true);
+                NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED, false);
         configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
         configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1));