You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by th...@apache.org on 2022/02/27 18:12:12 UTC

[flink-kubernetes-operator] 02/02: [FLINK-26261] Refactor to simplify reconciliation logic

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 684b4597764daefe53fb1399298324bec5bc738e
Author: Thomas Weise <th...@apache.org>
AuthorDate: Sat Feb 26 14:55:43 2022 -0800

    [FLINK-26261] Refactor to simplify reconciliation logic
---
 .../flink/kubernetes/operator/FlinkOperator.java   |   3 -
 .../controller/FlinkDeploymentController.java      |  95 ++++---------------
 .../operator/observer/JobStatusObserver.java       |   4 -
 .../operator/reconciler/BaseReconciler.java        | 103 +++++++++++++++++++++
 .../operator/reconciler/JobReconciler.java         |  36 ++++++-
 .../operator/reconciler/SessionReconciler.java     |  25 ++++-
 .../kubernetes/operator/service/FlinkService.java  |   2 +-
 .../kubernetes/operator/TestingFlinkService.java   |   2 +-
 .../controller/FlinkDeploymentControllerTest.java  |  38 ++------
 .../operator/reconciler/JobReconcilerTest.java     |  45 +++++++--
 10 files changed, 218 insertions(+), 135 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 7e8edf5..8fa72b9 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -21,7 +21,6 @@ import org.apache.flink.kubernetes.operator.config.DefaultConfig;
 import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig;
 import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
 import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
-import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -56,7 +55,6 @@ public class FlinkOperator {
 
         FlinkService flinkService = new FlinkService(client);
 
-        JobStatusObserver observer = new JobStatusObserver(flinkService);
         JobReconciler jobReconciler = new JobReconciler(client, flinkService);
         SessionReconciler sessionReconciler = new SessionReconciler(client, flinkService);
 
@@ -68,7 +66,6 @@ public class FlinkOperator {
                         client,
                         namespace,
                         validator,
-                        observer,
                         jobReconciler,
                         sessionReconciler);
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index a6c6072..2e79926 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -23,16 +23,15 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
-import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
+import org.apache.flink.kubernetes.operator.reconciler.BaseReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
+import org.apache.flink.kubernetes.utils.Constants;
 
 import io.fabric8.kubernetes.api.model.apps.Deployment;
-import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
-import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -50,10 +49,8 @@ import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.TimeUnit;
 
 /** Controller that runs the main reconcile loop for Flink deployments. */
 @ControllerConfiguration(generationAwareEventProcessing = false)
@@ -63,33 +60,26 @@ public class FlinkDeploymentController
                 EventSourceInitializer<FlinkDeployment> {
     private static final Logger LOG = LoggerFactory.getLogger(FlinkDeploymentController.class);
 
-    public static final int REFRESH_SECONDS = 60;
-    public static final int PORT_READY_DELAY_SECONDS = 10;
-
     private final KubernetesClient kubernetesClient;
 
     private final String operatorNamespace;
 
     private final FlinkDeploymentValidator validator;
-    private final JobStatusObserver observer;
     private final JobReconciler jobReconciler;
     private final SessionReconciler sessionReconciler;
     private final DefaultConfig defaultConfig;
-    private final HashSet<String> jobManagerDeployments = new HashSet<>();
 
     public FlinkDeploymentController(
             DefaultConfig defaultConfig,
             KubernetesClient kubernetesClient,
             String operatorNamespace,
             FlinkDeploymentValidator validator,
-            JobStatusObserver observer,
             JobReconciler jobReconciler,
             SessionReconciler sessionReconciler) {
         this.defaultConfig = defaultConfig;
         this.kubernetesClient = kubernetesClient;
         this.operatorNamespace = operatorNamespace;
         this.validator = validator;
-        this.observer = observer;
         this.jobReconciler = jobReconciler;
         this.sessionReconciler = sessionReconciler;
     }
@@ -104,7 +94,7 @@ public class FlinkDeploymentController
                 operatorNamespace,
                 kubernetesClient,
                 true);
-        jobManagerDeployments.remove(flinkApp.getMetadata().getSelfLink());
+        getReconciler(flinkApp).removeDeployment(flinkApp);
         return DeleteControl.defaultDelete();
     }
 
