You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/03/03 06:17:59 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-26399] Make some option of operator configurable

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 43e1716  [FLINK-26399] Make some option of operator configurable
43e1716 is described below

commit 43e171627e5cf47a866958c1e88b5c182b686802
Author: 愚鲤 <yu...@alibaba-inc.com>
AuthorDate: Wed Mar 2 23:26:21 2022 +0800

    [FLINK-26399] Make some option of operator configurable
    
    Closes #31
---
 .../flink/kubernetes/operator/FlinkOperator.java   | 10 ++++-
 .../config/FlinkOperatorConfiguration.java         | 52 ++++++++++++++++++++++
 .../operator/config/OperatorConfigOptions.java     | 41 +++++++++++++++++
 .../controller/FlinkDeploymentController.java      |  8 +++-
 .../metrics/KubernetesOperatorMetricOptions.java   |  1 +
 .../observer/JobManagerDeploymentStatus.java       | 15 ++++---
 .../operator/reconciler/BaseReconciler.java        | 15 +++----
 .../operator/reconciler/JobReconciler.java         | 14 ++++--
 .../operator/reconciler/SessionReconciler.java     | 15 ++++---
 .../validation/DefaultDeploymentValidator.java     | 12 ++---
 .../controller/FlinkDeploymentControllerTest.java  | 22 ++++++---
 .../operator/reconciler/JobReconcilerTest.java     |  9 +++-
 12 files changed, 173 insertions(+), 41 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index fd9d60c..284860d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator;
 
 import org.apache.flink.kubernetes.operator.config.DefaultConfig;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig;
 import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
 import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
