Posted to commits@flink.apache.org by xt...@apache.org on 2023/05/08 09:23:45 UTC

[flink] branch master updated: [FLINK-31448][runtime] use fine-grained slot manager as default

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3df65910025 [FLINK-31448][runtime] use fine-grained slot manager as default
3df65910025 is described below

commit 3df65910025ccba93d75b3a885ef5d0b67becd17
Author: Weihua Hu <hu...@gmail.com>
AuthorDate: Mon Mar 20 16:13:28 2023 +0800

    [FLINK-31448][runtime] use fine-grained slot manager as default
    
    This closes #22224
---
 .../docs/deployment/finegrained_resource.md          | 17 +----------------
 docs/content/docs/deployment/finegrained_resource.md | 20 +-------------------
 .../shortcodes/generated/cluster_configuration.html  |  6 ------
 .../generated/expert_scheduling_section.html         |  6 ------
 .../apache/flink/configuration/ClusterOptions.java   | 12 +++---------
 .../ResourceManagerRuntimeServicesConfiguration.java |  2 +-
 .../active/ActiveResourceManagerFactory.java         |  2 +-
 tools/azure-pipelines/jobs-template.yml              |  2 --
 tools/ci/stage.sh                                    | 13 -------------
 tools/ci/test_controller.sh                          |  7 -------
 10 files changed, 7 insertions(+), 80 deletions(-)

diff --git a/docs/content.zh/docs/deployment/finegrained_resource.md b/docs/content.zh/docs/deployment/finegrained_resource.md
index 893bd50cec3..258c6dad266 100644
--- a/docs/content.zh/docs/deployment/finegrained_resource.md
+++ b/docs/content.zh/docs/deployment/finegrained_resource.md
@@ -69,20 +69,7 @@ Flink之前的资源申请只包含必须指定的 slots,但没有精细化的
 
 ## 用法
 
-为了可以使用细粒度的资源管理,需要做以下步骤:
-
-- 配置细粒度的资源管理
-
-- 指定资源请求
-
-### 启用细粒度资源管理
-
-为了启用细粒度的资源管理配置,需要将 [cluster.fine-grained-resource-management.enabled]({{< ref "docs/deployment/config" >}}#cluster-fine-grained-resource-management-enabled) 的值设置为 true。
-{{< hint danger >}}
-没有该配置,Flink 运行 job 时并不能按照你指定的资源需求分配 slots,并且 job 会失败抛出异常。
-{{< /hint >}}
-
-### 为 Slot 共享组指定资源请求
+使用细粒度的资源管理,需要指定资源请求。
 
 细粒度资源请求是基于 slot 共享组定义的。一个 slot 共享组是一个切入点,这意味着在 TaskManager 中的算子和 tasks 可以被置于相同的 slot。
 
@@ -240,8 +227,6 @@ env.register_slot_sharing_group(ssg_with_resource)
 
 因为细粒度资源管理是新的实验性特性,并不是所有的特性都被默认的调度器所支持.Flink 社区正努力解决并突破这些限制。
 - **不支持[弹性伸缩]({{< ref "docs/deployment/elastic_scaling" >}})**. 弹性伸缩目前只支持不指定资源的slot请求。
-- **不支持TaskManager的冗余** TaskManager冗余 [slotmanager.redundant-taskmanager-num]({{< ref "docs/deployment/config" >}}#slotmanager-redundant-taskmanager-num) 用于启动冗余的 TaskManager 以加速 job 恢复。当前该配置在细粒度资源管理中不生效。
-- **不支持均匀分布的插槽策略** 此策略试图在所有可用的TaskManager中均匀分配插槽 [cluster.evenly-spread-out-slots]({{< ref "docs/deployment/config" >}}#cluster-evenly-spread-out-slots)。该策略在细粒度资源管理的第一个版本中不受支持,目前不会生效。
 - **与Flink Web UI有限的集成** 在细粒度的资源管理中,Slots会有不同的资源规格.目前Web UI页面只显示 slot 数量而不显示具体详情。
 - **与批作业有限的集成** 目前,细粒度资源管理需要在所有边缘都被阻塞的情况下执行批处理工作负载。为了达到该实现,需要将配置 [fine-grained.shuffle-mode.all-blocking]({{< ref "docs/deployment/config" >}}#fine-grained-shuffle-mode-all-blocking)设置为 true。注意这样可能会影响性能。详情请见[FLINK-20865](https://issues.apache.org/jira/browse/FLINK-20865)。
 - **不建议使用混合资源需求** 不建议仅为工作的某些部分指定资源需求,而未指定其余部分的需求。目前,任何资源的插槽都可以满足未指定的要求。它获取的实际资源可能在不同的作业执行或故障切换中不一致。
diff --git a/docs/content/docs/deployment/finegrained_resource.md b/docs/content/docs/deployment/finegrained_resource.md
index b6e505c18c3..dd34962c016 100644
--- a/docs/content/docs/deployment/finegrained_resource.md
+++ b/docs/content/docs/deployment/finegrained_resource.md
@@ -76,21 +76,7 @@ Please refer to [Resource Allocation Strategy](#resource-allocation-strategy) fo
 
 ## Usage
 
-To use fine-grained resource management, you need to:
-
-  - Configure to enable fine-grained resource management.
-
-  - Specify the resource requirement.
-
-### Enable Fine-Grained Resource Management
-
-To enable fine-grained resource management, you need to configure the [cluster.fine-grained-resource-management.enabled]({{< ref "docs/deployment/config" >}}#cluster-fine-grained-resource-management-enabled) to true.
-
-{{< hint danger >}}
-Without this configuration, the Flink runtime cannot schedule the slots with your specified resource requirement and the job will fail with an exception.
-{{< /hint >}}
-
-### Specify Resource Requirement for Slot Sharing Group
+To use fine-grained resource management, you need to specify the resource requirement.
 
 Fine-grained resource requirements are defined on slot sharing groups. A slot sharing group is a hint that tells the JobManager operators/tasks in it CAN be put into the same slot.
 
@@ -250,10 +236,6 @@ scheduler are also available with it. The Flink community is working on addressi
 
   - **No support for the [Elastic Scaling]({{< ref "docs/deployment/elastic_scaling" >}})**. The elastic scaling only supports slot requests without specified-resource at the moment.
 
-  - **No support for task manager redundancy**. The [slotmanager.redundant-taskmanager-num]({{< ref "docs/deployment/config" >}}#slotmanager-redundant-taskmanager-num) is used to start redundant TaskManagers to speed up job recovery. This config option will not take effect in fine-grained resource management at the moment.
-
-  - **No support for evenly spread out slot strategy**. This strategy tries to spread out the slots evenly across all available TaskManagers. The strategy is not supported in the first version of fine-grained resource management and [cluster.evenly-spread-out-slots]({{< ref "docs/deployment/config" >}}#cluster-evenly-spread-out-slots) will not take effect in it at the moment.
-
   - **Limited integration with Flink’s Web UI**. Slots in fine-grained resource management can have different resource specs. The web UI only shows the slot number without its details at the moment.
 
   - **Limited integration with batch jobs**. At the moment, fine-grained resource management requires batch workloads to be executed with types of all edges being BLOCKING. To do that, you need to configure [fine-grained.shuffle-mode.all-blocking]({{< ref "docs/deployment/config" >}}#fine-grained-shuffle-mode-all-blocking) to `true`. Notice that this may affect the performance. See [FLINK-20865](https://issues.apache.org/jira/browse/FLINK-20865) for more details.
diff --git a/docs/layouts/shortcodes/generated/cluster_configuration.html b/docs/layouts/shortcodes/generated/cluster_configuration.html
index 8639eb3bf8c..d734ea405f1 100644
--- a/docs/layouts/shortcodes/generated/cluster_configuration.html
+++ b/docs/layouts/shortcodes/generated/cluster_configuration.html
@@ -14,12 +14,6 @@
             <td>Boolean</td>
             <td>Enable the slot spread out allocation strategy. This strategy tries to spread out the slots evenly across all available <code class="highlighter-rouge">TaskExecutors</code>.</td>
         </tr>
-        <tr>
-            <td><h5>cluster.fine-grained-resource-management.enabled</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Defines whether the cluster uses fine-grained resource management.</td>
-        </tr>
         <tr>
             <td><h5>cluster.intercept-user-system-exit</h5></td>
             <td style="word-wrap: break-word;">DISABLED</td>
diff --git a/docs/layouts/shortcodes/generated/expert_scheduling_section.html b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
index 26fe4f20a87..8ae5ab14d96 100644
--- a/docs/layouts/shortcodes/generated/expert_scheduling_section.html
+++ b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
@@ -14,12 +14,6 @@
             <td>Boolean</td>
             <td>Enable the slot spread out allocation strategy. This strategy tries to spread out the slots evenly across all available <code class="highlighter-rouge">TaskExecutors</code>.</td>
         </tr>
-        <tr>
-            <td><h5>cluster.fine-grained-resource-management.enabled</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Defines whether the cluster uses fine-grained resource management.</td>
-        </tr>
         <tr>
             <td><h5>execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task</h5></td>
             <td style="word-wrap: break-word;">16 mb</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
index 92678f82465..58150beb4b7 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
@@ -137,10 +137,12 @@ public class ClusterOptions {
                             "The maximum stacktrace depth of TaskManager and JobManager's thread dump web-frontend displayed.");
 
     @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+    @Documentation.ExcludeFromDocumentation("Hidden for deprecated")
+    @Deprecated
     public static final ConfigOption<Boolean> ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT =
             ConfigOptions.key("cluster.fine-grained-resource-management.enabled")
                     .booleanType()
-                    .defaultValue(false)
+                    .defaultValue(true)
                     .withDescription(
                             "Defines whether the cluster uses fine-grained resource management.");
 
@@ -227,14 +229,6 @@ public class ClusterOptions {
         }
     }
 
-    public static boolean isFineGrainedResourceManagementEnabled(Configuration configuration) {
-        if (configuration.contains(ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT)) {
-            return configuration.get(ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT);
-        } else {
-            return System.getProperties().containsKey("flink.tests.enable-fine-grained");
-        }
-    }
-
     /** The mode of how to handle user code attempting to exit JVM. */
     public enum UserSystemExitMode implements DescribedEnum {
         DISABLED(text("Flink is not monitoring or intercepting calls to System.exit()")),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java
index d37cf7e1a0a..fe023452198 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java
@@ -84,7 +84,7 @@ public class ResourceManagerRuntimeServicesConfiguration {
                         configuration, defaultWorkerResourceSpec);
 
         final boolean enableFineGrainedResourceManagement =
-                ClusterOptions.isFineGrainedResourceManagementEnabled(configuration);
+                configuration.getBoolean(ClusterOptions.ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT);
 
         return new ResourceManagerRuntimeServicesConfiguration(
                 jobTimeout, slotManagerConfiguration, enableFineGrainedResourceManagement);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactory.java
index dd39914c6b0..cef883587b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactory.java
@@ -64,7 +64,7 @@ public abstract class ActiveResourceManagerFactory<WorkerType extends ResourceID
     @Override
     protected Configuration getEffectiveConfigurationForResourceManager(
             Configuration configuration) {
-        if (ClusterOptions.isFineGrainedResourceManagementEnabled(configuration)) {
+        if (configuration.getBoolean(ClusterOptions.ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT)) {
             final Configuration copiedConfig = new Configuration(configuration);
 
             if (copiedConfig.removeConfig(TaskManagerOptions.TOTAL_PROCESS_MEMORY)) {
diff --git a/tools/azure-pipelines/jobs-template.yml b/tools/azure-pipelines/jobs-template.yml
index 088977108f7..8a586de96c8 100644
--- a/tools/azure-pipelines/jobs-template.yml
+++ b/tools/azure-pipelines/jobs-template.yml
@@ -100,8 +100,6 @@ jobs:
         module: tests
       misc:
         module: misc
-      finegrained_resource_management:
-        module: finegrained_resource_management
   steps:
   # if on Azure, free up disk space
   - script: ./tools/azure-pipelines/free_disk_space.sh
diff --git a/tools/ci/stage.sh b/tools/ci/stage.sh
index b16f7cd394a..1127f5ecd23 100755
--- a/tools/ci/stage.sh
+++ b/tools/ci/stage.sh
@@ -26,7 +26,6 @@ STAGE_CONNECTORS_2="connect_2"
 STAGE_TESTS="tests"
 STAGE_MISC="misc"
 STAGE_CLEANUP="cleanup"
-STAGE_FINEGRAINED_RESOURCE_MANAGEMENT="finegrained_resource_management"
 
 MODULES_CORE="\
 flink-annotations,\
@@ -142,11 +141,6 @@ MODULES_TESTS="\
 flink-tests,\
 "
 
-MODULES_FINEGRAINED_RESOURCE_MANAGEMENT="\
-flink-runtime,\
-flink-tests,\
-"
-
 function get_compile_modules_for_stage() {
     local stage=$1
 
@@ -175,9 +169,6 @@ function get_compile_modules_for_stage() {
             # compile everything for PyFlink.
             echo ""
         ;;
-        (${STAGE_FINEGRAINED_RESOURCE_MANAGEMENT})
-            echo "-pl $MODULES_FINEGRAINED_RESOURCE_MANAGEMENT -am"
-        ;;
     esac
 }
 
@@ -195,7 +186,6 @@ function get_test_modules_for_stage() {
     local negated_connectors_1=\!${MODULES_CONNECTORS_1//,/,\!}
     local negated_tests=\!${MODULES_TESTS//,/,\!}
     local modules_misc="$negated_core,$negated_table,$negated_connectors_1,$negated_connectors_2,$negated_tests"
-    local modules_finegrained_resource_management=$MODULES_FINEGRAINED_RESOURCE_MANAGEMENT
 
     case ${stage} in
         (${STAGE_CORE})
@@ -216,8 +206,5 @@ function get_test_modules_for_stage() {
         (${STAGE_MISC})
             echo "-pl $modules_misc"
         ;;
-        (${STAGE_FINEGRAINED_RESOURCE_MANAGEMENT})
-            echo "-pl $modules_finegrained_resource_management"
-        ;;
     esac
 }
diff --git a/tools/ci/test_controller.sh b/tools/ci/test_controller.sh
index a7d06c4d269..2df79af0302 100755
--- a/tools/ci/test_controller.sh
+++ b/tools/ci/test_controller.sh
@@ -106,13 +106,6 @@ if [ $STAGE == $STAGE_PYTHON ]; then
 	EXIT_CODE=$?
 else
 	MVN_TEST_OPTIONS="-Dflink.tests.with-openssl -Dflink.tests.check-segment-multiple-free -Darchunit.freeze.store.default.allowStoreUpdate=false -Dakka.rpc.force-invocation-serialization"
-	if [ $STAGE = $STAGE_FINEGRAINED_RESOURCE_MANAGEMENT ]; then
-		if [[ ${PROFILE} == *"enable-adaptive-scheduler"* ]]; then
-			echo "Skipping fine grained resource management test stage in adaptive scheduler job"
-			exit 0
-		fi
-		MVN_TEST_OPTIONS="$MVN_TEST_OPTIONS -Dflink.tests.enable-fine-grained"
-	fi
 	MVN_TEST_MODULES=$(get_test_modules_for_stage ${STAGE})
 
 	run_with_watchdog "run_mvn $MVN_COMMON_OPTIONS $MVN_TEST_OPTIONS $PROFILE $MVN_TEST_MODULES verify" $CALLBACK_ON_TIMEOUT
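
The behavioural effect of the ClusterOptions change above can be sketched outside of Flink. The following is a minimal model, not Flink code: the class FineGrainedDefaultSketch and its two methods are hypothetical names, and a plain Map stands in for Flink's Configuration. Only the option key and the system property name are taken from the diff. It contrasts the removed helper (explicit value, else the test-only system property) with the new lookup (explicit value, else the option default, which is now true).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Simplified model of the lookup change; a Map stands in for Flink's Configuration. */
final class FineGrainedDefaultSketch {

    // Option key and system property name as they appear in the diff.
    static final String KEY = "cluster.fine-grained-resource-management.enabled";
    static final String TEST_PROPERTY = "flink.tests.enable-fine-grained";

    /** Removed helper: an explicit value wins, otherwise only the test property enables it. */
    static boolean enabledBefore(Map<String, String> conf, Properties systemProperties) {
        if (conf.containsKey(KEY)) {
            return Boolean.parseBoolean(conf.get(KEY));
        }
        return systemProperties.containsKey(TEST_PROPERTY);
    }

    /** New lookup: an explicit value wins, otherwise the option default (true) applies. */
    static boolean enabledAfter(Map<String, String> conf) {
        String value = conf.get(KEY);
        return value == null || Boolean.parseBoolean(value);
    }

    public static void main(String[] args) {
        Map<String, String> unset = new HashMap<>();
        Map<String, String> optedOut = new HashMap<>();
        optedOut.put(KEY, "false");

        // An empty Properties object models a JVM started without the test flag.
        Properties noTestFlag = new Properties();

        System.out.println("unset:     before=" + enabledBefore(unset, noTestFlag)
                + " after=" + enabledAfter(unset));
        System.out.println("opted out: before=" + enabledBefore(optedOut, noTestFlag)
                + " after=" + enabledAfter(optedOut));
    }
}
```

In this model, a cluster that never set the key moves from disabled to enabled, while an explicit "false" is still honoured, which is consistent with the option being deprecated and hidden from the docs rather than removed.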