@@ -122,14 +112,11 @@ public class FlinkDeploymentController
         Configuration effectiveConfig =
                 FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
         try {
-            // only check job status when the JM deployment is ready
-            boolean shouldReconcile =
-                    !jobManagerDeployments.contains(flinkApp.getMetadata().getSelfLink())
-                            || observer.observeFlinkJobStatus(flinkApp, effectiveConfig);
-            if (shouldReconcile) {
-                reconcileFlinkDeployment(operatorNamespace, flinkApp, effectiveConfig);
-                updateForReconciliationSuccess(flinkApp);
-            }
+            UpdateControl<FlinkDeployment> updateControl =
+                    getReconciler(flinkApp)
+                            .reconcile(operatorNamespace, flinkApp, context, effectiveConfig);
+            updateForReconciliationSuccess(flinkApp);
+            return updateControl;
         } catch (InvalidDeploymentException ide) {
             LOG.error("Reconciliation failed", ide);
             updateForReconciliationError(flinkApp, ide.getMessage());
@@ -137,62 +124,10 @@ public class FlinkDeploymentController
         } catch (Exception e) {
             throw new ReconciliationException(e);
         }
-
-        return checkJobManagerDeployment(flinkApp, context, effectiveConfig);
     }
 
-    private UpdateControl<FlinkDeployment> checkJobManagerDeployment(
-            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
-        if (!jobManagerDeployments.contains(flinkApp.getMetadata().getSelfLink())) {
-            Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
-            if (deployment.isPresent()) {
-                DeploymentStatus status = deployment.get().getStatus();
-                DeploymentSpec spec = deployment.get().getSpec();
-                if (status != null
-                        && status.getAvailableReplicas() != null
-                        && spec.getReplicas().intValue() == status.getReplicas()
-                        && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
-                    // typically it takes a few seconds for the REST server to be ready
-                    if (observer.isJobManagerReady(effectiveConfig)) {
-                        LOG.info(
-                                "JobManager deployment {} in namespace {} is ready",
-                                flinkApp.getMetadata().getName(),
-                                flinkApp.getMetadata().getNamespace());
-                        jobManagerDeployments.add(flinkApp.getMetadata().getSelfLink());
-                        if (flinkApp.getStatus().getJobStatus() != null) {
-                            // short circuit, if the job was already running
-                            // reschedule for immediate job status check
-                            return UpdateControl.updateStatus(flinkApp).rescheduleAfter(0);
-                        }
-                    }
-                    LOG.info(
-                            "JobManager deployment {} in namespace {} port not ready",
-                            flinkApp.getMetadata().getName(),
-                            flinkApp.getMetadata().getNamespace());
-                    return UpdateControl.updateStatus(flinkApp)
-                            .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);
-                } else {
-                    LOG.info(
-                            "JobManager deployment {} in namespace {} not yet ready, status {}",
-                            flinkApp.getMetadata().getName(),
-                            flinkApp.getMetadata().getNamespace(),
-                            status);
-                }
-            }
-        }
-        return UpdateControl.updateStatus(flinkApp)
-                .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
-    }
-
-    private void reconcileFlinkDeployment(
-            String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig)
-            throws Exception {
-
-        if (flinkApp.getSpec().getJob() == null) {
-            sessionReconciler.reconcile(operatorNamespace, flinkApp, effectiveConfig);
-        } else {
-            jobReconciler.reconcile(operatorNamespace, flinkApp, effectiveConfig);
-        }
+    private BaseReconciler getReconciler(FlinkDeployment flinkDeployment) {
+        return flinkDeployment.getSpec().getJob() == null ? sessionReconciler : jobReconciler;
     }
 
     private void updateForReconciliationSuccess(FlinkDeployment flinkApp) {
@@ -217,10 +152,14 @@ public class FlinkDeploymentController
                         .apps()
                         .deployments()
                         .inAnyNamespace()
-                        .withLabel("type", "flink-native-kubernetes")
-                        .withLabel("component", "jobmanager")
+                        .withLabel(Constants.LABEL_TYPE_KEY, Constants.LABEL_TYPE_NATIVE_TYPE)
+                        .withLabel(
+                                Constants.LABEL_COMPONENT_KEY,
+                                Constants.LABEL_COMPONENT_JOB_MANAGER)
                         .runnableInformer(0);
-        return List.of(new InformerEventSource<>(deploymentInformer, Mappers.fromLabel("app")));
+        return List.of(
+                new InformerEventSource<>(
+                        deploymentInformer, Mappers.fromLabel(Constants.LABEL_APP_KEY)));
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index cb56dc3..577d73b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -83,10 +83,6 @@ public class JobStatusObserver {
         }
     }
 
-    public boolean isJobManagerReady(Configuration config) {
-        return flinkService.isJobManagerReady(config);
-    }
-
     /** Merge previous job status with the new one from the flink job cluster. */
     private JobStatus mergeJobStatus(
             JobStatus oldStatus, List<JobStatusMessage> clusterJobStatuses) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
new file mode 100644
index 0000000..f1c0c23
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+/** BaseReconciler with functionality that is common to job and session modes. */
+public abstract class BaseReconciler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BaseReconciler.class);
+
+    public static final int REFRESH_SECONDS = 60;
+    public static final int PORT_READY_DELAY_SECONDS = 10;
+
+    private final HashSet<String> jobManagerDeployments = new HashSet<>();
+
+    public boolean removeDeployment(FlinkDeployment flinkApp) {
+        return jobManagerDeployments.remove(flinkApp.getMetadata().getUid());
+    }
+
+    public abstract UpdateControl<FlinkDeployment> reconcile(
+            String operatorNamespace,
+            FlinkDeployment flinkApp,
+            Context context,
+            Configuration effectiveConfig)
+            throws Exception;
+
+    protected UpdateControl<FlinkDeployment> checkJobManagerDeployment(
+            FlinkDeployment flinkApp,
+            Context context,
+            Configuration effectiveConfig,
+            FlinkService flinkService) {
+        if (!jobManagerDeployments.contains(flinkApp.getMetadata().getUid())) {
+            Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+            if (deployment.isPresent()) {
+                DeploymentStatus status = deployment.get().getStatus();
+                DeploymentSpec spec = deployment.get().getSpec();
+                if (status != null
+                        && status.getAvailableReplicas() != null
+                        && spec.getReplicas().intValue() == status.getReplicas()
+                        && spec.getReplicas().intValue() == status.getAvailableReplicas()) {
+                    // typically it takes a few seconds for the REST server to be ready
+                    if (flinkService.isJobManagerPortReady(effectiveConfig)) {
+                        LOG.info(
+                                "JobManager deployment {} in namespace {} is ready",
+                                flinkApp.getMetadata().getName(),
+                                flinkApp.getMetadata().getNamespace());
+                        jobManagerDeployments.add(flinkApp.getMetadata().getUid());
+                        if (flinkApp.getStatus().getJobStatus() != null) {
+                            // pre-existing deployments on operator restart - proceed with
+                            // reconciliation
+                            return null;
+                        }
+                    }
+                    LOG.info(
+                            "JobManager deployment {} in namespace {} port not ready",
+                            flinkApp.getMetadata().getName(),
+                            flinkApp.getMetadata().getNamespace());
+                    return UpdateControl.updateStatus(flinkApp)
+                            .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);
+                }
+                LOG.info(
+                        "JobManager deployment {} in namespace {} not yet ready, status {}",
+                        flinkApp.getMetadata().getName(),
+                        flinkApp.getMetadata().getNamespace(),
+                        status);
+                // TODO: decide how frequently to re-check a deployment that is not yet ready
+                return UpdateControl.updateStatus(flinkApp)
+                        .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+            }
+        }
+        return null;
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index fb599e8..7a072e0 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -25,36 +25,45 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Reconciler responsible for handling the job lifecycle according to the desired and current
  * states.
  */
