You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/03/14 12:19:38 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-26546] Extract Observer Interface

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 890547f  [FLINK-26546] Extract Observer Interface
890547f is described below

commit 890547f063e75f5929d2809e1cfe8895d0d2ac9f
Author: Aitozi <yu...@alibaba-inc.com>
AuthorDate: Sun Mar 13 20:27:07 2022 +0800

    [FLINK-26546] Extract Observer Interface
---
 .../flink/kubernetes/operator/FlinkOperator.java   |  12 +-
 .../flink/kubernetes/operator/config/Mode.java     |  31 ++++
 .../controller/FlinkDeploymentController.java      |  14 +-
 .../kubernetes/operator/observer/BaseObserver.java | 117 +++++++++++++
 .../kubernetes/operator/observer/JobObserver.java  | 126 ++++++++++++++
 .../kubernetes/operator/observer/Observer.java     | 191 ++-------------------
 .../ObserverFactory.java}                          |  42 ++---
 .../operator/observer/SessionObserver.java         |  40 +++++
 .../operator/reconciler/JobReconciler.java         |   2 +-
 .../operator/reconciler/ReconcilerFactory.java     |  12 +-
 .../controller/FlinkDeploymentControllerTest.java  |   7 +-
 .../{ObserverTest.java => JobObserverTest.java}    |  48 +-----
 .../operator/observer/SessionObserverTest.java     |  69 ++++++++
 13 files changed, 433 insertions(+), 278 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 7e46b0c..1125b65 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -22,7 +22,7 @@ import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig;
 import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
 import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
-import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.observer.ObserverFactory;
 import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
