You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/08/09 14:24:02 UTC

[flink] branch master updated: [FLINK-28873][configuration] Make jobmanager.scheduler visible in documentation

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 15f87d4a470 [FLINK-28873][configuration] Make jobmanager.scheduler visible in documentation
15f87d4a470 is described below

commit 15f87d4a470e9bf29fd18874c26c4506ea57c09f
Author: Lijie Wang <wa...@gmail.com>
AuthorDate: Mon Aug 8 21:41:54 2022 +0800

    [FLINK-28873][configuration] Make jobmanager.scheduler visible in documentation
---
 .../generated/all_jobmanager_section.html          |  6 ++++++
 .../generated/expert_scheduling_section.html       |  6 ++++++
 .../generated/job_manager_configuration.html       |  6 ++++++
 .../flink/configuration/JobManagerOptions.java     | 22 +++++++++++++++++-----
 .../flink/api/common/ExecutionConfigTest.java      |  2 +-
 flink-end-to-end-tests/test-scripts/test_tpcds.sh  |  4 ++--
 .../DefaultSlotPoolServiceSchedulerFactory.java    | 14 +++++++++++---
 .../runtime/scheduler/DefaultSchedulerFactory.java |  2 +-
 ...DefaultSlotPoolServiceSchedulerFactoryTest.java |  2 +-
 .../runtime/jobmaster/JobMasterSchedulerTest.java  |  2 +-
 .../scheduler/TestingSchedulerNGFactory.java       |  2 +-
 11 files changed, 53 insertions(+), 15 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/all_jobmanager_section.html b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
index ae97499a5bb..83a7365a12a 100644
--- a/docs/layouts/shortcodes/generated/all_jobmanager_section.html
+++ b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
@@ -122,6 +122,12 @@
             <td>Integer</td>
             <td>The config parameter defining the network port to connect to for communication with the job manager. Like jobmanager.rpc.address, this value is only interpreted in setups where a single JobManager with static name/address and port exists (simple standalone setups, or container setups with dynamic service name resolution). This config option is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobMa [...]
         </tr>
+        <tr>
+            <td><h5>jobmanager.scheduler</h5></td>
+            <td style="word-wrap: break-word;">Default</td>
+            <td><p>Enum</p></td>
+            <td>Determines which scheduler implementation is used to schedule tasks. Accepted values are:<ul><li>'Default': Default scheduler</li><li>'Adaptive': Adaptive scheduler. More details can be found <a href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/deployment/elastic_scaling#adaptive-scheduler">here</a>.</li><li>'AdaptiveBatch': Adaptive batch scheduler. More details can be found <a href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/deployment/elastic_scaling#adaptive-batch-scheduler">here</a>.</li></ul></td>
+        </tr>
         <tr>
             <td><h5>jobstore.cache-size</h5></td>
             <td style="word-wrap: break-word;">52428800</td>
diff --git a/docs/layouts/shortcodes/generated/expert_scheduling_section.html b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
index 29f1f4ce141..fa4e7ea8c39 100644
--- a/docs/layouts/shortcodes/generated/expert_scheduling_section.html
+++ b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
@@ -86,6 +86,12 @@
             <td>Duration</td>
             <td>The maximum time the JobManager will wait to acquire all required resources after a job submission or restart. Once elapsed it will try to run the job with a lower parallelism, or fail if the minimum amount of resources could not be acquired.<br />Increasing this value will make the cluster more resilient against temporary resources shortages (e.g., there is more time for a failed TaskManager to be restarted).<br />Setting a negative duration will disable the resource tim [...]
         </tr>
+        <tr>
+            <td><h5>jobmanager.scheduler</h5></td>
+            <td style="word-wrap: break-word;">Default</td>
+            <td><p>Enum</p></td>
+            <td>Determines which scheduler implementation is used to schedule tasks. Accepted values are:<ul><li>'Default': Default scheduler</li><li>'Adaptive': Adaptive scheduler. More details can be found <a href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/deployment/elastic_scaling#adaptive-scheduler">here</a>.</li><li>'AdaptiveBatch': Adaptive batch scheduler. More details can be found <a href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/deployment/elastic_scaling#adaptive-batch-scheduler">here</a>.</li></ul></td>
+        </tr>
         <tr>
             <td><h5>scheduler-mode</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/docs/layouts/shortcodes/generated/job_manager_configuration.html b/docs/layouts/shortcodes/generated/job_manager_configuration.html