-public class JobReconciler {
+public class JobReconciler extends BaseReconciler {
 
     private static final Logger LOG = LoggerFactory.getLogger(JobReconciler.class);
 
     private final KubernetesClient kubernetesClient;
     private final FlinkService flinkService;
+    private final JobStatusObserver observer;
 
     public JobReconciler(KubernetesClient kubernetesClient, FlinkService flinkService) {
         this.kubernetesClient = kubernetesClient;
         this.flinkService = flinkService;
+        this.observer = new JobStatusObserver(flinkService);
     }
 
-    public void reconcile(
-            String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig)
+    @Override
+    public UpdateControl<FlinkDeployment> reconcile(
+            String operatorNamespace,
+            FlinkDeployment flinkApp,
+            Context context,
+            Configuration effectiveConfig)
             throws Exception {
-
         FlinkDeploymentSpec lastReconciledSpec =
                 flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
         JobSpec jobSpec = flinkApp.getSpec().getJob();
@@ -65,9 +74,22 @@ public class JobReconciler {
                     Optional.ofNullable(jobSpec.getInitialSavepointPath()));
             IngressUtils.updateIngressRules(
                     flinkApp, effectiveConfig, operatorNamespace, kubernetesClient, false);
-            return;
         }
 
+        // wait until the deployment is ready
+        UpdateControl<FlinkDeployment> uc =
+                checkJobManagerDeployment(flinkApp, context, effectiveConfig, flinkService);
+        if (uc != null) {
+            return uc;
+        }
+
+        if (!observer.observeFlinkJobStatus(flinkApp, effectiveConfig)) {
+            return UpdateControl.updateStatus(flinkApp)
+                    .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+        }
+
+        // TODO: the following assumes that the current job is running.
+        // What if it never enters the running state due to a bad deployment?
         boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec);
         if (specChanged) {
             JobState currentJobState = lastReconciledSpec.getJob().getState();
@@ -97,6 +119,9 @@ public class JobReconciler {
                 }
             }
         }
