You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by th...@apache.org on 2022/02/27 18:12:12 UTC
[flink-kubernetes-operator] 02/02: [FLINK-26261] Refactor to simplify reconciliation logic
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 684b4597764daefe53fb1399298324bec5bc738e
Author: Thomas Weise <th...@apache.org>
AuthorDate: Sat Feb 26 14:55:43 2022 -0800
[FLINK-26261] Refactor to simplify reconciliation logic
---
.../flink/kubernetes/operator/FlinkOperator.java | 3 -
.../controller/FlinkDeploymentController.java | 95 ++++---------------
.../operator/observer/JobStatusObserver.java | 4 -
.../operator/reconciler/BaseReconciler.java | 103 +++++++++++++++++++++
.../operator/reconciler/JobReconciler.java | 36 ++++++-
.../operator/reconciler/SessionReconciler.java | 25 ++++-
.../kubernetes/operator/service/FlinkService.java | 2 +-
.../kubernetes/operator/TestingFlinkService.java | 2 +-
.../controller/FlinkDeploymentControllerTest.java | 38 ++------
.../operator/reconciler/JobReconcilerTest.java | 45 +++++++--
10 files changed, 218 insertions(+), 135 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 7e8edf5..8fa72b9 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -21,7 +21,6 @@ import org.apache.flink.kubernetes.operator.config.DefaultConfig;
import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig;
import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
-import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -56,7 +55,6 @@ public class FlinkOperator {
FlinkService flinkService = new FlinkService(client);
- JobStatusObserver observer = new JobStatusObserver(flinkService);
JobReconciler jobReconciler = new JobReconciler(client, flinkService);
SessionReconciler sessionReconciler = new SessionReconciler(client, flinkService);
@@ -68,7 +66,6 @@ public class FlinkOperator {
client,
namespace,
validator,
- observer,
jobReconciler,
sessionReconciler);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index a6c6072..2e79926 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -23,16 +23,15 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
-import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
+import org.apache.flink.kubernetes.operator.reconciler.BaseReconciler;
import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
+import org.apache.flink.kubernetes.utils.Constants;
import io.fabric8.kubernetes.api.model.apps.Deployment;
-import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
-import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -50,10 +49,8 @@ import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashSet;
import java.util.List;
import java.util.Optional;
-import java.util.concurrent.TimeUnit;
/** Controller that runs the main reconcile loop for Flink deployments. */
@ControllerConfiguration(generationAwareEventProcessing = false)
@@ -63,33 +60,26 @@ public class FlinkDeploymentController
EventSourceInitializer<FlinkDeployment> {
private static final Logger LOG = LoggerFactory.getLogger(FlinkDeploymentController.class);
- public static final int REFRESH_SECONDS = 60;
- public static final int PORT_READY_DELAY_SECONDS = 10;
-
private final KubernetesClient kubernetesClient;
private final String operatorNamespace;
private final FlinkDeploymentValidator validator;
- private final JobStatusObserver observer;
private final JobReconciler jobReconciler;
private final SessionReconciler sessionReconciler;
private final DefaultConfig defaultConfig;
- private final HashSet<String> jobManagerDeployments = new HashSet<>();
public FlinkDeploymentController(
DefaultConfig defaultConfig,
KubernetesClient kubernetesClient,
String operatorNamespace,
FlinkDeploymentValidator validator,
- JobStatusObserver observer,
JobReconciler jobReconciler,
SessionReconciler sessionReconciler) {
this.defaultConfig = defaultConfig;
this.kubernetesClient = kubernetesClient;
this.operatorNamespace = operatorNamespace;
this.validator = validator;
- this.observer = observer;
this.jobReconciler = jobReconciler;
this.sessionReconciler = sessionReconciler;
}
@@ -104,7 +94,7 @@ public class FlinkDeploymentController
operatorNamespace,
kubernetesClient,
true);
- jobManagerDeployments.remove(flinkApp.getMetadata().getSelfLink());
+ getReconciler(flinkApp).removeDeployment(flinkApp);
return DeleteControl.defaultDelete();
}
@@ -122,14 +112,11 @@ public class FlinkDeploymentController
Configuration effectiveConfig =
FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
try {
- // only check job status when the JM deployment is ready
- boolean shouldReconcile =
- !jobManagerDeployments.contains(flinkApp.getMetadata().getSelfLink())
- || observer.observeFlinkJobStatus(flinkApp, effectiveConfig);
- if (shouldReconcile) {
- reconcileFlinkDeployment(operatorNamespace, flinkApp, effectiveConfig);
- updateForReconciliationSuccess(flinkApp);
- }
+ UpdateControl<FlinkDeployment> updateControl =
+ getReconciler(flinkApp)
+ .reconcile(operatorNamespace, flinkApp, context, effectiveConfig);
+ updateForReconciliationSuccess(flinkApp);
+ return updateControl;
} catch (InvalidDeploymentException ide) {
LOG.error("Reconciliation failed", ide);
updateForReconciliationError(flinkApp, ide.getMessage());
@@ -137,62 +124,10 @@ public class FlinkDeploymentController
} catch (Exception e) {
throw new ReconciliationException(e);
}
-
- return checkJobManagerDeployment(flinkApp, context, effectiveConfig);
}
- private UpdateControl<FlinkDeployment> checkJobManagerDeployment(
- FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
- if (!jobManagerDeployments.contains(flinkApp.getMetadata().getSelfLink())) {
- Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
- if (deployment.isPresent()) {
- DeploymentStatus status = deployment.get().getStatus();
- DeploymentSpec spec = deployment.get().getSpec();
- if (status != null
- && status.getAvailableReplicas() != null
- && spec.getReplicas().intValue() == status.getReplicas()
- && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
- // typically it takes a few seconds for the REST server to be ready
- if (observer.isJobManagerReady(effectiveConfig)) {
- LOG.info(
- "JobManager deployment {} in namespace {} is ready",
- flinkApp.getMetadata().getName(),
- flinkApp.getMetadata().getNamespace());
- jobManagerDeployments.add(flinkApp.getMetadata().getSelfLink());
- if (flinkApp.getStatus().getJobStatus() != null) {
- // short circuit, if the job was already running
- // reschedule for immediate job status check
- return UpdateControl.updateStatus(flinkApp).rescheduleAfter(0);
- }
- }
- LOG.info(
- "JobManager deployment {} in namespace {} port not ready",
- flinkApp.getMetadata().getName(),
- flinkApp.getMetadata().getNamespace());
- return UpdateControl.updateStatus(flinkApp)
- .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);
- } else {
- LOG.info(
- "JobManager deployment {} in namespace {} not yet ready, status {}",
- flinkApp.getMetadata().getName(),
- flinkApp.getMetadata().getNamespace(),
- status);
- }
- }
- }
- return UpdateControl.updateStatus(flinkApp)
- .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
- }
-
- private void reconcileFlinkDeployment(
- String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig)
- throws Exception {
-
- if (flinkApp.getSpec().getJob() == null) {
- sessionReconciler.reconcile(operatorNamespace, flinkApp, effectiveConfig);
- } else {
- jobReconciler.reconcile(operatorNamespace, flinkApp, effectiveConfig);
- }
+ private BaseReconciler getReconciler(FlinkDeployment flinkDeployment) {
+ return flinkDeployment.getSpec().getJob() == null ? sessionReconciler : jobReconciler;
}
private void updateForReconciliationSuccess(FlinkDeployment flinkApp) {
@@ -217,10 +152,14 @@ public class FlinkDeploymentController
.apps()
.deployments()
.inAnyNamespace()
- .withLabel("type", "flink-native-kubernetes")
- .withLabel("component", "jobmanager")
+ .withLabel(Constants.LABEL_TYPE_KEY, Constants.LABEL_TYPE_NATIVE_TYPE)
+ .withLabel(
+ Constants.LABEL_COMPONENT_KEY,
+ Constants.LABEL_COMPONENT_JOB_MANAGER)
.runnableInformer(0);
- return List.of(new InformerEventSource<>(deploymentInformer, Mappers.fromLabel("app")));
+ return List.of(
+ new InformerEventSource<>(
+ deploymentInformer, Mappers.fromLabel(Constants.LABEL_APP_KEY)));
}
@Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index cb56dc3..577d73b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -83,10 +83,6 @@ public class JobStatusObserver {
}
}
- public boolean isJobManagerReady(Configuration config) {
- return flinkService.isJobManagerReady(config);
- }
-
/** Merge previous job status with the new one from the flink job cluster. */
private JobStatus mergeJobStatus(
JobStatus oldStatus, List<JobStatusMessage> clusterJobStatuses) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
new file mode 100644
index 0000000..f1c0c23
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/** BaseReconciler with functionality that is common to job and session modes. */
+public abstract class BaseReconciler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BaseReconciler.class);
+
+    /** Delay, in seconds, before the next periodic reconciliation of a deployment. */
+    public static final int REFRESH_SECONDS = 60;
+
+    /** Delay, in seconds, used while waiting for the JobManager REST endpoint. */
+    public static final int PORT_READY_DELAY_SECONDS = 10;
+
+    /**
+     * UIDs of deployments whose JobManager has been observed as ready. The operator SDK may
+     * reconcile different resources concurrently on this shared reconciler instance, so a
+     * concurrent set is used rather than a plain {@code HashSet}.
+     */
+    private final Set<String> jobManagerDeployments = ConcurrentHashMap.newKeySet();
+
+    /**
+     * Forgets the cached JobManager readiness of the given deployment, so that the next
+     * reconciliation checks its JobManager deployment again.
+     *
+     * @param flinkApp the deployment to forget
+     * @return {@code true} if the deployment had been marked as ready
+     */
+    public boolean removeDeployment(FlinkDeployment flinkApp) {
+        return jobManagerDeployments.remove(flinkApp.getMetadata().getUid());
+    }
+
+    /**
+     * Reconciles the desired state of the given deployment with the current cluster state.
+     *
+     * @param operatorNamespace the namespace of the operator
+     * @param flinkApp the deployment to reconcile
+     * @param context reconciliation context giving access to secondary resources
+     * @param effectiveConfig Flink configuration effective for this deployment
+     * @return update control describing the status update and the next reschedule delay
+     * @throws Exception if reconciliation fails
+     */
+    public abstract UpdateControl<FlinkDeployment> reconcile(
+            String operatorNamespace,
+            FlinkDeployment flinkApp,
+            Context context,
+            Configuration effectiveConfig)
+            throws Exception;
+
+    /**
+     * Checks whether the JobManager deployment of {@code flinkApp} is ready to serve requests.
+     *
+     * <p>Once the JobManager has been observed as ready its UID is cached and the check is
+     * skipped until {@link #removeDeployment} is called.
+     *
+     * @return {@code null} if reconciliation should proceed, otherwise an update control that
+     *     reschedules the next check
+     */
+    protected UpdateControl<FlinkDeployment> checkJobManagerDeployment(
+            FlinkDeployment flinkApp,
+            Context context,
+            Configuration effectiveConfig,
+            FlinkService flinkService) {
+        if (jobManagerDeployments.contains(flinkApp.getMetadata().getUid())) {
+            return null;
+        }
+        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+        if (!deployment.isPresent()) {
+            // NOTE(review): without a visible JobManager deployment reconciliation proceeds,
+            // e.g. to (re)deploy the cluster; confirm this is intended for every caller.
+            return null;
+        }
+        DeploymentStatus status = deployment.get().getStatus();
+        if (!areAllReplicasAvailable(deployment.get().getSpec(), status)) {
+            LOG.info(
+                    "JobManager deployment {} in namespace {} not yet ready, status {}",
+                    flinkApp.getMetadata().getName(),
+                    flinkApp.getMetadata().getNamespace(),
+                    status);
+            // TODO: decide how frequently to poll a deployment that is not yet ready
+            return UpdateControl.updateStatus(flinkApp)
+                    .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+        }
+        // typically it takes a few seconds for the REST server to be ready
+        if (!flinkService.isJobManagerPortReady(effectiveConfig)) {
+            LOG.info(
+                    "JobManager deployment {} in namespace {} port not ready",
+                    flinkApp.getMetadata().getName(),
+                    flinkApp.getMetadata().getNamespace());
+            return UpdateControl.updateStatus(flinkApp)
+                    .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);
+        }
+        LOG.info(
+                "JobManager deployment {} in namespace {} is ready",
+                flinkApp.getMetadata().getName(),
+                flinkApp.getMetadata().getNamespace());
+        jobManagerDeployments.add(flinkApp.getMetadata().getUid());
+        if (flinkApp.getStatus().getJobStatus() != null) {
+            // pre-existing deployments on operator restart - proceed with reconciliation
+            return null;
+        }
+        // No job status recorded yet: reschedule shortly instead of reconciling in this pass.
+        return UpdateControl.updateStatus(flinkApp)
+                .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Returns whether every desired replica of the deployment exists and is available. The
+     * replica counts are boxed and may be unset, so they are compared null-safely.
+     */
+    private static boolean areAllReplicasAvailable(DeploymentSpec spec, DeploymentStatus status) {
+        if (spec == null || status == null) {
+            return false;
+        }
+        Integer desired = spec.getReplicas();
+        return desired != null
+                && desired.equals(status.getReplicas())
+                && desired.equals(status.getAvailableReplicas());
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index fb599e8..7a072e0 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -25,36 +25,45 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
/**
* Reconciler responsible for handling the job lifecycle according to the desired and current
* states.
*/
-public class JobReconciler {
+public class JobReconciler extends BaseReconciler {
private static final Logger LOG = LoggerFactory.getLogger(JobReconciler.class);
private final KubernetesClient kubernetesClient;
private final FlinkService flinkService;
+ private final JobStatusObserver observer;
public JobReconciler(KubernetesClient kubernetesClient, FlinkService flinkService) {
this.kubernetesClient = kubernetesClient;
this.flinkService = flinkService;
+ this.observer = new JobStatusObserver(flinkService);
}
- public void reconcile(
- String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig)
+ @Override
+ public UpdateControl<FlinkDeployment> reconcile(
+ String operatorNamespace,
+ FlinkDeployment flinkApp,
+ Context context,
+ Configuration effectiveConfig)
throws Exception {
-
FlinkDeploymentSpec lastReconciledSpec =
flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
JobSpec jobSpec = flinkApp.getSpec().getJob();
@@ -65,9 +74,22 @@ public class JobReconciler {
Optional.ofNullable(jobSpec.getInitialSavepointPath()));
IngressUtils.updateIngressRules(
flinkApp, effectiveConfig, operatorNamespace, kubernetesClient, false);
- return;
}
+ // wait until the deployment is ready
+ UpdateControl<FlinkDeployment> uc =
+ checkJobManagerDeployment(flinkApp, context, effectiveConfig, flinkService);
+ if (uc != null) {
+ return uc;
+ }
+
+ if (!observer.observeFlinkJobStatus(flinkApp, effectiveConfig)) {
+ return UpdateControl.updateStatus(flinkApp)
+ .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+ }
+
+ // TODO: following assumes that current job is running
+ // What if it never enters running state due to bad deployment?
boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec);
if (specChanged) {
JobState currentJobState = lastReconciledSpec.getJob().getState();
@@ -97,6 +119,9 @@ public class JobReconciler {
}
}
}
+
+ return UpdateControl.updateStatus(flinkApp)
+ .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
}
private void deployFlinkJob(
@@ -150,6 +175,7 @@ public class JobReconciler {
effectiveConfig);
JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
jobStatus.setState("suspended");
+ removeDeployment(flinkApp);
savepointOpt.ifPresent(jobStatus::setSavepointLocation);
return savepointOpt;
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
index 820e31f..0c9ade3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
@@ -24,14 +24,18 @@ import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.TimeUnit;
+
/**
* Reconciler responsible for handling the session cluster lifecycle according to the desired and
* current states.
*/
-public class SessionReconciler {
+public class SessionReconciler extends BaseReconciler {
private static final Logger LOG = LoggerFactory.getLogger(SessionReconciler.class);
@@ -43,8 +47,12 @@ public class SessionReconciler {
this.flinkService = flinkService;
}
- public void reconcile(
- String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig)
+ @Override
+ public UpdateControl<FlinkDeployment> reconcile(
+ String operatorNamespace,
+ FlinkDeployment flinkApp,
+ Context context,
+ Configuration effectiveConfig)
throws Exception {
FlinkDeploymentSpec lastReconciledSpec =
@@ -54,14 +62,21 @@ public class SessionReconciler {
flinkService.submitSessionCluster(flinkApp, effectiveConfig);
IngressUtils.updateIngressRules(
flinkApp, effectiveConfig, operatorNamespace, kubernetesClient, false);
- return;
}
- boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec);
+ UpdateControl<FlinkDeployment> uc =
+ checkJobManagerDeployment(flinkApp, context, effectiveConfig, flinkService);
+ if (uc != null) {
+ return uc;
+ }
+ boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec);
if (specChanged) {
upgradeSessionCluster(flinkApp, effectiveConfig);
}
+
+ return UpdateControl.updateStatus(flinkApp)
+ .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
}
private void upgradeSessionCluster(FlinkDeployment flinkApp, Configuration effectiveConfig)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index a2e731c..fc98dcf 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -100,7 +100,7 @@ public class FlinkService {
LOG.info("Session cluster {} deployed", deployment.getMetadata().getName());
}
- public boolean isJobManagerReady(Configuration config) {
+ public boolean isJobManagerPortReady(Configuration config) {
final URI uri;
try (ClusterClient<String> clusterClient = getClusterClient(config)) {
uri = URI.create(clusterClient.getWebInterfaceURL());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 398ef84..626c0ae 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -102,7 +102,7 @@ public class TestingFlinkService extends FlinkService {
}
@Override
- public boolean isJobManagerReady(Configuration config) {
+ public boolean isJobManagerPortReady(Configuration config) {
return true;
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 24db802..ab7c884 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -25,24 +25,20 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
-import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
+import org.apache.flink.kubernetes.operator.reconciler.BaseReconciler;
import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
+import org.apache.flink.kubernetes.operator.reconciler.JobReconcilerTest;
import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
import org.apache.flink.runtime.client.JobStatusMessage;
-import io.fabric8.kubernetes.api.model.apps.Deployment;
-import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
-import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
import io.javaoperatorsdk.operator.api.reconciler.Context;
-import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -53,27 +49,7 @@ import static org.junit.Assert.assertTrue;
/** @link JobStatusObserver unit tests */
public class FlinkDeploymentControllerTest {
- private final Context context =
- new Context() {
- @Override
- public Optional<RetryInfo> getRetryInfo() {
- return Optional.empty();
- }
-
- @Override
- public <T> Optional<T> getSecondaryResource(
- Class<T> expectedType, String eventSourceName) {
- DeploymentStatus status = new DeploymentStatus();
- status.setAvailableReplicas(1);
- status.setReplicas(1);
- DeploymentSpec spec = new DeploymentSpec();
- spec.setReplicas(1);
- Deployment deployment = new Deployment();
- deployment.setSpec(spec);
- deployment.setStatus(status);
- return Optional.of((T) deployment);
- }
- };
+ private final Context context = JobReconcilerTest.createContextWithReadyJobManagerDeployment();
@Test
public void verifyBasicReconcileLoop() {
@@ -86,13 +62,13 @@ public class FlinkDeploymentControllerTest {
updateControl = testController.reconcile(appCluster, context);
assertTrue(updateControl.isUpdateStatus());
assertEquals(
- FlinkDeploymentController.PORT_READY_DELAY_SECONDS * 1000,
+ BaseReconciler.PORT_READY_DELAY_SECONDS * 1000,
(long) updateControl.getScheduleDelay().get());
updateControl = testController.reconcile(appCluster, context);
assertTrue(updateControl.isUpdateStatus());
assertEquals(
- FlinkDeploymentController.REFRESH_SECONDS * 1000,
+ BaseReconciler.REFRESH_SECONDS * 1000,
(long) updateControl.getScheduleDelay().get());
// Validate reconciliation status
@@ -105,7 +81,7 @@ public class FlinkDeploymentControllerTest {
updateControl = testController.reconcile(appCluster, context);
assertTrue(updateControl.isUpdateStatus());
assertEquals(
- FlinkDeploymentController.REFRESH_SECONDS * 1000,
+ BaseReconciler.REFRESH_SECONDS * 1000,
(long) updateControl.getScheduleDelay().get());
// Validate job status
@@ -216,7 +192,6 @@ public class FlinkDeploymentControllerTest {
}
private FlinkDeploymentController createTestController(TestingFlinkService flinkService) {
- JobStatusObserver observer = new JobStatusObserver(flinkService);
JobReconciler jobReconciler = new JobReconciler(null, flinkService);
SessionReconciler sessionReconciler = new SessionReconciler(null, flinkService);
@@ -225,7 +200,6 @@ public class FlinkDeploymentControllerTest {
null,
"test",
new DefaultDeploymentValidator(),
- observer,
jobReconciler,
sessionReconciler);
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index 8f99806..6a2109b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -25,13 +25,20 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import org.junit.jupiter.api.Test;
import java.util.List;
+import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -40,15 +47,39 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/** @link JobStatusObserver unit tests */
public class JobReconcilerTest {
+ public static Context createContextWithReadyJobManagerDeployment() {
+ return new Context() {
+ @Override
+ public Optional<RetryInfo> getRetryInfo() {
+ return Optional.empty();
+ }
+
+ @Override
+ public <T> Optional<T> getSecondaryResource(
+ Class<T> expectedType, String eventSourceName) {
+ DeploymentStatus status = new DeploymentStatus();
+ status.setAvailableReplicas(1);
+ status.setReplicas(1);
+ DeploymentSpec spec = new DeploymentSpec();
+ spec.setReplicas(1);
+ Deployment deployment = new Deployment();
+ deployment.setSpec(spec);
+ deployment.setStatus(status);
+ return Optional.of((T) deployment);
+ }
+ };
+ }
+
@Test
public void testUpgrade() throws Exception {
+ Context context = JobReconcilerTest.createContextWithReadyJobManagerDeployment();
TestingFlinkService flinkService = new TestingFlinkService();
JobReconciler reconciler = new JobReconciler(null, flinkService);
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
Configuration config = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
- reconciler.reconcile("test", deployment, config);
+ reconciler.reconcile("test", deployment, context, config);
List<Tuple2<String, JobStatusMessage>> runningJobs = flinkService.listJobs();
verifyAndSetRunningJobsToStatus(deployment, runningJobs);
@@ -56,7 +87,7 @@ public class JobReconcilerTest {
FlinkDeployment statelessUpgrade = TestUtils.clone(deployment);
statelessUpgrade.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
statelessUpgrade.getSpec().getFlinkConfiguration().put("new", "conf");
- reconciler.reconcile("test", statelessUpgrade, config);
+ reconciler.reconcile("test", statelessUpgrade, context, config);
runningJobs = flinkService.listJobs();
assertEquals(1, runningJobs.size());
@@ -72,7 +103,7 @@ public class JobReconcilerTest {
statefulUpgrade.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
statefulUpgrade.getSpec().getFlinkConfiguration().put("new", "conf2");
- reconciler.reconcile("test", statefulUpgrade, new Configuration(config));
+ reconciler.reconcile("test", statefulUpgrade, context, new Configuration(config));
runningJobs = flinkService.listJobs();
assertEquals(1, runningJobs.size());
@@ -82,13 +113,15 @@ public class JobReconcilerTest {
@Test
public void testUpgradeModeChangeFromSavepointToLastState() throws Exception {
final String expectedSavepointPath = "savepoint_0";
+ final Context context = JobReconcilerTest.createContextWithReadyJobManagerDeployment();
final TestingFlinkService flinkService = new TestingFlinkService();
+ JobStatusObserver observer = new JobStatusObserver(flinkService);
final JobReconciler reconciler = new JobReconciler(null, flinkService);
final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
final Configuration config = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
- reconciler.reconcile("test", deployment, config);
+ reconciler.reconcile("test", deployment, context, config);
List<Tuple2<String, JobStatusMessage>> runningJobs = flinkService.listJobs();
verifyAndSetRunningJobsToStatus(deployment, runningJobs);
@@ -97,7 +130,7 @@ public class JobReconcilerTest {
deployment.getSpec().getJob().setState(JobState.SUSPENDED);
deployment.getSpec().setImage("new-image-1");
- reconciler.reconcile("test", deployment, config);
+ reconciler.reconcile("test", deployment, context, config);
assertEquals(0, flinkService.listJobs().size());
assertTrue(
JobState.SUSPENDED
@@ -118,7 +151,7 @@ public class JobReconcilerTest {
deployment.getSpec().getJob().setState(JobState.RUNNING);
deployment.getSpec().setImage("new-image-2");
- reconciler.reconcile("test", deployment, config);
+ reconciler.reconcile("test", deployment, context, config);
runningJobs = flinkService.listJobs();
assertEquals(expectedSavepointPath, config.get(SavepointConfigOptions.SAVEPOINT_PATH));
assertEquals(1, runningJobs.size());