index 05f361fa9c2..bb1960007a0 100644
--- a/docs/layouts/shortcodes/generated/job_manager_configuration.html
+++ b/docs/layouts/shortcodes/generated/job_manager_configuration.html
@@ -188,6 +188,12 @@
             <td>Integer</td>
             <td>The config parameter defining the network port to connect to for communication with the job manager. Like jobmanager.rpc.address, this value is only interpreted in setups where a single JobManager with static name/address and port exists (simple standalone setups, or container setups with dynamic service name resolution). This config option is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobMa [...]
         </tr>
+        <tr>
+            <td><h5>jobmanager.scheduler</h5></td>
+            <td style="word-wrap: break-word;">Default</td>
+            <td><p>Enum</p></td>
+            <td>Determines which scheduler implementation is used to schedule tasks. Accepted values are:<ul><li>'Default': Default scheduler</li><li>'Adaptive': Adaptive scheduler. More details can be found <a href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/deployment/elastic_scaling#adaptive-scheduler">here</a>.</li><li>'AdaptiveBatch': Adaptive batch scheduler. More details can be found <a href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/deployment/elastic_scaling#adaptive-batch-scheduler">here</a>.</li></ul></td>
+        </tr>
         <tr>
             <td><h5>jobstore.cache-size</h5></td>
             <td style="word-wrap: break-word;">52428800</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 777283af036..3a0ad6d6124 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -411,26 +411,38 @@ public class JobManagerOptions {
                     .withDescription("The timeout in milliseconds for a idle slot in Slot Pool.");
 
     /** Config parameter determining the scheduler implementation. */
-    @Documentation.ExcludeFromDocumentation("SchedulerNG is still in development.")
+    @Documentation.Section({
+        Documentation.Sections.EXPERT_SCHEDULING,
+        Documentation.Sections.ALL_JOB_MANAGER
+    })
     public static final ConfigOption<SchedulerType> SCHEDULER =
             key("jobmanager.scheduler")
                     .enumType(SchedulerType.class)
-                    .defaultValue(SchedulerType.Ng)
+                    .defaultValue(SchedulerType.Default)
                     .withDescription(
                             Description.builder()
                                     .text(
                                             "Determines which scheduler implementation is used to schedule tasks. Accepted values are:")
                                     .list(
-                                            text("'Ng': new generation scheduler"),
+                                            text("'Default': Default scheduler"),
                                             text(
-                                                    "'Adaptive': adaptive scheduler; supports reactive mode"),
+                                                    "'Adaptive': Adaptive scheduler. More details can be found %s.",
+                                                    link(
+                                                            "{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/deployment/elastic_scaling#adaptive-scheduler",
+                                                            "here")),
                                             text(
-                                                    "'AdaptiveBatch': adaptive batch scheduler, which can automatically decide parallelisms of job vertices for batch jobs"))
+                                                    "'AdaptiveBatch': Adaptive batch scheduler. More details can be found %s.",
+                                                    link(
+                                                            "{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/deployment/elastic_scaling#adaptive-batch-scheduler",
+                                                            "here")))
                                     .build());
 
     /** Type of scheduler implementation. */
     public enum SchedulerType {
+        /** @deprecated Use {@link SchedulerType#Default} instead. */
+        @Deprecated
         Ng,
+        Default,
         Adaptive,
         AdaptiveBatch
     }
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
index d95ad668a2b..9a3ccf273e1 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
@@ -285,7 +285,7 @@ public class ExecutionConfigTest extends TestLogger {
     public void testLoadingIsDynamicGraphFromConfiguration() {
         testLoadingIsDynamicGraphFromConfiguration(
                 JobManagerOptions.SchedulerType.AdaptiveBatch, true);
-        testLoadingIsDynamicGraphFromConfiguration(JobManagerOptions.SchedulerType.Ng, false);
+        testLoadingIsDynamicGraphFromConfiguration(JobManagerOptions.SchedulerType.Default, false);
         testLoadingIsDynamicGraphFromConfiguration(JobManagerOptions.SchedulerType.Adaptive, false);
     }
 
diff --git a/flink-end-to-end-tests/test-scripts/test_tpcds.sh b/flink-end-to-end-tests/test-scripts/test_tpcds.sh
index f7ef5ce9e7a..859135f0921 100755
--- a/flink-end-to-end-tests/test-scripts/test_tpcds.sh
+++ b/flink-end-to-end-tests/test-scripts/test_tpcds.sh
@@ -54,13 +54,13 @@ cd "$END_TO_END_DIR"
 
 echo "[INFO]Preparing Flink cluster..."
 
-SCHEDULER="${1:-Ng}"
+SCHEDULER="${1:-Default}"
 
 set_config_key "jobmanager.scheduler" "${SCHEDULER}"
 set_config_key "taskmanager.memory.process.size" "4096m"
 set_config_key "taskmanager.memory.network.fraction" "0.2"
 
-if [ "${SCHEDULER}" == "Ng" ]; then
+if [ "${SCHEDULER}" == "Default" ]; then
     set_config_key "taskmanager.numberOfTaskSlots" "4"
     set_config_key "parallelism.default" "4"
 elif [ "${SCHEDULER}" == "AdaptiveBatch" ]; then
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
index f218927ebb5..4788078c10a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
@@ -162,13 +162,21 @@ public final class DefaultSlotPoolServiceSchedulerFactory
                 ClusterOptions.getSchedulerType(configuration);
         if (schedulerType == JobManagerOptions.SchedulerType.Adaptive && jobType == JobType.BATCH) {
             LOG.info(
-                    "Adaptive Scheduler configured, but Batch job detected. Changing scheduler type to NG / DefaultScheduler.");
+                    "Adaptive Scheduler configured, but Batch job detected. Changing scheduler type to 'Default'.");
             // overwrite
-            schedulerType = JobManagerOptions.SchedulerType.Ng;
+            schedulerType = JobManagerOptions.SchedulerType.Default;
+        } else if (schedulerType == JobManagerOptions.SchedulerType.Ng) {
+            LOG.warn(
+                    "Config value '{}' for option '{}' is deprecated, use '{}' instead.",
+                    JobManagerOptions.SchedulerType.Ng,
+                    JobManagerOptions.SCHEDULER.key(),
+                    JobManagerOptions.SchedulerType.Default);
+            // overwrite
+            schedulerType = JobManagerOptions.SchedulerType.Default;
         }
 
         switch (schedulerType) {
-            case Ng:
+            case Default:
                 schedulerNGFactory = new DefaultSchedulerFactory();
                 slotPoolServiceFactory =
                         new DeclarativeSlotPoolBridgeServiceFactory(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 72893347982..315bb2911ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -155,6 +155,6 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 
     @Override
     public JobManagerOptions.SchedulerType getSchedulerType() {
-        return JobManagerOptions.SchedulerType.Ng;
+        return JobManagerOptions.SchedulerType.Default;
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactoryTest.java
index f1d102ae3e5..9e64477f8d1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactoryTest.java
@@ -56,7 +56,7 @@ public class DefaultSlotPoolServiceSchedulerFactoryTest {
         assertThat(defaultSlotPoolServiceSchedulerFactory.getSchedulerNGFactory())
                 .isInstanceOf(DefaultSchedulerFactory.class);
         assertThat(defaultSlotPoolServiceSchedulerFactory.getSchedulerType())
-                .isEqualTo(JobManagerOptions.SchedulerType.Ng);
+                .isEqualTo(JobManagerOptions.SchedulerType.Default);
     }
 
     @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
index ee542eb0822..14bc3760792 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
@@ -132,7 +132,7 @@ public class JobMasterSchedulerTest extends TestLogger {
 
         @Override
         public JobManagerOptions.SchedulerType getSchedulerType() {
-            return JobManagerOptions.SchedulerType.Ng;
+            return JobManagerOptions.SchedulerType.Default;
         }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
index 6233a42a805..53b31232a97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
@@ -78,6 +78,6 @@ public class TestingSchedulerNGFactory implements SchedulerNGFactory {
 
     @Override
     public JobManagerOptions.SchedulerType getSchedulerType() {
-        return JobManagerOptions.SchedulerType.Ng;
+        return JobManagerOptions.SchedulerType.Default;
     }
 }