+
+        return UpdateControl.updateStatus(flinkApp)
+                .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
     }
 
     private void deployFlinkJob(
@@ -150,6 +175,7 @@ public class JobReconciler {
                         effectiveConfig);
         JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
         jobStatus.setState("suspended");
+        removeDeployment(flinkApp);
         savepointOpt.ifPresent(jobStatus::setSavepointLocation);
         return savepointOpt;
     }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
index 820e31f..0c9ade3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
@@ -24,14 +24,18 @@ import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * Reconciler responsible for handling the session cluster lifecycle according to the desired and
  * current states.
  */
-public class SessionReconciler {
+public class SessionReconciler extends BaseReconciler {
 
     private static final Logger LOG = LoggerFactory.getLogger(SessionReconciler.class);
 
@@ -43,8 +47,12 @@ public class SessionReconciler {
         this.flinkService = flinkService;
     }
 
-    public void reconcile(
-            String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig)
+    @Override
+    public UpdateControl<FlinkDeployment> reconcile(
+            String operatorNamespace,
+            FlinkDeployment flinkApp,
+            Context context,
+            Configuration effectiveConfig)
             throws Exception {
 
         FlinkDeploymentSpec lastReconciledSpec =
@@ -54,14 +62,21 @@ public class SessionReconciler {
             flinkService.submitSessionCluster(flinkApp, effectiveConfig);
             IngressUtils.updateIngressRules(
                     flinkApp, effectiveConfig, operatorNamespace, kubernetesClient, false);
-            return;
         }
 
-        boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec);
+        UpdateControl<FlinkDeployment> uc =
+                checkJobManagerDeployment(flinkApp, context, effectiveConfig, flinkService);
+        if (uc != null) {
+            return uc;
+        }
 
+        boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec);
         if (specChanged) {
             upgradeSessionCluster(flinkApp, effectiveConfig);
         }
+
+        return UpdateControl.updateStatus(flinkApp)
+                .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
     }
 
     private void upgradeSessionCluster(FlinkDeployment flinkApp, Configuration effectiveConfig)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index a2e731c..fc98dcf 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -100,7 +100,7 @@ public class FlinkService {
         LOG.info("Session cluster {} deployed", deployment.getMetadata().getName());
     }
 
