You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/08/09 14:24:02 UTC
[flink] branch master updated: [FLINK-28873][configuration] Make jobmanager.scheduler visible in documentation
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 15f87d4a470 [FLINK-28873][configuration] Make jobmanager.scheduler visible in documentation
15f87d4a470 is described below
commit 15f87d4a470e9bf29fd18874c26c4506ea57c09f
Author: Lijie Wang <wa...@gmail.com>
AuthorDate: Mon Aug 8 21:41:54 2022 +0800
[FLINK-28873][configuration] Make jobmanager.scheduler visible in documentation
---
.../generated/all_jobmanager_section.html | 6 ++++++
.../generated/expert_scheduling_section.html | 6 ++++++
.../generated/job_manager_configuration.html | 6 ++++++
.../flink/configuration/JobManagerOptions.java | 22 +++++++++++++++++-----
.../flink/api/common/ExecutionConfigTest.java | 2 +-
flink-end-to-end-tests/test-scripts/test_tpcds.sh | 4 ++--
.../DefaultSlotPoolServiceSchedulerFactory.java | 14 +++++++++++---
.../runtime/scheduler/DefaultSchedulerFactory.java | 2 +-
...DefaultSlotPoolServiceSchedulerFactoryTest.java | 2 +-
.../runtime/jobmaster/JobMasterSchedulerTest.java | 2 +-
.../scheduler/TestingSchedulerNGFactory.java | 2 +-
11 files changed, 53 insertions(+), 15 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/all_jobmanager_section.html b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
index ae97499a5bb..83a7365a12a 100644
--- a/docs/layouts/shortcodes/generated/all_jobmanager_section.html
+++ b/docs/layouts/shortcodes/generated/all_jobmanager_section.html
@@ -122,6 +122,12 @@
<td>Integer</td>
<td>The config parameter defining the network port to connect to for communication with the job manager. Like jobmanager.rpc.address, this value is only interpreted in setups where a single JobManager with static name/address and port exists (simple standalone setups, or container setups with dynamic service name resolution). This config option is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobMa [...]
</tr>
+ <tr>
+ <td><h5>jobmanager.scheduler</h5></td>
+ <td style="word-wrap: break-word;">Default</td>
+ <td><p>Enum</p></td>
+ <td>Determines which scheduler implementation is used to schedule tasks. Accepted values are:<ul><li>'Default': Default scheduler</li><li>'Adaptive': Adaptive scheduler. More details can be found <a href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/deployment/elastic_scaling#adaptive-scheduler">here</a>.</li><li>'AdaptiveBatch': Adaptive batch scheduler. More details can be found <a href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/deployment/elastic_scaling#adaptive-ba [...]
+ </tr>
<tr>
<td><h5>jobstore.cache-size</h5></td>
<td style="word-wrap: break-word;">52428800</td>
diff --git a/docs/layouts/shortcodes/generated/expert_scheduling_section.html b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
index 29f1f4ce141..fa4e7ea8c39 100644
--- a/docs/layouts/shortcodes/generated/expert_scheduling_section.html
+++ b/docs/layouts/shortcodes/generated/expert_scheduling_section.html
@@ -86,6 +86,12 @@
<td>Duration</td>
<td>The maximum time the JobManager will wait to acquire all required resources after a job submission or restart. Once elapsed it will try to run the job with a lower parallelism, or fail if the minimum amount of resources could not be acquired.<br />Increasing this value will make the cluster more resilient against temporary resources shortages (e.g., there is more time for a failed TaskManager to be restarted).<br />Setting a negative duration will disable the resource tim [...]
</tr>
+ <tr>
+ <td><h5>jobmanager.scheduler</h5></td>
+ <td style="word-wrap: break-word;">Default</td>
+ <td><p>Enum</p></td>
+ <td>Determines which scheduler implementation is used to schedule tasks. Accepted values are:<ul><li>'Default': Default scheduler</li><li>'Adaptive': Adaptive scheduler. More details can be found <a href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/deployment/elastic_scaling#adaptive-scheduler">here</a>.</li><li>'AdaptiveBatch': Adaptive batch scheduler. More details can be found <a href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/deployment/elastic_scaling#adaptive-ba [...]
+ </tr>
<tr>
<td><h5>scheduler-mode</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/docs/layouts/shortcodes/generated/job_manager_configuration.html b/docs/layouts/shortcodes/generated/job_manager_configuration.html
index 05f361fa9c2..bb1960007a0 100644
--- a/docs/layouts/shortcodes/generated/job_manager_configuration.html
+++ b/docs/layouts/shortcodes/generated/job_manager_configuration.html
@@ -188,6 +188,12 @@
<td>Integer</td>
<td>The config parameter defining the network port to connect to for communication with the job manager. Like jobmanager.rpc.address, this value is only interpreted in setups where a single JobManager with static name/address and port exists (simple standalone setups, or container setups with dynamic service name resolution). This config option is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobMa [...]
</tr>
+ <tr>
+ <td><h5>jobmanager.scheduler</h5></td>
+ <td style="word-wrap: break-word;">Default</td>
+ <td><p>Enum</p></td>
+ <td>Determines which scheduler implementation is used to schedule tasks. Accepted values are:<ul><li>'Default': Default scheduler</li><li>'Adaptive': Adaptive scheduler. More details can be found <a href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/deployment/elastic_scaling#adaptive-scheduler">here</a>.</li><li>'AdaptiveBatch': Adaptive batch scheduler. More details can be found <a href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/deployment/elastic_scaling#adaptive-ba [...]
+ </tr>
<tr>
<td><h5>jobstore.cache-size</h5></td>
<td style="word-wrap: break-word;">52428800</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 777283af036..3a0ad6d6124 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -411,26 +411,38 @@ public class JobManagerOptions {
.withDescription("The timeout in milliseconds for a idle slot in Slot Pool.");
/** Config parameter determining the scheduler implementation. */
- @Documentation.ExcludeFromDocumentation("SchedulerNG is still in development.")
+ @Documentation.Section({
+ Documentation.Sections.EXPERT_SCHEDULING,
+ Documentation.Sections.ALL_JOB_MANAGER
+ })
public static final ConfigOption<SchedulerType> SCHEDULER =
key("jobmanager.scheduler")
.enumType(SchedulerType.class)
- .defaultValue(SchedulerType.Ng)
+ .defaultValue(SchedulerType.Default)
.withDescription(
Description.builder()
.text(
"Determines which scheduler implementation is used to schedule tasks. Accepted values are:")
.list(
- text("'Ng': new generation scheduler"),
+ text("'Default': Default scheduler"),
text(
- "'Adaptive': adaptive scheduler; supports reactive mode"),
+ "'Adaptive': Adaptive scheduler. More details can be found %s.",
+ link(
+ "{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/deployment/elastic_scaling#adaptive-scheduler",
+ "here")),
text(
- "'AdaptiveBatch': adaptive batch scheduler, which can automatically decide parallelisms of job vertices for batch jobs"))
+ "'AdaptiveBatch': Adaptive batch scheduler. More details can be found %s.",
+ link(
+ "{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/deployment/elastic_scaling#adaptive-batch-scheduler",
+ "here")))
.build());
/** Type of scheduler implementation. */
public enum SchedulerType {
+ /** @deprecated Use {@link SchedulerType#Default} instead. */
+ @Deprecated
Ng,
+ Default,
Adaptive,
AdaptiveBatch
}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
index d95ad668a2b..9a3ccf273e1 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
@@ -285,7 +285,7 @@ public class ExecutionConfigTest extends TestLogger {
public void testLoadingIsDynamicGraphFromConfiguration() {
testLoadingIsDynamicGraphFromConfiguration(
JobManagerOptions.SchedulerType.AdaptiveBatch, true);
- testLoadingIsDynamicGraphFromConfiguration(JobManagerOptions.SchedulerType.Ng, false);
+ testLoadingIsDynamicGraphFromConfiguration(JobManagerOptions.SchedulerType.Default, false);
testLoadingIsDynamicGraphFromConfiguration(JobManagerOptions.SchedulerType.Adaptive, false);
}
diff --git a/flink-end-to-end-tests/test-scripts/test_tpcds.sh b/flink-end-to-end-tests/test-scripts/test_tpcds.sh
index f7ef5ce9e7a..859135f0921 100755
--- a/flink-end-to-end-tests/test-scripts/test_tpcds.sh
+++ b/flink-end-to-end-tests/test-scripts/test_tpcds.sh
@@ -54,13 +54,13 @@ cd "$END_TO_END_DIR"
echo "[INFO]Preparing Flink cluster..."
-SCHEDULER="${1:-Ng}"
+SCHEDULER="${1:-Default}"
set_config_key "jobmanager.scheduler" "${SCHEDULER}"
set_config_key "taskmanager.memory.process.size" "4096m"
set_config_key "taskmanager.memory.network.fraction" "0.2"
-if [ "${SCHEDULER}" == "Ng" ]; then
+if [ "${SCHEDULER}" == "Default" ]; then
set_config_key "taskmanager.numberOfTaskSlots" "4"
set_config_key "parallelism.default" "4"
elif [ "${SCHEDULER}" == "AdaptiveBatch" ]; then
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
index f218927ebb5..4788078c10a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java
@@ -162,13 +162,21 @@ public final class DefaultSlotPoolServiceSchedulerFactory
ClusterOptions.getSchedulerType(configuration);
if (schedulerType == JobManagerOptions.SchedulerType.Adaptive && jobType == JobType.BATCH) {
LOG.info(
- "Adaptive Scheduler configured, but Batch job detected. Changing scheduler type to NG / DefaultScheduler.");
+ "Adaptive Scheduler configured, but Batch job detected. Changing scheduler type to 'Default'.");
// overwrite
- schedulerType = JobManagerOptions.SchedulerType.Ng;
+ schedulerType = JobManagerOptions.SchedulerType.Default;
+ } else if (schedulerType == JobManagerOptions.SchedulerType.Ng) {
+ LOG.warn(
+ "Config value '{}' for option '{}' is deprecated, use '{}' instead.",
+ JobManagerOptions.SchedulerType.Ng,
+ JobManagerOptions.SCHEDULER.key(),
+ JobManagerOptions.SchedulerType.Default);
+ // overwrite
+ schedulerType = JobManagerOptions.SchedulerType.Default;
}
switch (schedulerType) {
- case Ng:
+ case Default:
schedulerNGFactory = new DefaultSchedulerFactory();
slotPoolServiceFactory =
new DeclarativeSlotPoolBridgeServiceFactory(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 72893347982..315bb2911ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -155,6 +155,6 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
@Override
public JobManagerOptions.SchedulerType getSchedulerType() {
- return JobManagerOptions.SchedulerType.Ng;
+ return JobManagerOptions.SchedulerType.Default;
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactoryTest.java
index f1d102ae3e5..9e64477f8d1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactoryTest.java
@@ -56,7 +56,7 @@ public class DefaultSlotPoolServiceSchedulerFactoryTest {
assertThat(defaultSlotPoolServiceSchedulerFactory.getSchedulerNGFactory())
.isInstanceOf(DefaultSchedulerFactory.class);
assertThat(defaultSlotPoolServiceSchedulerFactory.getSchedulerType())
- .isEqualTo(JobManagerOptions.SchedulerType.Ng);
+ .isEqualTo(JobManagerOptions.SchedulerType.Default);
}
@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
index ee542eb0822..14bc3760792 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
@@ -132,7 +132,7 @@ public class JobMasterSchedulerTest extends TestLogger {
@Override
public JobManagerOptions.SchedulerType getSchedulerType() {
- return JobManagerOptions.SchedulerType.Ng;
+ return JobManagerOptions.SchedulerType.Default;
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
index 6233a42a805..53b31232a97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
@@ -78,6 +78,6 @@ public class TestingSchedulerNGFactory implements SchedulerNGFactory {
@Override
public JobManagerOptions.SchedulerType getSchedulerType() {
- return JobManagerOptions.SchedulerType.Ng;
+ return JobManagerOptions.SchedulerType.Default;
}
}