You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/08/05 05:04:24 UTC

[flink] 01/02: [FLINK-28781] Rename blockingShuffleCompressionEnabled to batchShuffleCompressionEnabled.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9f5d0c48f198ff69a175f630832687ba02cf4c3e
Author: Weijie Guo <re...@163.com>
AuthorDate: Wed Aug 3 15:15:51 2022 +0800

    [FLINK-28781] Rename blockingShuffleCompressionEnabled to batchShuffleCompressionEnabled.
---
 .../generated/all_taskmanager_network_section.html |  4 ++--
 .../netty_shuffle_environment_configuration.html   |  4 ++--
 .../NettyShuffleEnvironmentOptions.java            | 10 ++++-----
 .../io/network/NettyShuffleServiceFactory.java     |  2 +-
 .../network/partition/ResultPartitionFactory.java  |  8 ++++----
 .../partition/consumer/SingleInputGateFactory.java |  7 +++----
 .../NettyShuffleEnvironmentConfiguration.java      | 24 ++++++++++------------
 .../NettyShuffleEnvironmentConfigurationTest.java  |  3 +--
 .../test/runtime/ShuffleCompressionITCase.java     |  4 ++--
 9 files changed, 31 insertions(+), 35 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
index a626890a3fc..34deb37d450 100644
--- a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
+++ b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
@@ -9,10 +9,10 @@
     </thead>
     <tbody>
         <tr>
-            <td><h5>taskmanager.network.blocking-shuffle.compression.enabled</h5></td>
+            <td><h5>taskmanager.network.batch-shuffle.compression.enabled</h5></td>
             <td style="word-wrap: break-word;">true</td>
             <td>Boolean</td>
-            <td>Boolean flag indicating whether the shuffle data will be compressed for blocking shuffle mode. Note that data is compressed per buffer and compression can incur extra CPU overhead, so it is more effective for IO bounded scenario when compression ratio is high.</td>
+            <td>Boolean flag indicating whether the shuffle data will be compressed for batch shuffle mode. Note that data is compressed per buffer and compression can incur extra CPU overhead, so it is more effective for IO bounded scenario when compression ratio is high.</td>
         </tr>
         <tr>
             <td><h5>taskmanager.network.blocking-shuffle.type</h5></td>
diff --git a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
index 8bbd176d85e..4ed81c078e3 100644
--- a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
+++ b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
@@ -27,10 +27,10 @@
             <td>Enable SSL support for the taskmanager data transport. This is applicable only when the global flag for internal SSL (security.ssl.internal.enabled) is set to true</td>
         </tr>
         <tr>
-            <td><h5>taskmanager.network.blocking-shuffle.compression.enabled</h5></td>
+            <td><h5>taskmanager.network.batch-shuffle.compression.enabled</h5></td>
             <td style="word-wrap: break-word;">true</td>
             <td>Boolean</td>
-            <td>Boolean flag indicating whether the shuffle data will be compressed for blocking shuffle mode. Note that data is compressed per buffer and compression can incur extra CPU overhead, so it is more effective for IO bounded scenario when compression ratio is high.</td>
+            <td>Boolean flag indicating whether the shuffle data will be compressed for batch shuffle mode. Note that data is compressed per buffer and compression can incur extra CPU overhead, so it is more effective for IO bounded scenario when compression ratio is high.</td>
         </tr>
         <tr>
             <td><h5>taskmanager.network.blocking-shuffle.type</h5></td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index 51ba09faad3..802ee049c34 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -70,20 +70,20 @@ public class NettyShuffleEnvironmentOptions {
                                     + ") is set to true");
 
     /**
-     * Boolean flag indicating whether the shuffle data will be compressed for blocking shuffle
-     * mode.
+     * Boolean flag indicating whether the shuffle data will be compressed for batch shuffle mode.
      *
      * <p>Note: Data is compressed per buffer and compression can incur extra CPU overhead so it is
      * more effective for IO bounded scenario when data compression ratio is high.
      */
     @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
-    public static final ConfigOption<Boolean> BLOCKING_SHUFFLE_COMPRESSION_ENABLED =
-            key("taskmanager.network.blocking-shuffle.compression.enabled")
+    public static final ConfigOption<Boolean> BATCH_SHUFFLE_COMPRESSION_ENABLED =
+            key("taskmanager.network.batch-shuffle.compression.enabled")
                     .booleanType()
                     .defaultValue(true)
+                    .withDeprecatedKeys("taskmanager.network.blocking-shuffle.compression.enabled")
                     .withDescription(
                             "Boolean flag indicating whether the shuffle data will be compressed "
-                                    + "for blocking shuffle mode. Note that data is compressed per "
+                                    + "for batch shuffle mode. Note that data is compressed per "
                                     + "buffer and compression can incur extra CPU overhead, so it "
                                     + "is more effective for IO bounded scenario when compression "
                                     + "ratio is high.");
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index 6f9c0bb29d7..98a202c763a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -204,7 +204,7 @@ public class NettyShuffleServiceFactory
                         config.networkBuffersPerChannel(),
                         config.floatingNetworkBuffersPerGate(),
                         config.networkBufferSize(),