@@ -55,16 +56,21 @@ public class FlinkOperator {
         Operator operator = new Operator(client, configurationService);
 
         FlinkService flinkService = new FlinkService(client);
+        FlinkOperatorConfiguration operatorConfiguration =
+                FlinkOperatorConfiguration.fromConfiguration(defaultConfig.getOperatorConfig());
 
         Observer observer = new Observer(flinkService);
-        JobReconciler jobReconciler = new JobReconciler(client, flinkService);
-        SessionReconciler sessionReconciler = new SessionReconciler(client, flinkService);
+        JobReconciler jobReconciler =
+                new JobReconciler(client, flinkService, operatorConfiguration);
+        SessionReconciler sessionReconciler =
+                new SessionReconciler(client, flinkService, operatorConfiguration);
 
         FlinkDeploymentValidator validator = new DefaultDeploymentValidator();
 
         FlinkDeploymentController controller =
                 new FlinkDeploymentController(
                         defaultConfig,
+                        operatorConfiguration,
                         client,
                         namespace,
                         validator,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
new file mode 100644
index 0000000..00e741d
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.configuration.Configuration;
+
+/** Configuration class for operator. */
+public class FlinkOperatorConfiguration {
+
+    private final int reconcileIntervalInSec;
+
+    private final int portCheckIntervalInSec;
+
+    public FlinkOperatorConfiguration(int reconcileIntervalInSec, int portCheckIntervalInSec) {
+        this.reconcileIntervalInSec = reconcileIntervalInSec;
+        this.portCheckIntervalInSec = portCheckIntervalInSec;
+    }
+
+    public int getReconcileIntervalInSec() {
+        return reconcileIntervalInSec;
+    }
+
+    public int getPortCheckIntervalInSec() {
+        return portCheckIntervalInSec;
+    }
+
+    public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) {
+        int reconcileIntervalInSec =
+                operatorConfig.getInteger(
+                        OperatorConfigOptions.OPERATOR_RECONCILER_RESCHEDULE_INTERVAL_IN_SEC);
+        int portCheckIntervalInSec =
+                operatorConfig.getInteger(
+                        OperatorConfigOptions.OPERATOR_OBSERVER_PORT_CHECK_INTERVAL_IN_SEC);
+        return new FlinkOperatorConfiguration(reconcileIntervalInSec, portCheckIntervalInSec);
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
new file mode 100644
index 0000000..0a80c07
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** This class holds configuration constants used by flink operator. */
+public class OperatorConfigOptions {
+
+    public static final ConfigOption<Integer> OPERATOR_RECONCILER_RESCHEDULE_INTERVAL_IN_SEC =
+            ConfigOptions.key("operator.reconciler.reschedule.interval.sec")
+                    .intType()
+                    .defaultValue(60)
+                    .withDescription(
+                            "The interval in second for the controller to reschedule the reconcile process");
+
+    public static final ConfigOption<Integer> OPERATOR_OBSERVER_PORT_CHECK_INTERVAL_IN_SEC =
+            ConfigOptions.key("operator.observer.port-check.interval.sec")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription(
+                            "The interval in second for the controller to reschedule the reconcile process to "
+                                    + "wait for deployment to be ready");
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index b9d5cd4..bc27ac5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.controller;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.DefaultConfig;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
@@ -69,9 +70,11 @@ public class FlinkDeploymentController
     private final JobReconciler jobReconciler;
     private final SessionReconciler sessionReconciler;
     private final DefaultConfig defaultConfig;
+    private final FlinkOperatorConfiguration operatorConfiguration;
 
     public FlinkDeploymentController(
             DefaultConfig defaultConfig,
+            FlinkOperatorConfiguration operatorConfiguration,
             KubernetesClient kubernetesClient,
             String operatorNamespace,
             FlinkDeploymentValidator validator,
@@ -79,6 +82,7 @@ public class FlinkDeploymentController
             JobReconciler jobReconciler,
             SessionReconciler sessionReconciler) {
         this.defaultConfig = defaultConfig;
+        this.operatorConfiguration = operatorConfiguration;
         this.kubernetesClient = kubernetesClient;
         this.operatorNamespace = operatorNamespace;
         this.validator = validator;
@@ -114,7 +118,9 @@ public class FlinkDeploymentController
 
         boolean readyToReconcile = observer.observe(flinkApp, context, effectiveConfig);
         if (!readyToReconcile) {
-            return flinkApp.getStatus().getJobManagerDeploymentStatus().toUpdateControl(flinkApp);
+            return flinkApp.getStatus()
+                    .getJobManagerDeploymentStatus()
+                    .toUpdateControl(flinkApp, operatorConfiguration);
         }
 
         try {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
index 5d092f8..3c07674 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigOptions;
 public class KubernetesOperatorMetricOptions {
     public static final ConfigOption<String> SCOPE_NAMING_KUBERNETES_OPERATOR =
             ConfigOptions.key("metrics.scope.kubernetes-operator")
+                    .stringType()
                     .defaultValue("<host>.kubernetes-operator.<namespace>.<name>")
                     .withDescription(
                             "Defines the scope format string that is applied to all metrics scoped to the kubernetes operator.");
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
index 08262ff..960c1cd 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
@@ -17,15 +17,13 @@
 
 package org.apache.flink.kubernetes.operator.observer;
 
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS;
-import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS;
-
 /** Status of the Flink JobManager Kubernetes deployment. */
 public enum JobManagerDeploymentStatus {
 
@@ -41,15 +39,20 @@ public enum JobManagerDeploymentStatus {
     /** JobManager deployment not found, probably not started or killed by user. */
     MISSING;
 
-    public UpdateControl<FlinkDeployment> toUpdateControl(FlinkDeployment flinkDeployment) {
+    public UpdateControl<FlinkDeployment> toUpdateControl(
+            FlinkDeployment flinkDeployment, FlinkOperatorConfiguration operatorConfiguration) {
         switch (this) {
             case DEPLOYING:
             case READY:
                 return UpdateControl.updateStatus(flinkDeployment)
-                        .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+                        .rescheduleAfter(
+                                operatorConfiguration.getReconcileIntervalInSec(),
+                                TimeUnit.SECONDS);
             case DEPLOYED_NOT_READY:
                 return UpdateControl.updateStatus(flinkDeployment)
-                        .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);
+                        .rescheduleAfter(
+                                operatorConfiguration.getPortCheckIntervalInSec(),
+                                TimeUnit.SECONDS);
             case MISSING:
             default:
                 return null;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
index b94956b..b4a98d8 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.reconciler;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -28,23 +29,21 @@ import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /** BaseReconciler with functionality that is common to job and session modes. */
 public abstract class BaseReconciler {
 
-    private static final Logger LOG = LoggerFactory.getLogger(BaseReconciler.class);
-
-    public static final int REFRESH_SECONDS = 60;
-    public static final int PORT_READY_DELAY_SECONDS = 10;
-
+    protected final FlinkOperatorConfiguration operatorConfiguration;
     protected final KubernetesClient kubernetesClient;
     protected final FlinkService flinkService;
 
-    public BaseReconciler(KubernetesClient kubernetesClient, FlinkService flinkService) {
+    public BaseReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
         this.kubernetesClient = kubernetesClient;
         this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
     }
 
     public abstract UpdateControl<FlinkDeployment> reconcile(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index 3abb860..3d0cb39 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.reconciler;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
@@ -48,8 +49,11 @@ public class JobReconciler extends BaseReconciler {
 
     private static final Logger LOG = LoggerFactory.getLogger(JobReconciler.class);
 
-    public JobReconciler(KubernetesClient kubernetesClient, FlinkService flinkService) {
-        super(kubernetesClient, flinkService);
+    public JobReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        super(kubernetesClient, flinkService, operatorConfiguration);
     }
 
     @Override
@@ -69,7 +73,8 @@ public class JobReconciler extends BaseReconciler {
                     Optional.ofNullable(jobSpec.getInitialSavepointPath()));
             IngressUtils.updateIngressRules(
                     flinkApp, effectiveConfig, operatorNamespace, kubernetesClient, false);
-            return JobManagerDeploymentStatus.DEPLOYING.toUpdateControl(flinkApp);
+            return JobManagerDeploymentStatus.DEPLOYING.toUpdateControl(
+                    flinkApp, operatorConfiguration);
         }
 
         // TODO: following assumes that current job is running
@@ -105,7 +110,8 @@ public class JobReconciler extends BaseReconciler {
         }
 
         return UpdateControl.updateStatus(flinkApp)
-                .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+                .rescheduleAfter(
+                        operatorConfiguration.getReconcileIntervalInSec(), TimeUnit.SECONDS);
     }
 
     private void deployFlinkJob(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
index d63fdeb..7596b4d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.reconciler;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -26,8 +27,6 @@ import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.TimeUnit;
 
@@ -37,10 +36,11 @@ import java.util.concurrent.TimeUnit;
  */
 public class SessionReconciler extends BaseReconciler {
 
-    private static final Logger LOG = LoggerFactory.getLogger(SessionReconciler.class);
-
-    public SessionReconciler(KubernetesClient kubernetesClient, FlinkService flinkService) {
-        super(kubernetesClient, flinkService);
+    public SessionReconciler(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        super(kubernetesClient, flinkService, operatorConfiguration);
     }
 
     @Override
@@ -66,7 +66,8 @@ public class SessionReconciler extends BaseReconciler {
         }
 
         return UpdateControl.updateStatus(flinkApp)
-                .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+                .rescheduleAfter(
+                        operatorConfiguration.getReconcileIntervalInSec(), TimeUnit.SECONDS);
     }
 
     private void upgradeSessionCluster(FlinkDeployment flinkApp, Configuration effectiveConfig)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
index 9f8046b..aaef01f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.validation;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec;
@@ -38,7 +39,9 @@ import java.util.Set;
 public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
 
     private static final String[] FORBIDDEN_CONF_KEYS =
-            new String[] {"kubernetes.namespace", "kubernetes.cluster-id"};
+            new String[] {
+                KubernetesConfigOptions.NAMESPACE.key(), KubernetesConfigOptions.CLUSTER_ID.key()
+            };
 
     private static final Set<String> ALLOWED_LOG_CONF_KEYS =
             Set.of(Constants.CONFIG_FILE_LOG4J_NAME, Constants.CONFIG_FILE_LOGBACK_NAME);
@@ -121,13 +124,12 @@ public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
 
         return firstPresent(
                 validateResources("JobManager", jmSpec.getResource()),
-                validateJmReplicas("JobManager", jmSpec.getReplicas(), confMap));
+                validateJmReplicas(jmSpec.getReplicas(), confMap));
     }
 
-    private Optional<String> validateJmReplicas(
-            String component, int replicas, Map<String, String> confMap) {
+    private Optional<String> validateJmReplicas(int replicas, Map<String, String> confMap) {
         if (replicas < 1) {
-            return Optional.of(component + " replicas should not be configured less than one.");
+            return Optional.of("JobManager replicas should not be configured less than one.");
         } else if (replicas > 1
                 && !HighAvailabilityMode.isHighAvailabilityModeActivated(
                         Configuration.fromMap(confMap))) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index f4468c6..e464123 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -18,8 +18,10 @@
 package org.apache.flink.kubernetes.operator.controller;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
@@ -51,6 +53,8 @@ import static org.junit.Assert.assertTrue;
 public class FlinkDeploymentControllerTest {
 
     private final Context context = JobReconcilerTest.createContextWithReadyJobManagerDeployment();
+    private final FlinkOperatorConfiguration operatorConfiguration =
+            FlinkOperatorConfiguration.fromConfiguration(new Configuration());
 
     @Test
     public void verifyBasicReconcileLoop() {
@@ -63,7 +67,9 @@ public class FlinkDeploymentControllerTest {
         updateControl = testController.reconcile(appCluster, TestUtils.createEmptyContext());
         assertTrue(updateControl.isUpdateStatus());
         assertEquals(
-                JobManagerDeploymentStatus.DEPLOYING.toUpdateControl(appCluster).getScheduleDelay(),
+                JobManagerDeploymentStatus.DEPLOYING
+                        .toUpdateControl(appCluster, operatorConfiguration)
+                        .getScheduleDelay(),
                 updateControl.getScheduleDelay());
 
         // Validate reconciliation status
@@ -77,14 +83,16 @@ public class FlinkDeploymentControllerTest {
         assertTrue(updateControl.isUpdateStatus());
         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYED_NOT_READY
-                        .toUpdateControl(appCluster)
+                        .toUpdateControl(appCluster, operatorConfiguration)
                         .getScheduleDelay(),
                 updateControl.getScheduleDelay());
 
         updateControl = testController.reconcile(appCluster, context);
         assertTrue(updateControl.isUpdateStatus());
         assertEquals(
-                JobManagerDeploymentStatus.READY.toUpdateControl(appCluster).getScheduleDelay(),
+                JobManagerDeploymentStatus.READY
+                        .toUpdateControl(appCluster, operatorConfiguration)
+                        .getScheduleDelay(),
                 updateControl.getScheduleDelay());
 
         // Validate job status
@@ -188,7 +196,7 @@ public class FlinkDeploymentControllerTest {
                 testController.reconcile(appCluster, context);
         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYED_NOT_READY
-                        .toUpdateControl(appCluster)
+                        .toUpdateControl(appCluster, operatorConfiguration)
                         .getScheduleDelay(),
                 updateControl.getScheduleDelay());
         testController.reconcile(appCluster, context);
@@ -212,11 +220,13 @@ public class FlinkDeploymentControllerTest {
 
     private FlinkDeploymentController createTestController(TestingFlinkService flinkService) {
         Observer observer = new Observer(flinkService);
-        JobReconciler jobReconciler = new JobReconciler(null, flinkService);
-        SessionReconciler sessionReconciler = new SessionReconciler(null, flinkService);
+        JobReconciler jobReconciler = new JobReconciler(null, flinkService, operatorConfiguration);
+        SessionReconciler sessionReconciler =
+                new SessionReconciler(null, flinkService, operatorConfiguration);
 
         return new FlinkDeploymentController(
                 FlinkUtils.loadDefaultConfig(),
+                operatorConfiguration,
                 null,
                 "test",
                 new DefaultDeploymentValidator(),
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index 2ee8793..fd940d4 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
@@ -47,6 +48,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 /** @link JobStatusObserver unit tests */
 public class JobReconcilerTest {
 
+    private final FlinkOperatorConfiguration operatorConfiguration =
+            FlinkOperatorConfiguration.fromConfiguration(new Configuration());
+
     public static Context createContextWithReadyJobManagerDeployment() {
         return new Context() {
             @Override
@@ -75,7 +79,7 @@ public class JobReconcilerTest {
         Context context = JobReconcilerTest.createContextWithReadyJobManagerDeployment();
         TestingFlinkService flinkService = new TestingFlinkService();
 
-        JobReconciler reconciler = new JobReconciler(null, flinkService);
+        JobReconciler reconciler = new JobReconciler(null, flinkService, operatorConfiguration);
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         Configuration config = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
 
@@ -117,7 +121,8 @@ public class JobReconcilerTest {
         final TestingFlinkService flinkService = new TestingFlinkService();
         Observer observer = new Observer(flinkService);
 
-        final JobReconciler reconciler = new JobReconciler(null, flinkService);
+        final JobReconciler reconciler =
+                new JobReconciler(null, flinkService, operatorConfiguration);
         final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         final Configuration config = FlinkUtils.getEffectiveConfig(deployment, new Configuration());