@@ -57,12 +57,10 @@ public class FlinkOperator {
         FlinkService flinkService = new FlinkService(client);
         FlinkOperatorConfiguration operatorConfiguration =
                 FlinkOperatorConfiguration.fromConfiguration(defaultConfig.getOperatorConfig());
-
-        Observer observer = new Observer(flinkService, operatorConfiguration);
-
         FlinkDeploymentValidator validator = new DefaultDeploymentValidator();
-        ReconcilerFactory factory =
+        ReconcilerFactory reconcilerFactory =
                 new ReconcilerFactory(client, flinkService, operatorConfiguration);
+        ObserverFactory observerFactory = new ObserverFactory(flinkService, operatorConfiguration);
 
         FlinkDeploymentController controller =
                 new FlinkDeploymentController(
@@ -71,8 +69,8 @@ public class FlinkOperator {
                         client,
                         namespace,
                         validator,
-                        observer,
-                        factory);
+                        reconcilerFactory,
+                        observerFactory);
 
         FlinkControllerConfig controllerConfig = new FlinkControllerConfig(controller);
         controller.setControllerConfig(controllerConfig);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java
new file mode 100644
index 0000000..e616432
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+/** Deployment mode of a {@link FlinkDeployment}, derived from whether its spec defines a job. */
+public enum Mode {
+    APPLICATION,
+    SESSION;
+
+    public static Mode getMode(FlinkDeployment flinkApp) {
+        return flinkApp.getSpec().getJob() == null ? SESSION : APPLICATION;
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 195defd..4ef0959 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -24,7 +24,7 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
 import org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.observer.ObserverFactory;
 import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
@@ -65,8 +65,8 @@ public class FlinkDeploymentController
     private final String operatorNamespace;
 
     private final FlinkDeploymentValidator validator;
-    private final Observer observer;
     private final ReconcilerFactory reconcilerFactory;
+    private final ObserverFactory observerFactory;
     private final DefaultConfig defaultConfig;
     private final FlinkOperatorConfiguration operatorConfiguration;
 
@@ -78,15 +78,15 @@ public class FlinkDeploymentController
             KubernetesClient kubernetesClient,
             String operatorNamespace,
             FlinkDeploymentValidator validator,
-            Observer observer,
-            ReconcilerFactory reconcilerFactory) {
+            ReconcilerFactory reconcilerFactory,
+            ObserverFactory observerFactory) {
         this.defaultConfig = defaultConfig;
         this.operatorConfiguration = operatorConfiguration;
         this.kubernetesClient = kubernetesClient;
         this.operatorNamespace = operatorNamespace;
         this.validator = validator;
-        this.observer = observer;
         this.reconcilerFactory = reconcilerFactory;
+        this.observerFactory = observerFactory;
     }
 
     @Override
@@ -95,7 +95,7 @@ public class FlinkDeploymentController
         Configuration effectiveConfig =
                 FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
         try {
-            observer.observe(flinkApp, context, effectiveConfig);
+            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context, effectiveConfig);
         } catch (DeploymentFailedException dfe) {
             // ignore during cleanup
         }
@@ -119,7 +119,7 @@ public class FlinkDeploymentController
                 FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
 
         try {
-            observer.observe(flinkApp, context, effectiveConfig);
+            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context, effectiveConfig);
             reconcilerFactory
                     .getOrCreate(flinkApp)
                     .reconcile(operatorNamespace, flinkApp, context, effectiveConfig);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
new file mode 100644
index 0000000..2dee79e
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/BaseObserver.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+/** Base {@link Observer} that observes the JobManager deployment on behalf of subclasses. */
+public abstract class BaseObserver implements Observer {
+
+    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
+    public static final String JOB_STATE_UNKNOWN = "UNKNOWN";
+    protected final FlinkService flinkService;
+    protected final FlinkOperatorConfiguration operatorConfiguration;
+
+    public BaseObserver(
+            FlinkService flinkService, FlinkOperatorConfiguration operatorConfiguration) {
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+    }
+
+    /**
+     * Advances the JobManager deployment status by one step based on the observed Deployment.
+     * Throws {@link DeploymentFailedException} on a FailedCreate replica failure unless in ERROR.
+     */
+    protected void observeJmDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+        JobManagerDeploymentStatus previousJmStatus =
+                deploymentStatus.getJobManagerDeploymentStatus();
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            return;
+        }
+        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
+            // the port was ready on the previous observation, so promote without re-checking
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            return;
+        }
+        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
+        if (!deployment.isPresent()) {
+            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+            return;
+        }
+        DeploymentStatus status = deployment.get().getStatus();
+        DeploymentSpec spec = deployment.get().getSpec();
+        if (status != null
+                && spec.getReplicas() != null
+                && spec.getReplicas().equals(status.getReplicas()) // null-safe, unlike unboxing
+                && spec.getReplicas().equals(status.getAvailableReplicas())
+                && flinkService.isJobManagerPortReady(effectiveConfig)) {
+            // typically it takes a few seconds for the REST server to be ready
+            logger.info(
+                    "JobManager deployment {} in namespace {} port ready, waiting for the REST API...",
+                    flinkApp.getMetadata().getName(),
+                    flinkApp.getMetadata().getNamespace());
+            deploymentStatus.setJobManagerDeploymentStatus(
+                    JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+            return;
+        }
+        logger.info(
+                "JobManager deployment {} in namespace {} exists but not ready yet, status {}",
+                flinkApp.getMetadata().getName(),
+                flinkApp.getMetadata().getNamespace(),
+                status);
+        // status may still be null here; with no conditions to inspect it is marked DEPLOYING
+        List<DeploymentCondition> conditions = status != null ? status.getConditions() : List.of();
+        for (DeploymentCondition dc : conditions) {
+            if ("FailedCreate".equals(dc.getReason())
+                    && "ReplicaFailure".equals(dc.getType())) {
+                // throw only when not already in error status to allow for spec update
+                if (!JobManagerDeploymentStatus.ERROR.equals(
+                        deploymentStatus.getJobManagerDeploymentStatus())) {
+                    throw new DeploymentFailedException(
+                            DeploymentFailedException.COMPONENT_JOBMANAGER, dc);
+                }
+                return;
+            }
+        }
+        deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+    }
+
+    /** Returns true once the JobManager deployment status of {@code dep} is READY. */
+    protected boolean isClusterReady(FlinkDeployment dep) {
+        return dep.getStatus().getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.READY;
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
new file mode 100644
index 0000000..ea5a58f
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobObserver.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/** The observer of {@link org.apache.flink.kubernetes.operator.config.Mode#APPLICATION} cluster. */
+public class JobObserver extends BaseObserver {
+
+    public JobObserver(
+            FlinkService flinkService, FlinkOperatorConfiguration operatorConfiguration) {
+        super(flinkService, operatorConfiguration);
+    }
+
+    @Override
+    public void observe(FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+        // query jobs (and then savepoints) only once the JobManager deployment is READY
+        if (isClusterReady(flinkApp) && observeFlinkJobStatus(flinkApp, effectiveConfig)) {
+            observeSavepointStatus(flinkApp, effectiveConfig);
+        }
+    }
+
+    /** Refreshes the job status from the cluster; returns false if no job could be found. */
+    private boolean observeFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+        logger.info("Getting job statuses for {}", flinkApp.getMetadata().getName());
+        FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
+        Collection<JobStatusMessage> clusterJobStatuses;
+        try {
+            clusterJobStatuses = flinkService.listJobs(effectiveConfig);
+        } catch (Exception e) {
+            logger.error("Exception while listing jobs", e);
+            flinkAppStatus.getJobStatus().setState(JOB_STATE_UNKNOWN);
+            return false;
+        }
+        if (clusterJobStatuses.isEmpty()) {
+            logger.info("No jobs found on {} yet", flinkApp.getMetadata().getName());
+            flinkAppStatus.getJobStatus().setState(JOB_STATE_UNKNOWN);
+            return false;
+        }
+        updateJobStatus(flinkAppStatus.getJobStatus(), new ArrayList<>(clusterJobStatuses));
+        logger.info("Job statuses updated for {}", flinkApp.getMetadata().getName());
+        return true;
+    }
+
+    /** Updates {@code status} from the most recently started job in the cluster's job list. */
+    private void updateJobStatus(JobStatus status, List<JobStatusMessage> clusterJobStatuses) {
+        // newest start time first
+        Collections.sort(
+                clusterJobStatuses, (j1, j2) -> Long.compare(j2.getStartTime(), j1.getStartTime()));
+        JobStatusMessage newJob = clusterJobStatuses.get(0);
+        status.setState(newJob.getJobState().name());
+        status.setJobName(newJob.getJobName());
+        status.setJobId(newJob.getJobId().toHexString());
+        // track the start time, changing timestamp would cause busy reconciliation
+        status.setUpdateTime(String.valueOf(newJob.getStartTime()));
+    }
+
+    /**
+     * Polls an in-progress savepoint: records it once completed, or, if it is not triggered and
+     * reported an error or outlived its grace period, resets the trigger and reports the error.
+     */
+    private void observeSavepointStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+        SavepointInfo savepointInfo = flinkApp.getStatus().getJobStatus().getSavepointInfo();
+        if (!SavepointUtils.savepointInProgress(flinkApp)) {
+            logger.debug("Savepoint not in progress");
+            return;
+        }
+        SavepointFetchResult savepointFetchResult;
+        try {
+            savepointFetchResult = flinkService.fetchSavepointInfo(flinkApp, effectiveConfig);
+        } catch (Exception e) {
+            logger.error("Exception while fetching savepoint info", e);
+            return;
+        }
+        if (!savepointFetchResult.isTriggered()) {
+            String error = savepointFetchResult.getError();
+            if (error != null
+                    || SavepointUtils.gracePeriodEnded(operatorConfiguration, savepointInfo)) {
+                String errorMsg = error != null ? error : "Savepoint status unknown";
+                logger.error(errorMsg);
+                savepointInfo.resetTrigger();
+                ReconciliationUtils.updateForReconciliationError(flinkApp, errorMsg);
+                return;
+            }
+            logger.info("Savepoint operation not running, waiting within grace period");
+        }
+        if (savepointFetchResult.getSavepoint() == null) {
+            logger.info("Savepoint not completed yet");
+            return;
+        }
+        savepointInfo.updateLastSavepoint(savepointFetchResult.getSavepoint());
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
index 51ccf75..e354689 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
@@ -18,188 +18,19 @@
 package org.apache.flink.kubernetes.operator.observer;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
-import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
-import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
-import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
-import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
-import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
-import org.apache.flink.runtime.client.JobStatusMessage;
 
-import io.fabric8.kubernetes.api.model.apps.Deployment;
-import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
-import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
-import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-
-/** Observes the actual state of the running jobs on the Flink cluster. */
-public class Observer {
-
-    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
-
-    public static final String JOB_STATE_UNKNOWN = "UNKNOWN";
-
-    private final FlinkService flinkService;
-    private final FlinkOperatorConfiguration operatorConfiguration;
-
-    public Observer(FlinkService flinkService, FlinkOperatorConfiguration operatorConfiguration) {
-        this.flinkService = flinkService;
-        this.operatorConfiguration = operatorConfiguration;
-    }
-
-    public void observe(FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
-        observeJmDeployment(flinkApp, context, effectiveConfig);
-        if (isApplicationClusterReady(flinkApp)) {
-            boolean jobFound = observeFlinkJobStatus(flinkApp, effectiveConfig);
-            if (jobFound) {
-                observeSavepointStatus(flinkApp, effectiveConfig);
-            }
-        }
-    }
-
-    private boolean isApplicationClusterReady(FlinkDeployment dep) {
-        return dep.getSpec().getJob() != null
-                && dep.getStatus().getJobManagerDeploymentStatus()
-                        == JobManagerDeploymentStatus.READY;
-    }
-
-    private void observeJmDeployment(
-            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
-        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
-        JobManagerDeploymentStatus previousJmStatus =
-                deploymentStatus.getJobManagerDeploymentStatus();
-
-        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
-            return;
-        }
-
-        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) {
-            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
-            return;
-        }
-
-        Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
-        if (deployment.isPresent()) {
-            DeploymentStatus status = deployment.get().getStatus();
-            DeploymentSpec spec = deployment.get().getSpec();
-            if (status != null
-                    && status.getAvailableReplicas() != null
-                    && spec.getReplicas().intValue() == status.getReplicas()
-                    && spec.getReplicas().intValue() == status.getAvailableReplicas()
-                    && flinkService.isJobManagerPortReady(effectiveConfig)) {
-
-                // typically it takes a few seconds for the REST server to be ready
-                LOG.info(
-                        "JobManager deployment {} in namespace {} port ready, waiting for the REST API...",
-                        flinkApp.getMetadata().getName(),
-                        flinkApp.getMetadata().getNamespace());
-                deploymentStatus.setJobManagerDeploymentStatus(
-                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
-                return;
-            }
-            LOG.info(
-                    "JobManager deployment {} in namespace {} exists but not ready yet, status {}",
-                    flinkApp.getMetadata().getName(),
-                    flinkApp.getMetadata().getNamespace(),
-                    status);
-
-            List<DeploymentCondition> conditions = status.getConditions();
-            for (DeploymentCondition dc : conditions) {
-                if ("FailedCreate".equals(dc.getReason())
-                        && "ReplicaFailure".equals(dc.getType())) {
-                    // throw only when not already in error status to allow for spec update
-                    if (!JobManagerDeploymentStatus.ERROR.equals(
-                            deploymentStatus.getJobManagerDeploymentStatus())) {
-                        throw new DeploymentFailedException(
-                                DeploymentFailedException.COMPONENT_JOBMANAGER, dc);
-                    }
-                    return;
-                }
-            }
-            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
-            return;
-        }
-
-        deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
-    }
-
-    private boolean observeFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) {
-        LOG.info("Getting job statuses for {}", flinkApp.getMetadata().getName());
-        FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
-
-        Collection<JobStatusMessage> clusterJobStatuses;
-        try {
-            clusterJobStatuses = flinkService.listJobs(effectiveConfig);
-        } catch (Exception e) {
-            LOG.error("Exception while listing jobs", e);
-            flinkAppStatus.getJobStatus().setState(JOB_STATE_UNKNOWN);
-            return false;
-        }
-        if (clusterJobStatuses.isEmpty()) {
-            LOG.info("No jobs found on {} yet", flinkApp.getMetadata().getName());
-            flinkAppStatus.getJobStatus().setState(JOB_STATE_UNKNOWN);
-            return false;
-        }
-
-        updateJobStatus(flinkAppStatus.getJobStatus(), new ArrayList<>(clusterJobStatuses));
-        LOG.info("Job statuses updated for {}", flinkApp.getMetadata().getName());
-        return true;
-    }
-
-    private void observeSavepointStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) {
-        SavepointInfo savepointInfo = flinkApp.getStatus().getJobStatus().getSavepointInfo();
-        if (!SavepointUtils.savepointInProgress(flinkApp)) {
-            LOG.debug("Checkpointing not in progress");
-            return;
-        }
-        SavepointFetchResult savepointFetchResult;
-        try {
-            savepointFetchResult = flinkService.fetchSavepointInfo(flinkApp, effectiveConfig);
-        } catch (Exception e) {
-            LOG.error("Exception while fetching savepoint info", e);
-            return;
-        }
-
-        if (!savepointFetchResult.isTriggered()) {
-            String error = savepointFetchResult.getError();
-            if (error != null
-                    || SavepointUtils.gracePeriodEnded(operatorConfiguration, savepointInfo)) {
-                String errorMsg = error != null ? error : "Savepoint status unknown";
-                LOG.error(errorMsg);
-                savepointInfo.resetTrigger();
-                ReconciliationUtils.updateForReconciliationError(flinkApp, errorMsg);
-                return;
-            }
-            LOG.info("Savepoint operation not running, waiting within grace period");
-        }
-        if (savepointFetchResult.getSavepoint() == null) {
-            LOG.info("Savepoint not completed yet");
-            return;
-        }
-        savepointInfo.updateLastSavepoint(savepointFetchResult.getSavepoint());
-    }
-
-    /** Update previous job status based on the job list from the cluster. */
-    private void updateJobStatus(JobStatus status, List<JobStatusMessage> clusterJobStatuses) {
-        Collections.sort(
-                clusterJobStatuses, (j1, j2) -> Long.compare(j2.getStartTime(), j1.getStartTime()));
-        JobStatusMessage newJob = clusterJobStatuses.get(0);
-
-        status.setState(newJob.getJobState().name());
-        status.setJobName(newJob.getJobName());
-        status.setJobId(newJob.getJobId().toHexString());
-        // track the start time, changing timestamp would cause busy reconciliation
-        status.setUpdateTime(String.valueOf(newJob.getStartTime()));
-    }
+/** Observes the actual state of a {@link FlinkDeployment} and records it in its status. */
+public interface Observer {
+
+    /**
+     * Observes the flinkApp and reflects any detected status change on the flinkApp resource.
+     *
+     * @param flinkApp the target FlinkDeployment resource, whose status is updated in place
+     * @param context the context with which the operation is executed
+     * @param effectiveConfig the effective Flink configuration of the flinkApp
+     */
+    void observe(FlinkDeployment flinkApp, Context context, Configuration effectiveConfig);
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverFactory.java
similarity index 57%
copy from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java
copy to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverFactory.java
index 94050ce..d3ec8b9 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ObserverFactory.java
@@ -16,59 +16,43 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.reconciler;
+package org.apache.flink.kubernetes.operator.observer;
 
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.config.Mode;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 
-import io.fabric8.kubernetes.client.KubernetesClient;
-
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-/** The factory to create reconciler based on app mode. */
-public class ReconcilerFactory {
+/** Factory that creates and caches one {@link Observer} per {@link FlinkDeployment} mode. */
+public class ObserverFactory {
 
-    private final KubernetesClient kubernetesClient;
     private final FlinkService flinkService;
     private final FlinkOperatorConfiguration operatorConfiguration;
-    private final Map<Mode, Reconciler> reconcilerMap;
+    private final Map<Mode, Observer> observerMap;
 
-    public ReconcilerFactory(
-            KubernetesClient kubernetesClient,
-            FlinkService flinkService,
-            FlinkOperatorConfiguration operatorConfiguration) {
-        this.kubernetesClient = kubernetesClient;
+    public ObserverFactory(
+            FlinkService flinkService, FlinkOperatorConfiguration operatorConfiguration) {
         this.flinkService = flinkService;
         this.operatorConfiguration = operatorConfiguration;
-        this.reconcilerMap = new ConcurrentHashMap<>();
+        this.observerMap = new ConcurrentHashMap<>();
     }
 
-    public Reconciler getOrCreate(FlinkDeployment flinkApp) {
-        return reconcilerMap.computeIfAbsent(
-                getMode(flinkApp),
+    public Observer getOrCreate(FlinkDeployment flinkApp) {
+        return observerMap.computeIfAbsent(
+                Mode.getMode(flinkApp),
                 mode -> {
                     switch (mode) {
                         case SESSION:
-                            return new SessionReconciler(
-                                    kubernetesClient, flinkService, operatorConfiguration);
+                            return new SessionObserver(flinkService, operatorConfiguration);
                         case APPLICATION:
-                            return new JobReconciler(
-                                    kubernetesClient, flinkService, operatorConfiguration);
+                            return new JobObserver(flinkService, operatorConfiguration);
                         default:
                             throw new UnsupportedOperationException(
                                     String.format("Unsupported running mode: %s", mode));
                     }
                 });
     }
-
-    private Mode getMode(FlinkDeployment flinkApp) {
-        return flinkApp.getSpec().getJob() != null ? Mode.APPLICATION : Mode.SESSION;
-    }
-
-    enum Mode {
-        APPLICATION,
-        SESSION
-    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
new file mode 100644
index 0000000..5cb594c
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SessionObserver.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+
+/** The observer of the {@link org.apache.flink.kubernetes.operator.config.Mode#SESSION} cluster. */
+public class SessionObserver extends BaseObserver {
+
+    public SessionObserver(
+            FlinkService flinkService, FlinkOperatorConfiguration operatorConfiguration) {
+        super(flinkService, operatorConfiguration);
+    }
+
+    @Override
+    public void observe(FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index a43febd..2d71459 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Optional;
 
-import static org.apache.flink.kubernetes.operator.observer.Observer.JOB_STATE_UNKNOWN;
+import static org.apache.flink.kubernetes.operator.observer.BaseObserver.JOB_STATE_UNKNOWN;
 
 /**
  * Reconciler responsible for handling the job lifecycle according to the desired and current
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java
index 94050ce..bdcd8e4 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.kubernetes.operator.reconciler;
 
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.config.Mode;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 
@@ -47,7 +48,7 @@ public class ReconcilerFactory {
 
     public Reconciler getOrCreate(FlinkDeployment flinkApp) {
         return reconcilerMap.computeIfAbsent(
-                getMode(flinkApp),
+                Mode.getMode(flinkApp),
                 mode -> {
                     switch (mode) {
                         case SESSION:
@@ -62,13 +63,4 @@ public class ReconcilerFactory {
                     }
                 });
     }
-
-    private Mode getMode(FlinkDeployment flinkApp) {
-        return flinkApp.getSpec().getJob() != null ? Mode.APPLICATION : Mode.SESSION;
-    }
-
-    enum Mode {
-        APPLICATION,
-        SESSION
-    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 658aa18..226186a 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.observer.Observer;
+import org.apache.flink.kubernetes.operator.observer.ObserverFactory;
 import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
@@ -446,7 +446,6 @@ public class FlinkDeploymentControllerTest {
 
     private FlinkDeploymentController createTestController(
             KubernetesClient kubernetesClient, TestingFlinkService flinkService) {
-        Observer observer = new Observer(flinkService, operatorConfiguration);
 
         FlinkDeploymentController controller =
                 new FlinkDeploymentController(
@@ -455,9 +454,9 @@ public class FlinkDeploymentControllerTest {
                         kubernetesClient,
                         "test",
                         new DefaultDeploymentValidator(),
-                        observer,
                         new ReconcilerFactory(
-                                kubernetesClient, flinkService, operatorConfiguration));
+                                kubernetesClient, flinkService, operatorConfiguration),
+                        new ObserverFactory(flinkService, operatorConfiguration));
         controller.setControllerConfig(new FlinkControllerConfig(controller));
         return controller;
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
similarity index 83%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
rename to flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
index ea15298..c3869a4 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 
 import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -35,48 +34,16 @@ import org.junit.jupiter.api.Test;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
-/** @link Observer unit tests */
-public class ObserverTest {
+/** {@link JobObserver} unit tests. */
+public class JobObserverTest {
 
     private final Context readyContext = TestUtils.createContextWithReadyJobManagerDeployment();
 
     @Test
-    public void observeSessionCluster() {
-        FlinkService flinkService = new TestingFlinkService();
-        Observer observer =
-                new Observer(
-                        flinkService,
-                        FlinkOperatorConfiguration.fromConfiguration(new Configuration()));
-        FlinkDeployment deployment = TestUtils.buildSessionCluster();
-        deployment
-                .getStatus()
-                .getReconciliationStatus()
-                .setLastReconciledSpec(deployment.getSpec());
-
-        observer.observe(
-                deployment,
-                readyContext,
-                FlinkUtils.getEffectiveConfig(deployment, new Configuration()));
-
-        assertEquals(
-                JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
-                deployment.getStatus().getJobManagerDeploymentStatus());
-
-        observer.observe(
-                deployment,
-                readyContext,
-                FlinkUtils.getEffectiveConfig(deployment, new Configuration()));
-
-        assertEquals(
-                JobManagerDeploymentStatus.READY,
-                deployment.getStatus().getJobManagerDeploymentStatus());
-    }
-
-    @Test
     public void observeApplicationCluster() {
         TestingFlinkService flinkService = new TestingFlinkService();
-        Observer observer =
-                new Observer(
+        JobObserver observer =
+                new JobObserver(
                         flinkService,
                         FlinkOperatorConfiguration.fromConfiguration(new Configuration()));
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
@@ -136,14 +103,15 @@ public class ObserverTest {
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
-        assertEquals(Observer.JOB_STATE_UNKNOWN, deployment.getStatus().getJobStatus().getState());
+        assertEquals(
+                BaseObserver.JOB_STATE_UNKNOWN, deployment.getStatus().getJobStatus().getState());
     }
 
     @Test
     public void observeSavepoint() throws Exception {
         TestingFlinkService flinkService = new TestingFlinkService();
-        Observer observer =
-                new Observer(
+        JobObserver observer =
+                new JobObserver(
                         flinkService,
                         FlinkOperatorConfiguration.fromConfiguration(new Configuration()));
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
new file mode 100644
index 0000000..6e7cb48
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SessionObserverTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** {@link SessionObserver} unit tests. */
+public class SessionObserverTest {
+    private final Context readyContext = TestUtils.createContextWithReadyJobManagerDeployment();
+
+    @Test
+    public void observeSessionCluster() {
+        FlinkService flinkService = new TestingFlinkService();
+        SessionObserver observer =
+                new SessionObserver(
+                        flinkService,
+                        FlinkOperatorConfiguration.fromConfiguration(new Configuration()));
+        FlinkDeployment deployment = TestUtils.buildSessionCluster();
+        deployment
+                .getStatus()
+                .getReconciliationStatus()
+                .setLastReconciledSpec(deployment.getSpec());
+
+        observer.observe(
+                deployment,
+                readyContext,
+                FlinkUtils.getEffectiveConfig(deployment, new Configuration()));
+
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+                deployment.getStatus().getJobManagerDeploymentStatus());
+
+        observer.observe(
+                deployment,
+                readyContext,
+                FlinkUtils.getEffectiveConfig(deployment, new Configuration()));
+
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                deployment.getStatus().getJobManagerDeploymentStatus());
+    }
+}