-                        config.isBlockingShuffleCompressionEnabled(),
+                        config.isBatchShuffleCompressionEnabled(),
                         config.getCompressionCodec(),
                         config.getMaxBuffersPerChannel(),
                         config.sortShuffleMinBuffers(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index 6d47bb3c047..73171089203 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -65,7 +65,7 @@ public class ResultPartitionFactory {
 
     private final int networkBufferSize;
 
-    private final boolean blockingShuffleCompressionEnabled;
+    private final boolean batchShuffleCompressionEnabled;
 
     private final String compressionCodec;
 
@@ -89,7 +89,7 @@ public class ResultPartitionFactory {
             int configuredNetworkBuffersPerChannel,
             int floatingNetworkBuffersPerGate,
             int networkBufferSize,
-            boolean blockingShuffleCompressionEnabled,
+            boolean batchShuffleCompressionEnabled,
             String compressionCodec,
             int maxBuffersPerChannel,
             int sortShuffleMinBuffers,
@@ -106,7 +106,7 @@ public class ResultPartitionFactory {
         this.batchShuffleReadIOExecutor = batchShuffleReadIOExecutor;
         this.blockingSubpartitionType = blockingSubpartitionType;
         this.networkBufferSize = networkBufferSize;
-        this.blockingShuffleCompressionEnabled = blockingShuffleCompressionEnabled;
+        this.batchShuffleCompressionEnabled = batchShuffleCompressionEnabled;
         this.compressionCodec = compressionCodec;
         this.maxBuffersPerChannel = maxBuffersPerChannel;
         this.sortShuffleMinBuffers = sortShuffleMinBuffers;
@@ -140,7 +140,7 @@ public class ResultPartitionFactory {
             SupplierWithException<BufferPool, IOException> bufferPoolFactory) {
         BufferCompressor bufferCompressor = null;
         if (type.isBlockingOrBlockingPersistentResultPartition()
-                && blockingShuffleCompressionEnabled) {
+                && batchShuffleCompressionEnabled) {
             bufferCompressor = new BufferCompressor(networkBufferSize, compressionCodec);
         }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
index a27f307f5cb..89091fcca7a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
@@ -79,7 +79,7 @@ public class SingleInputGateFactory {
 
     private final int floatingNetworkBuffersPerGate;
 
-    private final boolean blockingShuffleCompressionEnabled;
+    private final boolean batchShuffleCompressionEnabled;
 
     private final String compressionCodec;
 
@@ -101,8 +101,7 @@ public class SingleInputGateFactory {
                 NettyShuffleUtils.getNetworkBuffersPerInputChannel(
                         networkConfig.networkBuffersPerChannel());
         this.floatingNetworkBuffersPerGate = networkConfig.floatingNetworkBuffersPerGate();
-        this.blockingShuffleCompressionEnabled =
-                networkConfig.isBlockingShuffleCompressionEnabled();
+        this.batchShuffleCompressionEnabled = networkConfig.isBatchShuffleCompressionEnabled();
         this.compressionCodec = networkConfig.getCompressionCodec();
         this.networkBufferSize = networkConfig.networkBufferSize();
         this.connectionManager = connectionManager;
@@ -123,7 +122,7 @@ public class SingleInputGateFactory {
 
         BufferDecompressor bufferDecompressor = null;
         if (igdd.getConsumedPartitionType().isBlockingOrBlockingPersistentResultPartition()
-                && blockingShuffleCompressionEnabled) {
+                && batchShuffleCompressionEnabled) {
             bufferDecompressor = new BufferDecompressor(networkBufferSize, compressionCodec);
         }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index 869f655a801..f18256649cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -84,7 +84,7 @@ public class NettyShuffleEnvironmentConfiguration {
 
     private final BoundedBlockingSubpartitionType blockingSubpartitionType;
 
-    private final boolean blockingShuffleCompressionEnabled;
+    private final boolean batchShuffleCompressionEnabled;
 
     private final String compressionCodec;
 
@@ -111,7 +111,7 @@ public class NettyShuffleEnvironmentConfiguration {
             @Nullable NettyConfig nettyConfig,
             String[] tempDirs,
             BoundedBlockingSubpartitionType blockingSubpartitionType,
-            boolean blockingShuffleCompressionEnabled,
+            boolean batchShuffleCompressionEnabled,
             String compressionCodec,
             int maxBuffersPerChannel,
             long batchShuffleReadMemoryBytes,
@@ -133,7 +133,7 @@ public class NettyShuffleEnvironmentConfiguration {
         this.nettyConfig = nettyConfig;
         this.tempDirs = Preconditions.checkNotNull(tempDirs);
         this.blockingSubpartitionType = Preconditions.checkNotNull(blockingSubpartitionType);
-        this.blockingShuffleCompressionEnabled = blockingShuffleCompressionEnabled;
+        this.batchShuffleCompressionEnabled = batchShuffleCompressionEnabled;
         this.compressionCodec = Preconditions.checkNotNull(compressionCodec);
         this.maxBuffersPerChannel = maxBuffersPerChannel;
         this.batchShuffleReadMemoryBytes = batchShuffleReadMemoryBytes;
@@ -207,8 +207,8 @@ public class NettyShuffleEnvironmentConfiguration {
         return blockingSubpartitionType;
     }
 
-    public boolean isBlockingShuffleCompressionEnabled() {
-        return blockingShuffleCompressionEnabled;
+    public boolean isBatchShuffleCompressionEnabled() {
+        return batchShuffleCompressionEnabled;
     }
 
     public BufferDebloatConfiguration getDebloatConfiguration() {
@@ -318,9 +318,8 @@ public class NettyShuffleEnvironmentConfiguration {
         BoundedBlockingSubpartitionType blockingSubpartitionType =
                 getBlockingSubpartitionType(configuration);
 
-        boolean blockingShuffleCompressionEnabled =
-                configuration.get(
-                        NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED);
+        boolean batchShuffleCompressionEnabled =
+                configuration.get(NettyShuffleEnvironmentOptions.BATCH_SHUFFLE_COMPRESSION_ENABLED);
         String compressionCodec =
                 configuration.getString(NettyShuffleEnvironmentOptions.SHUFFLE_COMPRESSION_CODEC);
 
@@ -346,7 +345,7 @@ public class NettyShuffleEnvironmentConfiguration {
                 nettyConfig,
                 shuffleDirs.toArray(tempDirs),
                 blockingSubpartitionType,
-                blockingShuffleCompressionEnabled,
+                batchShuffleCompressionEnabled,
                 compressionCodec,
                 maxBuffersPerChannel,
                 batchShuffleReadMemoryBytes,
@@ -486,7 +485,7 @@ public class NettyShuffleEnvironmentConfiguration {
         result = 31 * result + requestSegmentsTimeout.hashCode();
         result = 31 * result + (nettyConfig != null ? nettyConfig.hashCode() : 0);
         result = 31 * result + Arrays.hashCode(tempDirs);
-        result = 31 * result + (blockingShuffleCompressionEnabled ? 1 : 0);
+        result = 31 * result + (batchShuffleCompressionEnabled ? 1 : 0);
         result = 31 * result + Objects.hashCode(compressionCodec);
         result = 31 * result + maxBuffersPerChannel;
         result = 31 * result + Objects.hashCode(batchShuffleReadMemoryBytes);
@@ -522,8 +521,7 @@ public class NettyShuffleEnvironmentConfiguration {
                             ? nettyConfig.equals(that.nettyConfig)
                             : that.nettyConfig == null)
                     && Arrays.equals(this.tempDirs, that.tempDirs)
-                    && this.blockingShuffleCompressionEnabled
-                            == that.blockingShuffleCompressionEnabled
+                    && this.batchShuffleCompressionEnabled == that.batchShuffleCompressionEnabled
                     && this.maxBuffersPerChannel == that.maxBuffersPerChannel
                     && Objects.equals(this.compressionCodec, that.compressionCodec)
                     && this.maxNumberOfConnections == that.maxNumberOfConnections
@@ -554,7 +552,7 @@ public class NettyShuffleEnvironmentConfiguration {
                 + ", tempDirs="
                 + Arrays.toString(tempDirs)
                 + ", blockingShuffleCompressionEnabled="
-                + blockingShuffleCompressionEnabled
+                + batchShuffleCompressionEnabled
                 + ", compressionCodec="
                 + compressionCodec
                 + ", maxBuffersPerChannel="
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
index 2ba34417a0c..658c8273416 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
@@ -95,8 +95,7 @@ public class NettyShuffleEnvironmentConfigurationTest extends TestLogger {
         configKey = getConfigKey(TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY);
         assertTrue(description.contains(configKey));
 
-        assertTrue(
-                NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED.defaultValue());
+        assertTrue(NettyShuffleEnvironmentOptions.BATCH_SHUFFLE_COMPRESSION_ENABLED.defaultValue());
     }
 
     private static String getConfigKey(ConfigOption<?> configOption) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
index b32db9d3cc8..1963ac7a7d0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
@@ -85,7 +85,7 @@ public class ShuffleCompressionITCase {
     public void testNoDataCompressionForBoundedBlockingShuffle() throws Exception {
         Configuration configuration = new Configuration();
         configuration.setBoolean(
-                NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED, false);
+                NettyShuffleEnvironmentOptions.BATCH_SHUFFLE_COMPRESSION_ENABLED, false);
         configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1));
         configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
@@ -99,7 +99,7 @@ public class ShuffleCompressionITCase {
     public void testNoDataCompressionForSortMergeBlockingShuffle() throws Exception {
         Configuration configuration = new Configuration();
         configuration.setBoolean(
-                NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED, false);
+                NettyShuffleEnvironmentOptions.BATCH_SHUFFLE_COMPRESSION_ENABLED, false);
         configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1));
 
         JobGraph jobGraph = createJobGraph(ResultPartitionType.BLOCKING, ExecutionMode.BATCH);