You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/03/03 06:17:59 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-26399] Make some option of operator configurable
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 43e1716 [FLINK-26399] Make some option of operator configurable
43e1716 is described below
commit 43e171627e5cf47a866958c1e88b5c182b686802
Author: 愚鲤 <yu...@alibaba-inc.com>
AuthorDate: Wed Mar 2 23:26:21 2022 +0800
[FLINK-26399] Make some option of operator configurable
Closes #31
---
.../flink/kubernetes/operator/FlinkOperator.java | 10 ++++-
.../config/FlinkOperatorConfiguration.java | 52 ++++++++++++++++++++++
.../operator/config/OperatorConfigOptions.java | 41 +++++++++++++++++
.../controller/FlinkDeploymentController.java | 8 +++-
.../metrics/KubernetesOperatorMetricOptions.java | 1 +
.../observer/JobManagerDeploymentStatus.java | 15 ++++---
.../operator/reconciler/BaseReconciler.java | 15 +++----
.../operator/reconciler/JobReconciler.java | 14 ++++--
.../operator/reconciler/SessionReconciler.java | 15 ++++---
.../validation/DefaultDeploymentValidator.java | 12 ++---
.../controller/FlinkDeploymentControllerTest.java | 22 ++++++---
.../operator/reconciler/JobReconcilerTest.java | 9 +++-
12 files changed, 173 insertions(+), 41 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index fd9d60c..284860d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator;
import org.apache.flink.kubernetes.operator.config.DefaultConfig;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig;
import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
@@ -55,16 +56,21 @@ public class FlinkOperator {
Operator operator = new Operator(client, configurationService);
FlinkService flinkService = new FlinkService(client);
+ FlinkOperatorConfiguration operatorConfiguration =
+ FlinkOperatorConfiguration.fromConfiguration(defaultConfig.getOperatorConfig());
Observer observer = new Observer(flinkService);
- JobReconciler jobReconciler = new JobReconciler(client, flinkService);
- SessionReconciler sessionReconciler = new SessionReconciler(client, flinkService);
+ JobReconciler jobReconciler =
+ new JobReconciler(client, flinkService, operatorConfiguration);
+ SessionReconciler sessionReconciler =
+ new SessionReconciler(client, flinkService, operatorConfiguration);
FlinkDeploymentValidator validator = new DefaultDeploymentValidator();
FlinkDeploymentController controller =
new FlinkDeploymentController(
defaultConfig,
+ operatorConfiguration,
client,
namespace,
validator,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
new file mode 100644
index 0000000..00e741d
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.configuration.Configuration;
+
+/** Configuration class for operator: holds the reconcile and port-check rescheduling intervals, both in seconds. */
+public class FlinkOperatorConfiguration {
+
+ private final int reconcileIntervalInSec; // seconds; final, assigned once in the constructor
+
+ private final int portCheckIntervalInSec; // seconds; final, assigned once in the constructor
+
+ public FlinkOperatorConfiguration(int reconcileIntervalInSec, int portCheckIntervalInSec) { // stores both values as given, no range validation
+ this.reconcileIntervalInSec = reconcileIntervalInSec;
+ this.portCheckIntervalInSec = portCheckIntervalInSec;
+ }
+
+ public int getReconcileIntervalInSec() { // interval used when rescheduling the regular reconcile loop
+ return reconcileIntervalInSec;
+ }
+
+ public int getPortCheckIntervalInSec() { // interval used when rescheduling while the deployment is not ready yet
+ return portCheckIntervalInSec;
+ }
+
+ public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) { // factory: reads both intervals from the given Flink Configuration
+ int reconcileIntervalInSec =
+ operatorConfig.getInteger(
+ OperatorConfigOptions.OPERATOR_RECONCILER_RESCHEDULE_INTERVAL_IN_SEC); // declared default: 60
+ int portCheckIntervalInSec =
+ operatorConfig.getInteger(
+ OperatorConfigOptions.OPERATOR_OBSERVER_PORT_CHECK_INTERVAL_IN_SEC); // declared default: 10
+ return new FlinkOperatorConfiguration(reconcileIntervalInSec, portCheckIntervalInSec);
+ }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
new file mode 100644
index 0000000..0a80c07
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/OperatorConfigOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** This class holds the configuration options read by the Flink operator itself: the reconcile reschedule interval and the port-check interval. */
+public class OperatorConfigOptions {
+
+ public static final ConfigOption<Integer> OPERATOR_RECONCILER_RESCHEDULE_INTERVAL_IN_SEC = // regular reconcile reschedule interval, in seconds
+ ConfigOptions.key("operator.reconciler.reschedule.interval.sec")
+ .intType()
+ .defaultValue(60) // default: 60 seconds
+ .withDescription(
+ "The interval in second for the controller to reschedule the reconcile process");
+
+ public static final ConfigOption<Integer> OPERATOR_OBSERVER_PORT_CHECK_INTERVAL_IN_SEC = // shorter reschedule interval used while waiting for the deployment to become ready, in seconds
+ ConfigOptions.key("operator.observer.port-check.interval.sec")
+ .intType()
+ .defaultValue(10) // default: 10 seconds
+ .withDescription(
+ "The interval in second for the controller to reschedule the reconcile process to "
+ + "wait for deployment to be ready");
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index b9d5cd4..bc27ac5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.controller;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.DefaultConfig;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
@@ -69,9 +70,11 @@ public class FlinkDeploymentController
private final JobReconciler jobReconciler;
private final SessionReconciler sessionReconciler;
private final DefaultConfig defaultConfig;
+ private final FlinkOperatorConfiguration operatorConfiguration;
public FlinkDeploymentController(
DefaultConfig defaultConfig,
+ FlinkOperatorConfiguration operatorConfiguration,
KubernetesClient kubernetesClient,
String operatorNamespace,
FlinkDeploymentValidator validator,
@@ -79,6 +82,7 @@ public class FlinkDeploymentController
JobReconciler jobReconciler,
SessionReconciler sessionReconciler) {
this.defaultConfig = defaultConfig;
+ this.operatorConfiguration = operatorConfiguration;
this.kubernetesClient = kubernetesClient;
this.operatorNamespace = operatorNamespace;
this.validator = validator;
@@ -114,7 +118,9 @@ public class FlinkDeploymentController
boolean readyToReconcile = observer.observe(flinkApp, context, effectiveConfig);
if (!readyToReconcile) {
- return flinkApp.getStatus().getJobManagerDeploymentStatus().toUpdateControl(flinkApp);
+ return flinkApp.getStatus()
+ .getJobManagerDeploymentStatus()
+ .toUpdateControl(flinkApp, operatorConfiguration);
}
try {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
index 5d092f8..3c07674 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigOptions;
public class KubernetesOperatorMetricOptions {
public static final ConfigOption<String> SCOPE_NAMING_KUBERNETES_OPERATOR =
ConfigOptions.key("metrics.scope.kubernetes-operator")
+ .stringType()
.defaultValue("<host>.kubernetes-operator.<namespace>.<name>")
.withDescription(
"Defines the scope format string that is applied to all metrics scoped to the kubernetes operator.");
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
index 08262ff..960c1cd 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
@@ -17,15 +17,13 @@
package org.apache.flink.kubernetes.operator.observer;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import java.util.concurrent.TimeUnit;
-import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS;
-import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS;
-
/** Status of the Flink JobManager Kubernetes deployment. */
public enum JobManagerDeploymentStatus {
@@ -41,15 +39,20 @@ public enum JobManagerDeploymentStatus {
/** JobManager deployment not found, probably not started or killed by user. */
MISSING;
- public UpdateControl<FlinkDeployment> toUpdateControl(FlinkDeployment flinkDeployment) {
+ public UpdateControl<FlinkDeployment> toUpdateControl(
+ FlinkDeployment flinkDeployment, FlinkOperatorConfiguration operatorConfiguration) {
switch (this) {
case DEPLOYING:
case READY:
return UpdateControl.updateStatus(flinkDeployment)
- .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+ .rescheduleAfter(
+ operatorConfiguration.getReconcileIntervalInSec(),
+ TimeUnit.SECONDS);
case DEPLOYED_NOT_READY:
return UpdateControl.updateStatus(flinkDeployment)
- .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);
+ .rescheduleAfter(
+ operatorConfiguration.getPortCheckIntervalInSec(),
+ TimeUnit.SECONDS);
case MISSING:
default:
return null;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
index b94956b..b4a98d8 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.reconciler;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -28,23 +29,21 @@ import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/** BaseReconciler with functionality that is common to job and session modes. */
public abstract class BaseReconciler {
- private static final Logger LOG = LoggerFactory.getLogger(BaseReconciler.class);
-
- public static final int REFRESH_SECONDS = 60;
- public static final int PORT_READY_DELAY_SECONDS = 10;
-
+ protected final FlinkOperatorConfiguration operatorConfiguration;
protected final KubernetesClient kubernetesClient;
protected final FlinkService flinkService;
- public BaseReconciler(KubernetesClient kubernetesClient, FlinkService flinkService) {
+ public BaseReconciler(
+ KubernetesClient kubernetesClient,
+ FlinkService flinkService,
+ FlinkOperatorConfiguration operatorConfiguration) {
this.kubernetesClient = kubernetesClient;
this.flinkService = flinkService;
+ this.operatorConfiguration = operatorConfiguration;
}
public abstract UpdateControl<FlinkDeployment> reconcile(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index 3abb860..3d0cb39 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.reconciler;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
@@ -48,8 +49,11 @@ public class JobReconciler extends BaseReconciler {
private static final Logger LOG = LoggerFactory.getLogger(JobReconciler.class);
- public JobReconciler(KubernetesClient kubernetesClient, FlinkService flinkService) {
- super(kubernetesClient, flinkService);
+ public JobReconciler(
+ KubernetesClient kubernetesClient,
+ FlinkService flinkService,
+ FlinkOperatorConfiguration operatorConfiguration) {
+ super(kubernetesClient, flinkService, operatorConfiguration);
}
@Override
@@ -69,7 +73,8 @@ public class JobReconciler extends BaseReconciler {
Optional.ofNullable(jobSpec.getInitialSavepointPath()));
IngressUtils.updateIngressRules(
flinkApp, effectiveConfig, operatorNamespace, kubernetesClient, false);
- return JobManagerDeploymentStatus.DEPLOYING.toUpdateControl(flinkApp);
+ return JobManagerDeploymentStatus.DEPLOYING.toUpdateControl(
+ flinkApp, operatorConfiguration);
}
// TODO: following assumes that current job is running
@@ -105,7 +110,8 @@ public class JobReconciler extends BaseReconciler {
}
return UpdateControl.updateStatus(flinkApp)
- .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+ .rescheduleAfter(
+ operatorConfiguration.getReconcileIntervalInSec(), TimeUnit.SECONDS);
}
private void deployFlinkJob(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
index d63fdeb..7596b4d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.reconciler;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -26,8 +27,6 @@ import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
@@ -37,10 +36,11 @@ import java.util.concurrent.TimeUnit;
*/
public class SessionReconciler extends BaseReconciler {
- private static final Logger LOG = LoggerFactory.getLogger(SessionReconciler.class);
-
- public SessionReconciler(KubernetesClient kubernetesClient, FlinkService flinkService) {
- super(kubernetesClient, flinkService);
+ public SessionReconciler(
+ KubernetesClient kubernetesClient,
+ FlinkService flinkService,
+ FlinkOperatorConfiguration operatorConfiguration) {
+ super(kubernetesClient, flinkService, operatorConfiguration);
}
@Override
@@ -66,7 +66,8 @@ public class SessionReconciler extends BaseReconciler {
}
return UpdateControl.updateStatus(flinkApp)
- .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+ .rescheduleAfter(
+ operatorConfiguration.getReconcileIntervalInSec(), TimeUnit.SECONDS);
}
private void upgradeSessionCluster(FlinkDeployment flinkApp, Configuration effectiveConfig)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
index 9f8046b..aaef01f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.validation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec;
@@ -38,7 +39,9 @@ import java.util.Set;
public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
private static final String[] FORBIDDEN_CONF_KEYS =
- new String[] {"kubernetes.namespace", "kubernetes.cluster-id"};
+ new String[] {
+ KubernetesConfigOptions.NAMESPACE.key(), KubernetesConfigOptions.CLUSTER_ID.key()
+ };
private static final Set<String> ALLOWED_LOG_CONF_KEYS =
Set.of(Constants.CONFIG_FILE_LOG4J_NAME, Constants.CONFIG_FILE_LOGBACK_NAME);
@@ -121,13 +124,12 @@ public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
return firstPresent(
validateResources("JobManager", jmSpec.getResource()),
- validateJmReplicas("JobManager", jmSpec.getReplicas(), confMap));
+ validateJmReplicas(jmSpec.getReplicas(), confMap));
}
- private Optional<String> validateJmReplicas(
- String component, int replicas, Map<String, String> confMap) {
+ private Optional<String> validateJmReplicas(int replicas, Map<String, String> confMap) {
if (replicas < 1) {
- return Optional.of(component + " replicas should not be configured less than one.");
+ return Optional.of("JobManager replicas should not be configured less than one.");
} else if (replicas > 1
&& !HighAvailabilityMode.isHighAvailabilityModeActivated(
Configuration.fromMap(confMap))) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index f4468c6..e464123 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -18,8 +18,10 @@
package org.apache.flink.kubernetes.operator.controller;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
@@ -51,6 +53,8 @@ import static org.junit.Assert.assertTrue;
public class FlinkDeploymentControllerTest {
private final Context context = JobReconcilerTest.createContextWithReadyJobManagerDeployment();
+ private final FlinkOperatorConfiguration operatorConfiguration =
+ FlinkOperatorConfiguration.fromConfiguration(new Configuration());
@Test
public void verifyBasicReconcileLoop() {
@@ -63,7 +67,9 @@ public class FlinkDeploymentControllerTest {
updateControl = testController.reconcile(appCluster, TestUtils.createEmptyContext());
assertTrue(updateControl.isUpdateStatus());
assertEquals(
- JobManagerDeploymentStatus.DEPLOYING.toUpdateControl(appCluster).getScheduleDelay(),
+ JobManagerDeploymentStatus.DEPLOYING
+ .toUpdateControl(appCluster, operatorConfiguration)
+ .getScheduleDelay(),
updateControl.getScheduleDelay());
// Validate reconciliation status
@@ -77,14 +83,16 @@ public class FlinkDeploymentControllerTest {
assertTrue(updateControl.isUpdateStatus());
assertEquals(
JobManagerDeploymentStatus.DEPLOYED_NOT_READY
- .toUpdateControl(appCluster)
+ .toUpdateControl(appCluster, operatorConfiguration)
.getScheduleDelay(),
updateControl.getScheduleDelay());
updateControl = testController.reconcile(appCluster, context);
assertTrue(updateControl.isUpdateStatus());
assertEquals(
- JobManagerDeploymentStatus.READY.toUpdateControl(appCluster).getScheduleDelay(),
+ JobManagerDeploymentStatus.READY
+ .toUpdateControl(appCluster, operatorConfiguration)
+ .getScheduleDelay(),
updateControl.getScheduleDelay());
// Validate job status
@@ -188,7 +196,7 @@ public class FlinkDeploymentControllerTest {
testController.reconcile(appCluster, context);
assertEquals(
JobManagerDeploymentStatus.DEPLOYED_NOT_READY
- .toUpdateControl(appCluster)
+ .toUpdateControl(appCluster, operatorConfiguration)
.getScheduleDelay(),
updateControl.getScheduleDelay());
testController.reconcile(appCluster, context);
@@ -212,11 +220,13 @@ public class FlinkDeploymentControllerTest {
private FlinkDeploymentController createTestController(TestingFlinkService flinkService) {
Observer observer = new Observer(flinkService);
- JobReconciler jobReconciler = new JobReconciler(null, flinkService);
- SessionReconciler sessionReconciler = new SessionReconciler(null, flinkService);
+ JobReconciler jobReconciler = new JobReconciler(null, flinkService, operatorConfiguration);
+ SessionReconciler sessionReconciler =
+ new SessionReconciler(null, flinkService, operatorConfiguration);
return new FlinkDeploymentController(
FlinkUtils.loadDefaultConfig(),
+ operatorConfiguration,
null,
"test",
new DefaultDeploymentValidator(),
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index 2ee8793..fd940d4 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
@@ -47,6 +48,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/** @link JobStatusObserver unit tests */
public class JobReconcilerTest {
+ private final FlinkOperatorConfiguration operatorConfiguration =
+ FlinkOperatorConfiguration.fromConfiguration(new Configuration());
+
public static Context createContextWithReadyJobManagerDeployment() {
return new Context() {
@Override
@@ -75,7 +79,7 @@ public class JobReconcilerTest {
Context context = JobReconcilerTest.createContextWithReadyJobManagerDeployment();
TestingFlinkService flinkService = new TestingFlinkService();
- JobReconciler reconciler = new JobReconciler(null, flinkService);
+ JobReconciler reconciler = new JobReconciler(null, flinkService, operatorConfiguration);
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
Configuration config = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
@@ -117,7 +121,8 @@ public class JobReconcilerTest {
final TestingFlinkService flinkService = new TestingFlinkService();
Observer observer = new Observer(flinkService);
- final JobReconciler reconciler = new JobReconciler(null, flinkService);
+ final JobReconciler reconciler =
+ new JobReconciler(null, flinkService, operatorConfiguration);
final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
final Configuration config = FlinkUtils.getEffectiveConfig(deployment, new Configuration());