-    public boolean isJobManagerReady(Configuration config) {
+    public boolean isJobManagerPortReady(Configuration config) {
         final URI uri;
         try (ClusterClient<String> clusterClient = getClusterClient(config)) {
             uri = URI.create(clusterClient.getWebInterfaceURL());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 398ef84..626c0ae 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -102,7 +102,7 @@ public class TestingFlinkService extends FlinkService {
     }
 
     @Override
-    public boolean isJobManagerReady(Configuration config) {
+    public boolean isJobManagerPortReady(Configuration config) {
         return true;
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 24db802..ab7c884 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -25,24 +25,20 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
-import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
+import org.apache.flink.kubernetes.operator.reconciler.BaseReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
+import org.apache.flink.kubernetes.operator.reconciler.JobReconcilerTest;
 import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
-import io.fabric8.kubernetes.api.model.apps.Deployment;
-import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
-import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
-import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -53,27 +49,7 @@ import static org.junit.Assert.assertTrue;
 /** @link JobStatusObserver unit tests */
 public class FlinkDeploymentControllerTest {
 
-    private final Context context =
-            new Context() {
-                @Override
-                public Optional<RetryInfo> getRetryInfo() {
-                    return Optional.empty();
-                }
-
-                @Override
-                public <T> Optional<T> getSecondaryResource(
-                        Class<T> expectedType, String eventSourceName) {
-                    DeploymentStatus status = new DeploymentStatus();
-                    status.setAvailableReplicas(1);
-                    status.setReplicas(1);
-                    DeploymentSpec spec = new DeploymentSpec();
-                    spec.setReplicas(1);
-                    Deployment deployment = new Deployment();
-                    deployment.setSpec(spec);
-                    deployment.setStatus(status);
-                    return Optional.of((T) deployment);
-                }
-            };
+    private final Context context = JobReconcilerTest.createContextWithReadyJobManagerDeployment();
 
     @Test
     public void verifyBasicReconcileLoop() {
@@ -86,13 +62,13 @@ public class FlinkDeploymentControllerTest {
         updateControl = testController.reconcile(appCluster, context);
         assertTrue(updateControl.isUpdateStatus());
         assertEquals(
-                FlinkDeploymentController.PORT_READY_DELAY_SECONDS * 1000,
+                BaseReconciler.PORT_READY_DELAY_SECONDS * 1000,
                 (long) updateControl.getScheduleDelay().get());
 
         updateControl = testController.reconcile(appCluster, context);
         assertTrue(updateControl.isUpdateStatus());
         assertEquals(
-                FlinkDeploymentController.REFRESH_SECONDS * 1000,
+                BaseReconciler.REFRESH_SECONDS * 1000,
                 (long) updateControl.getScheduleDelay().get());
 
         // Validate reconciliation status
@@ -105,7 +81,7 @@ public class FlinkDeploymentControllerTest {
         updateControl = testController.reconcile(appCluster, context);
         assertTrue(updateControl.isUpdateStatus());
         assertEquals(
-                FlinkDeploymentController.REFRESH_SECONDS * 1000,
+                BaseReconciler.REFRESH_SECONDS * 1000,
                 (long) updateControl.getScheduleDelay().get());
 
         // Validate job status
@@ -216,7 +192,6 @@ public class FlinkDeploymentControllerTest {
     }
 
     private FlinkDeploymentController createTestController(TestingFlinkService flinkService) {
-        JobStatusObserver observer = new JobStatusObserver(flinkService);
         JobReconciler jobReconciler = new JobReconciler(null, flinkService);
         SessionReconciler sessionReconciler = new SessionReconciler(null, flinkService);
 
@@ -225,7 +200,6 @@ public class FlinkDeploymentControllerTest {
                 null,
                 "test",
                 new DefaultDeploymentValidator(),
-                observer,
                 jobReconciler,
                 sessionReconciler);
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index 8f99806..6a2109b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -25,13 +25,20 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
+import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -40,15 +47,39 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 /** @link JobStatusObserver unit tests */
 public class JobReconcilerTest {
 
+    public static Context createContextWithReadyJobManagerDeployment() { // shared test fixture
+        return new Context() {
+            @Override
+            public Optional<RetryInfo> getRetryInfo() {
+                return Optional.empty(); // no retry info is reported
+            }
+
+            @Override
+            public <T> Optional<T> getSecondaryResource(
+                    Class<T> expectedType, String eventSourceName) { // both arguments are ignored
+                DeploymentStatus status = new DeploymentStatus();
+                status.setAvailableReplicas(1); // status: 1 available of 1 replica
+                status.setReplicas(1);
+                DeploymentSpec spec = new DeploymentSpec();
+                spec.setReplicas(1); // spec asks for 1 replica, matching the status above
+                Deployment deployment = new Deployment();
+                deployment.setSpec(spec);
+                deployment.setStatus(status);
+                return Optional.of((T) deployment); // unchecked: always a Deployment, whatever T is
+            }
+        };
+    }
+
     @Test
     public void testUpgrade() throws Exception {
+        Context context = JobReconcilerTest.createContextWithReadyJobManagerDeployment();
         TestingFlinkService flinkService = new TestingFlinkService();
 
         JobReconciler reconciler = new JobReconciler(null, flinkService);
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         Configuration config = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
 
-        reconciler.reconcile("test", deployment, config);
+        reconciler.reconcile("test", deployment, context, config);
         List<Tuple2<String, JobStatusMessage>> runningJobs = flinkService.listJobs();
         verifyAndSetRunningJobsToStatus(deployment, runningJobs);
 
@@ -56,7 +87,7 @@ public class JobReconcilerTest {
         FlinkDeployment statelessUpgrade = TestUtils.clone(deployment);
         statelessUpgrade.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
         statelessUpgrade.getSpec().getFlinkConfiguration().put("new", "conf");
-        reconciler.reconcile("test", statelessUpgrade, config);
+        reconciler.reconcile("test", statelessUpgrade, context, config);
 
         runningJobs = flinkService.listJobs();
         assertEquals(1, runningJobs.size());
@@ -72,7 +103,7 @@ public class JobReconcilerTest {
         statefulUpgrade.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
         statefulUpgrade.getSpec().getFlinkConfiguration().put("new", "conf2");
 
-        reconciler.reconcile("test", statefulUpgrade, new Configuration(config));
+        reconciler.reconcile("test", statefulUpgrade, context, new Configuration(config));
 
         runningJobs = flinkService.listJobs();
         assertEquals(1, runningJobs.size());
@@ -82,13 +113,15 @@ public class JobReconcilerTest {
     @Test
     public void testUpgradeModeChangeFromSavepointToLastState() throws Exception {
         final String expectedSavepointPath = "savepoint_0";
+        final Context context = JobReconcilerTest.createContextWithReadyJobManagerDeployment();
         final TestingFlinkService flinkService = new TestingFlinkService();
+        JobStatusObserver observer = new JobStatusObserver(flinkService);
 
         final JobReconciler reconciler = new JobReconciler(null, flinkService);
         final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         final Configuration config = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
 
-        reconciler.reconcile("test", deployment, config);
+        reconciler.reconcile("test", deployment, context, config);
         List<Tuple2<String, JobStatusMessage>> runningJobs = flinkService.listJobs();
         verifyAndSetRunningJobsToStatus(deployment, runningJobs);
 
@@ -97,7 +130,7 @@ public class JobReconcilerTest {
         deployment.getSpec().getJob().setState(JobState.SUSPENDED);
         deployment.getSpec().setImage("new-image-1");
 
-        reconciler.reconcile("test", deployment, config);
+        reconciler.reconcile("test", deployment, context, config);
         assertEquals(0, flinkService.listJobs().size());
         assertTrue(
                 JobState.SUSPENDED
@@ -118,7 +151,7 @@ public class JobReconcilerTest {
         deployment.getSpec().getJob().setState(JobState.RUNNING);
         deployment.getSpec().setImage("new-image-2");
 
-        reconciler.reconcile("test", deployment, config);
+        reconciler.reconcile("test", deployment, context, config);
         runningJobs = flinkService.listJobs();
         assertEquals(expectedSavepointPath, config.get(SavepointConfigOptions.SAVEPOINT_PATH));
         assertEquals(1, runningJobs.size());