You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/02/18 09:15:00 UTC
[flink] 12/12: [FLINK-21100][coordination] Add system property for
enabling declarative scheduler
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4f8cc4ee921c5b2a73e100b00cd21f4f1e763bc8
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Feb 8 20:41:33 2021 +0100
[FLINK-21100][coordination] Add system property for enabling declarative scheduler
---
.../org/apache/flink/configuration/ClusterOptions.java | 17 +++++++++++++++++
.../runtime/dispatcher/SchedulerNGFactoryFactory.java | 3 ++-
.../jobmaster/slotpool/SlotPoolServiceFactory.java | 3 +--
3 files changed, 20 insertions(+), 3 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
index c75428f..bc69c10 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
@@ -139,6 +139,23 @@ public class ClusterOptions {
}
}
+ public static JobManagerOptions.SchedulerType getSchedulerType(Configuration configuration) {
+ if (isDeclarativeSchedulerEnabled(configuration)) {
+ return JobManagerOptions.SchedulerType.Declarative;
+ } else {
+ return configuration.get(JobManagerOptions.SCHEDULER);
+ }
+ }
+
+ public static boolean isDeclarativeSchedulerEnabled(Configuration configuration) {
+ if (configuration.contains(JobManagerOptions.SCHEDULER)) {
+ return configuration.get(JobManagerOptions.SCHEDULER)
+ == JobManagerOptions.SchedulerType.Declarative;
+ } else {
+ return System.getProperties().containsKey("flink.tests.enable-declarative-scheduler");
+ }
+ }
+
public static boolean isFineGrainedResourceManagementEnabled(Configuration configuration) {
// TODO We need to bind fine-grained with declarative because in the first step we implement
// the feature base on the declarative protocol. We would be able to support both protocols
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
index a7e5b02..e9b130d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.dispatcher;
+import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.jobgraph.JobType;
@@ -39,7 +40,7 @@ public final class SchedulerNGFactoryFactory {
public static SchedulerNGFactory createSchedulerNGFactory(
final Configuration configuration, JobType jobType) {
JobManagerOptions.SchedulerType schedulerType =
- configuration.get(JobManagerOptions.SCHEDULER);
+ ClusterOptions.getSchedulerType(configuration);
if (schedulerType == JobManagerOptions.SchedulerType.Declarative
&& jobType == JobType.BATCH) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
index b486747..ea28178 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
@@ -43,8 +43,7 @@ public interface SlotPoolServiceFactory {
Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT));
if (ClusterOptions.isDeclarativeResourceManagementEnabled(configuration)) {
- if (configuration.get(JobManagerOptions.SCHEDULER)
- == JobManagerOptions.SchedulerType.Declarative
+ if (ClusterOptions.isDeclarativeSchedulerEnabled(configuration)
&& jobType == JobType.STREAMING) {
return new DeclarativeSlotPoolServiceFactory(
SystemClock.getInstance(), slotIdleTimeout, rpcTimeout);