You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/02/18 09:15:00 UTC

[flink] 12/12: [FLINK-21100][coordination] Add system property for enabling declarative scheduler

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4f8cc4ee921c5b2a73e100b00cd21f4f1e763bc8
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Feb 8 20:41:33 2021 +0100

    [FLINK-21100][coordination] Add system property for enabling declarative scheduler
---
 .../org/apache/flink/configuration/ClusterOptions.java  | 17 +++++++++++++++++
 .../runtime/dispatcher/SchedulerNGFactoryFactory.java   |  3 ++-
 .../jobmaster/slotpool/SlotPoolServiceFactory.java      |  3 +--
 3 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
index c75428f..bc69c10 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
@@ -139,6 +139,23 @@ public class ClusterOptions {
         }
     }
 
+    public static JobManagerOptions.SchedulerType getSchedulerType(Configuration configuration) {
+        if (isDeclarativeSchedulerEnabled(configuration)) {
+            return JobManagerOptions.SchedulerType.Declarative;
+        } else {
+            return configuration.get(JobManagerOptions.SCHEDULER);
+        }
+    }
+
+    public static boolean isDeclarativeSchedulerEnabled(Configuration configuration) {
+        if (configuration.contains(JobManagerOptions.SCHEDULER)) {
+            return configuration.get(JobManagerOptions.SCHEDULER)
+                    == JobManagerOptions.SchedulerType.Declarative;
+        } else {
+            return System.getProperties().containsKey("flink.tests.enable-declarative-scheduler");
+        }
+    }
+
     public static boolean isFineGrainedResourceManagementEnabled(Configuration configuration) {
         // TODO We need to bind fine-grained with declarative because in the first step we implement
         // the feature base on the declarative protocol. We would be able to support both protocols
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
index a7e5b02..e9b130d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.runtime.dispatcher;
 
+import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobType;
@@ -39,7 +40,7 @@ public final class SchedulerNGFactoryFactory {
     public static SchedulerNGFactory createSchedulerNGFactory(
             final Configuration configuration, JobType jobType) {
         JobManagerOptions.SchedulerType schedulerType =
-                configuration.get(JobManagerOptions.SCHEDULER);
+                ClusterOptions.getSchedulerType(configuration);
 
         if (schedulerType == JobManagerOptions.SchedulerType.Declarative
                 && jobType == JobType.BATCH) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
index b486747..ea28178 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
@@ -43,8 +43,7 @@ public interface SlotPoolServiceFactory {
                 Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT));
 
         if (ClusterOptions.isDeclarativeResourceManagementEnabled(configuration)) {
-            if (configuration.get(JobManagerOptions.SCHEDULER)
-                            == JobManagerOptions.SchedulerType.Declarative
+            if (ClusterOptions.isDeclarativeSchedulerEnabled(configuration)
                     && jobType == JobType.STREAMING) {
                 return new DeclarativeSlotPoolServiceFactory(
                         SystemClock.getInstance(), slotIdleTimeout, rpcTimeout);