Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/16 10:33:58 UTC

[flink-kubernetes-operator] 05/23: Extract Observer and Reconciler logic from controller

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 0fe31c540d9181d4edab61650f7c068f94169166
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Wed Feb 2 14:34:01 2022 +0100

    Extract Observer and Reconciler logic from controller
---
 examples/basic-checkpoint-ha.yaml                  |  43 +++
 examples/basic.yaml                                |   4 +-
 examples/pod-template.yaml                         |   4 +-
 .../controller/FlinkDeploymentController.java      | 321 ++-------------------
 .../controller/observer/JobStatusObserver.java     |  98 +++++++
 .../controller/reconciler/JobReconciler.java       | 184 ++++++++++++
 .../controller/reconciler/SessionReconciler.java   |  75 +++++
 .../kubernetes/operator/utils/FlinkUtils.java      |  55 +++-
 8 files changed, 476 insertions(+), 308 deletions(-)
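
A minimal sketch of the structure this commit leaves behind, before the per-file diffs. Constructor shapes match the diff below; building a fabric8 client this way and the namespace value are illustrative assumptions, not part of the commit:

    import io.fabric8.kubernetes.client.DefaultKubernetesClient;
    import io.fabric8.kubernetes.client.KubernetesClient;
    import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;

    class WiringSketch {
        public static void main(String[] args) {
            // Assumption: a default fabric8 client; the real operator injects its own.
            KubernetesClient client = new DefaultKubernetesClient();
            // The controller now delegates instead of doing everything itself:
            // JobStatusObserver watches running jobs, JobReconciler handles application
            // clusters, SessionReconciler handles session clusters.
            FlinkDeploymentController controller =
                    new FlinkDeploymentController(client, "default");
        }
    }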

diff --git a/examples/basic-checkpoint-ha.yaml b/examples/basic-checkpoint-ha.yaml
new file mode 100644
index 0000000..bb3cf41
--- /dev/null
+++ b/examples/basic-checkpoint-ha.yaml
@@ -0,0 +1,43 @@
+apiVersion: flink.io/v1alpha1
+kind: FlinkDeployment
+metadata:
+  namespace: default
+  name: basic-checkpoint-ha-example
+spec:
+  image: flink:1.14.3
+  flinkVersion: 1.14.3
+  flinkConfiguration:
+    taskmanager.numberOfTaskSlots: "2"
+    state.savepoints.dir: file:///flink-data/savepoints
+    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability.storageDir: file:///flink-data/ha
+  jobManager:
+    replicas: 1
+    resource:
+      memory: "2048m"
+      cpu: 1
+  taskManager:
+    taskSlots: 2
+    resource:
+      memory: "2048m"
+      cpu: 1
+  podTemplate:
+    spec:
+      serviceAccount: flink-operator
+      containers:
+        - name: flink-main-container
+          volumeMounts:
+          - mountPath: /flink-data
+            name: flink-volume
+      volumes:
+      - name: flink-volume
+        hostPath:
+          # directory location on host
+          path: /tmp/flink
+          # this field is optional
+          type: Directory
+  job:
+    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+    parallelism: 2
+    upgradeMode: savepoint
+    state: running
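
The flinkConfiguration block above translates one-to-one into Flink Configuration entries when the operator builds the effective config. A hedged sketch of that mapping (the operator performs it inside FlinkUtils.getEffectiveConfig; this literal helper is only illustrative):

    import org.apache.flink.configuration.Configuration;

    class CheckpointHaConfigSketch {
        static Configuration fromExample() {
            Configuration conf = new Configuration();
            conf.setString("taskmanager.numberOfTaskSlots", "2");
            conf.setString("state.savepoints.dir", "file:///flink-data/savepoints");
            // Kubernetes-based HA keeps JobManager metadata across restarts:
            conf.setString(
                    "high-availability",
                    "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory");
            conf.setString("high-availability.storageDir", "file:///flink-data/ha");
            return conf;
        }
    }
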
diff --git a/examples/basic.yaml b/examples/basic.yaml
index 9fdc289..38a5a90 100644
--- a/examples/basic.yaml
+++ b/examples/basic.yaml
@@ -9,7 +9,6 @@ spec:
   flinkConfiguration:
     taskmanager.numberOfTaskSlots: "2"
     kubernetes.jobmanager.service-account: flink-operator
-    kubernetes.container-start-command-template: "%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"
   jobManager:
     replicas: 1
     resource:
@@ -23,5 +22,4 @@ spec:
   job:
     jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
     parallelism: 2
-    cancelMode: none
-    restoreMode: none
+    upgradeMode: stateless
diff --git a/examples/pod-template.yaml b/examples/pod-template.yaml
index 1ce72cd..8563630 100644
--- a/examples/pod-template.yaml
+++ b/examples/pod-template.yaml
@@ -8,13 +8,13 @@ spec:
   flinkVersion: 1.14.3
   flinkConfiguration:
     taskmanager.numberOfTaskSlots: "2"
-    kubernetes.jobmanager.service-account: flink-operator
   podTemplate:
     apiVersion: v1
     kind: Pod
     metadata:
       name: pod-template
     spec:
+      serviceAccount: flink-operator
       containers:
         # Do not change the main container name
         - name: flink-main-container
@@ -55,5 +55,3 @@ spec:
   job:
     jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
     parallelism: 2
-    cancelMode: none
-    restoreMode: none
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 9a64f04..9cee674 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -1,28 +1,13 @@
 package org.apache.flink.kubernetes.operator.controller;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.cli.ApplicationDeployer;
-import org.apache.flink.client.deployment.ClusterClientFactory;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.ClusterDescriptor;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.client.deployment.application.ApplicationConfiguration;
-import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
-import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
+import org.apache.flink.kubernetes.operator.controller.observer.JobStatusObserver;
+import org.apache.flink.kubernetes.operator.controller.reconciler.JobReconciler;
+import org.apache.flink.kubernetes.operator.controller.reconciler.SessionReconciler;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.spec.JobState;
-import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
-import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
 import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
@@ -36,15 +21,10 @@ import io.javaoperatorsdk.operator.processing.event.source.EventSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 /** Controller that runs the main reconcile loop for Flink deployments. */
 @ControllerConfiguration
@@ -59,300 +39,57 @@ public class FlinkDeploymentController
 
     private final String operatorNamespace;
 
+    private final JobStatusObserver observer = new JobStatusObserver();
+    private final JobReconciler jobReconciler;
+    private final SessionReconciler sessionReconciler;
+
     public FlinkDeploymentController(KubernetesClient kubernetesClient, String namespace) {
         this.kubernetesClient = kubernetesClient;
         this.operatorNamespace = namespace;
+        this.jobReconciler = new JobReconciler(kubernetesClient);
+        this.sessionReconciler = new SessionReconciler(kubernetesClient);
     }
 
     @Override
     public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
         LOG.info("Cleaning up application cluster {}", flinkApp.getMetadata().getName());
-        kubernetesClient
-                .apps()
-                .deployments()
-                .inNamespace(flinkApp.getMetadata().getNamespace())
-                .withName(flinkApp.getMetadata().getName())
-                .cascading(true)
-                .delete();
+        FlinkUtils.deleteCluster(flinkApp, kubernetesClient);
         return DeleteControl.defaultDelete();
     }
 
     @Override
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
         LOG.info("Reconciling {}", flinkApp.getMetadata().getName());
-        final String namespace = flinkApp.getMetadata().getNamespace();
-        final String clusterId = flinkApp.getMetadata().getName();
 
-        final Configuration effectiveConfig =
-                FlinkUtils.getEffectiveConfig(namespace, clusterId, flinkApp.getSpec());
-        if (flinkApp.getStatus() == null) {
+        Configuration effectiveConfig = FlinkUtils.getEffectiveConfig(flinkApp);
+
+        boolean success = observer.observeFlinkJobStatus(flinkApp, effectiveConfig);
+        if (success) {
             try {
-                flinkApp.setStatus(new FlinkDeploymentStatus());
-                if (flinkApp.getSpec().getJob() != null
-                        && flinkApp.getSpec().getJob().getState().equals(JobState.RUNNING)) {
-                    deployFlinkJob(flinkApp, effectiveConfig, Optional.empty());
-                } else {
-                    deployFlinkSession(flinkApp, effectiveConfig);
-                }
+                success = reconcileFlinkDeployment(flinkApp, effectiveConfig);
             } catch (Exception e) {
-                LOG.error("Error while deploying " + flinkApp.getMetadata().getName());
-                return UpdateControl.<FlinkDeployment>noUpdate()
-                        .rescheduleAfter(JOB_REFRESH_SECONDS, TimeUnit.SECONDS);
-            }
-        } else {
-            boolean validStatus = updateFlinkJobStatus(flinkApp, effectiveConfig);
-            if (validStatus) {
-                try {
-                    reconcileDeploymentChanges(flinkApp, effectiveConfig);
-                } catch (Exception e) {
-                    throw new RuntimeException(
-                            "Could not reconcile deployment change for "
-                                    + flinkApp.getMetadata().getName(),
-                            e);
-                }
+                throw new RuntimeException(
+                        "Error while reconciling deployment change for "
+                                + flinkApp.getMetadata().getName(),
+                        e);
             }
         }
 
-        flinkApp.getStatus().setSpec(flinkApp.getSpec());
+        if (!success) {
+            return UpdateControl.<FlinkDeployment>noUpdate()
+                    .rescheduleAfter(JOB_REFRESH_SECONDS, TimeUnit.SECONDS);
+        }
 
+        flinkApp.getStatus().setSpec(flinkApp.getSpec());
         return UpdateControl.updateStatus(flinkApp)
                 .rescheduleAfter(JOB_REFRESH_SECONDS, TimeUnit.SECONDS);
     }
 
-    private void deployFlinkJob(
-            FlinkDeployment flinkApp, Configuration effectiveConfig, Optional<String> savepoint)
-            throws Exception {
-        LOG.info("Deploying {}", flinkApp.getMetadata().getName());
-        if (savepoint.isPresent()) {
-            effectiveConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, savepoint.get());
-        } else {
-            effectiveConfig.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH);
-        }
-        final ClusterClientServiceLoader clusterClientServiceLoader =
-                new DefaultClusterClientServiceLoader();
-        final ApplicationDeployer deployer =
-                new ApplicationClusterDeployer(clusterClientServiceLoader);
-
-        final ApplicationConfiguration applicationConfiguration =
-                new ApplicationConfiguration(
-                        flinkApp.getSpec().getJob().getArgs(),
-                        flinkApp.getSpec().getJob().getEntryClass());
-
-        deployer.run(effectiveConfig, applicationConfiguration);
-        LOG.info("{} deployed", flinkApp.getMetadata().getName());
-    }
-
-    private void deployFlinkSession(FlinkDeployment flinkApp, Configuration effectiveConfig) {
-        LOG.info("Deploying session cluster {}", flinkApp.getMetadata().getName());
-        final ClusterClientServiceLoader clusterClientServiceLoader =
-                new DefaultClusterClientServiceLoader();
-        final ClusterClientFactory<String> kubernetesClusterClientFactory =
-                clusterClientServiceLoader.getClusterClientFactory(effectiveConfig);
-        try (final ClusterDescriptor<String> kubernetesClusterDescriptor =
-                kubernetesClusterClientFactory.createClusterDescriptor(effectiveConfig)) {
-            kubernetesClusterDescriptor.deploySessionCluster(
-                    kubernetesClusterClientFactory.getClusterSpecification(effectiveConfig));
-        } catch (Exception e) {
-            LOG.error("Failed to deploy {}", flinkApp.getMetadata().getName(), e);
-        }
-        LOG.info("Session cluster {} deployed", flinkApp.getMetadata().getName());
-    }
-
-    private boolean updateFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) {
-        if (flinkApp.getStatus().getSpec().getJob() == null
-                || !flinkApp.getStatus().getSpec().getJob().getState().equals(JobState.RUNNING)) {
-            return true;
-        }
-        LOG.info("Getting job statuses for {}", flinkApp.getMetadata().getName());
-        FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
-        try (ClusterClient<String> clusterClient =
-                FlinkUtils.getRestClusterClient(effectiveConfig)) {
-            Collection<JobStatusMessage> clusterJobStatuses =
-                    clusterClient.listJobs().get(10, TimeUnit.SECONDS);
-            flinkAppStatus.setJobStatuses(
-                    mergeJobStatuses(flinkAppStatus.getJobStatuses(), clusterJobStatuses));
-            if (clusterJobStatuses.isEmpty()) {
-                LOG.info("No jobs found on {} yet, retrying...", flinkApp.getMetadata().getName());
-                return false;
-            } else {
-                LOG.info("Job statuses updated for {}", flinkApp.getMetadata().getName());
-                return true;
-            }
-
-        } catch (Exception e) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Failed to list jobs for {}", flinkApp, e);
-            } else {
-                LOG.warn(
-                        "Failed to list jobs for {}, retrying...",
-                        flinkApp.getMetadata().getName());
-            }
-
-            return false;
-        }
-    }
-
-    /**
-     * Merge previous job statuses with the new ones from the flink job cluster. We match jobs by
-     * their name to preserve savepoint information.
-     *
-     * @return
-     */
-    private List<JobStatus> mergeJobStatuses(
-            List<JobStatus> oldJobStatuses, Collection<JobStatusMessage> clusterJobStatuses) {
-        List<JobStatus> newStatuses =
-                oldJobStatuses != null ? new ArrayList<>(oldJobStatuses) : new ArrayList<>();
-        Map<String, JobStatus> statusMap =
-                newStatuses.stream().collect(Collectors.toMap(JobStatus::getJobName, j -> j));
-
-        clusterJobStatuses.forEach(
-                js -> {
-                    if (statusMap.containsKey(js.getJobName())) {
-                        JobStatus oldStatus = statusMap.get(js.getJobName());
-                        oldStatus.setState(js.getJobState().name());
-                        oldStatus.setJobId(js.getJobId().toHexString());
-                    } else {
-                        newStatuses.add(FlinkUtils.convert(js));
-                    }
-                });
-
-        return newStatuses;
-    }
-
-    private void reconcileDeploymentChanges(FlinkDeployment flinkApp, Configuration effectiveConfig)
-            throws Exception {
-        boolean specChanged = !flinkApp.getSpec().equals(flinkApp.getStatus().getSpec());
-
-        if (specChanged) {
-            if (flinkApp.getStatus().getSpec().getJob() != null) {
-                if (flinkApp.getSpec().getJob() == null) {
-                    throw new RuntimeException("Cannot switch from job to session cluster");
-                }
-                JobState currentJobState = flinkApp.getStatus().getSpec().getJob().getState();
-                JobState desiredJobState = flinkApp.getSpec().getJob().getState();
-
-                UpgradeMode upgradeMode = flinkApp.getSpec().getJob().getUpgradeMode();
-                if (currentJobState == JobState.RUNNING) {
-                    if (desiredJobState == JobState.RUNNING) {
-                        upgradeFlinkJob(flinkApp, effectiveConfig);
-                    }
-                    if (desiredJobState.equals(JobState.SUSPENDED)) {
-                        if (upgradeMode == UpgradeMode.STATELESS) {
-                            cancelJob(flinkApp, effectiveConfig);
-                        } else {
-                            suspendJob(flinkApp, effectiveConfig);
-                        }
-                    }
-                }
-                if (currentJobState == JobState.SUSPENDED) {
-                    if (desiredJobState == JobState.RUNNING) {
-                        if (upgradeMode == UpgradeMode.STATELESS) {
-                            deployFlinkJob(flinkApp, effectiveConfig, Optional.empty());
-                        } else if (upgradeMode == UpgradeMode.SAVEPOINT) {
-                            restoreFromLastSavepoint(flinkApp, effectiveConfig);
-                        } else {
-                            throw new UnsupportedOperationException(
-                                    "Only savepoint and stateless strategies are supported at the moment.");
-                        }
-                    }
-                }
-            } else {
-                if (flinkApp.getSpec().getJob() != null) {
-                    throw new RuntimeException("Cannot switch from session to job cluster");
-                }
-                upgradeSessionCluster(flinkApp, effectiveConfig);
-            }
-        }
-    }
-
-    private void upgradeFlinkJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
-            throws Exception {
-        LOG.info("Upgrading running job");
-        Optional<String> savepoint = cancelJob(flinkApp, effectiveConfig);
-        deployFlinkJob(flinkApp, effectiveConfig, savepoint);
-    }
-
-    private void restoreFromLastSavepoint(FlinkDeployment flinkApp, Configuration effectiveConfig)
-            throws Exception {
-        JobStatus jobStatus = flinkApp.getStatus().getJobStatuses().get(0);
-
-        String savepointLocation = jobStatus.getSavepointLocation();
-        if (savepointLocation == null) {
-            throw new RuntimeException("Cannot perform stateful restore without a valid savepoint");
-        }
-        deployFlinkJob(flinkApp, effectiveConfig, Optional.of(savepointLocation));
-    }
-
-    private Optional<String> suspendJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
-            throws Exception {
-        LOG.info("Suspending {}", flinkApp.getMetadata().getName());
-        JobID jobID = JobID.fromHexString(flinkApp.getStatus().getJobStatuses().get(0).getJobId());
-        return cancelJob(flinkApp, jobID, UpgradeMode.SAVEPOINT, effectiveConfig);
-    }
-
-    private Optional<String> cancelJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
-            throws Exception {
-        LOG.info("Cancelling {}", flinkApp.getMetadata().getName());
-        UpgradeMode upgradeMode = flinkApp.getSpec().getJob().getUpgradeMode();
-        JobID jobID = JobID.fromHexString(flinkApp.getStatus().getJobStatuses().get(0).getJobId());
-        return cancelJob(flinkApp, jobID, upgradeMode, effectiveConfig);
-    }
-
-    private Optional<String> cancelJob(
-            FlinkDeployment flinkApp,
-            JobID jobID,
-            UpgradeMode upgradeMode,
-            Configuration effectiveConfig)
-            throws Exception {
-        Optional<String> ret = Optional.empty();
-        try (ClusterClient<String> clusterClient =
-                FlinkUtils.getRestClusterClient(effectiveConfig)) {
-            switch (upgradeMode) {
-                case STATELESS:
-                    clusterClient.cancel(jobID).get(1, TimeUnit.MINUTES);
-                    break;
-                case SAVEPOINT:
-                    String savepoint =
-                            clusterClient
-                                    .stopWithSavepoint(jobID, false, null)
-                                    .get(1, TimeUnit.MINUTES);
-                    ret = Optional.of(savepoint);
-                    break;
-                default:
-                    throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
-            }
-        }
-        waitForClusterShutdown(effectiveConfig);
-        JobStatus jobStatus =
-                flinkApp.getStatus().getJobStatuses().stream()
-                        .filter(j -> j.getJobId().equals(jobID.toHexString()))
-                        .findFirst()
-                        .get();
-        jobStatus.setState("suspended");
-        jobStatus.setSavepointLocation(ret.orElse(null));
-        return ret;
-    }
-
-    /** We need this due to the buggy flink kube cluster client behaviour for now. */
-    private void waitForClusterShutdown(Configuration effectiveConfig) throws InterruptedException {
-        Fabric8FlinkKubeClient flinkKubeClient =
-                new Fabric8FlinkKubeClient(
-                        effectiveConfig,
-                        (NamespacedKubernetesClient) kubernetesClient,
-                        Executors.newSingleThreadExecutor());
-        for (int i = 0; i < 60; i++) {
-            if (!flinkKubeClient
-                    .getRestEndpoint(effectiveConfig.get(KubernetesConfigOptions.CLUSTER_ID))
-                    .isPresent()) {
-                break;
-            }
-            LOG.info("Waiting for cluster shutdown... ({})", i);
-            Thread.sleep(1000);
-        }
-    }
-
-    private void upgradeSessionCluster(FlinkDeployment flinkApp, Configuration effectiveConfig) {
-        throw new UnsupportedOperationException("Not implemented yet");
+    private boolean reconcileFlinkDeployment(
+            FlinkDeployment flinkApp, Configuration effectiveConfig) throws Exception {
+        return flinkApp.getSpec().getJob() == null
+                ? sessionReconciler.reconcile(flinkApp, effectiveConfig)
+                : jobReconciler.reconcile(flinkApp, effectiveConfig);
     }
 
     @Override
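
Because the interleaved hunks above make the final method hard to follow, here is the post-commit reconcile() reassembled from the diff (no new code, just the surviving lines in order):

    @Override
    public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
        LOG.info("Reconciling {}", flinkApp.getMetadata().getName());

        Configuration effectiveConfig = FlinkUtils.getEffectiveConfig(flinkApp);

        boolean success = observer.observeFlinkJobStatus(flinkApp, effectiveConfig);
        if (success) {
            try {
                success = reconcileFlinkDeployment(flinkApp, effectiveConfig);
            } catch (Exception e) {
                throw new RuntimeException(
                        "Error while reconciling deployment change for "
                                + flinkApp.getMetadata().getName(),
                        e);
            }
        }

        if (!success) {
            return UpdateControl.<FlinkDeployment>noUpdate()
                    .rescheduleAfter(JOB_REFRESH_SECONDS, TimeUnit.SECONDS);
        }

        flinkApp.getStatus().setSpec(flinkApp.getSpec());
        return UpdateControl.updateStatus(flinkApp)
                .rescheduleAfter(JOB_REFRESH_SECONDS, TimeUnit.SECONDS);
    }
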
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserver.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserver.java
new file mode 100644
index 0000000..869fec0
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserver.java
@@ -0,0 +1,98 @@
+package org.apache.flink.kubernetes.operator.controller.observer;
+
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/** Observes the actual state of the running jobs on the Flink cluster. */
+public class JobStatusObserver {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
+
+    public boolean observeFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+        if (flinkApp.getStatus() == null) {
+            // This is the first run, nothing to observe
+            return true;
+        }
+
+        JobSpec jobSpec = flinkApp.getStatus().getSpec().getJob();
+
+        if (jobSpec == null) {
+            // This is a session cluster, nothing to observe
+            return true;
+        }
+
+        if (!jobSpec.getState().equals(JobState.RUNNING)) {
+            // The job is not running, nothing to observe
+            return true;
+        }
+        LOG.info("Getting job statuses for {}", flinkApp.getMetadata().getName());
+        FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
+        try (ClusterClient<String> clusterClient =
+                FlinkUtils.getRestClusterClient(effectiveConfig)) {
+            Collection<JobStatusMessage> clusterJobStatuses =
+                    clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+            flinkAppStatus.setJobStatuses(
+                    mergeJobStatuses(flinkAppStatus.getJobStatuses(), clusterJobStatuses));
+            if (clusterJobStatuses.isEmpty()) {
+                LOG.info("No jobs found on {} yet, retrying...", flinkApp.getMetadata().getName());
+                return false;
+            } else {
+                LOG.info("Job statuses updated for {}", flinkApp.getMetadata().getName());
+                return true;
+            }
+
+        } catch (Exception e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Failed to list jobs for {}", flinkApp, e);
+            } else {
+                LOG.warn(
+                        "Failed to list jobs for {}, retrying...",
+                        flinkApp.getMetadata().getName());
+            }
+
+            return false;
+        }
+    }
+
+    /**
+     * Merge previous job statuses with the new ones from the flink job cluster. We match jobs by
+     * their name to preserve savepoint information.
+     */
+    private List<JobStatus> mergeJobStatuses(
+            List<JobStatus> oldJobStatuses, Collection<JobStatusMessage> clusterJobStatuses) {
+        List<JobStatus> newStatuses =
+                oldJobStatuses != null ? new ArrayList<>(oldJobStatuses) : new ArrayList<>();
+        Map<String, JobStatus> statusMap =
+                newStatuses.stream().collect(Collectors.toMap(JobStatus::getJobName, j -> j));
+
+        clusterJobStatuses.forEach(
+                js -> {
+                    if (statusMap.containsKey(js.getJobName())) {
+                        JobStatus oldStatus = statusMap.get(js.getJobName());
+                        oldStatus.setState(js.getJobState().name());
+                        oldStatus.setJobId(js.getJobId().toHexString());
+                    } else {
+                        newStatuses.add(FlinkUtils.convert(js));
+                    }
+                });
+
+        return newStatuses;
+    }
+}
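
A short worked example of the name-keyed merge. The CRD JobStatus setters are used the same way as elsewhere in this commit (setJobName and the no-arg constructor are assumptions), and the savepoint path is illustrative:

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
    import org.apache.flink.runtime.client.JobStatusMessage;

    class MergeSketch {
        static void example() {
            // Status remembered from before a suspend, savepoint included:
            JobStatus old = new JobStatus();
            old.setJobName("State machine job"); // assumed setter, matching getJobName above
            old.setJobId(new JobID().toHexString());
            old.setSavepointLocation("file:///flink-data/savepoints/savepoint-abc");

            // The same job after a restart: new JobID, RUNNING, same name.
            JobStatusMessage fresh =
                    new JobStatusMessage(
                            new JobID(),
                            "State machine job",
                            org.apache.flink.api.common.JobStatus.RUNNING,
                            System.currentTimeMillis());

            // Matching by name, as mergeJobStatuses does, updates state and id in place:
            old.setState(fresh.getJobState().name());
            old.setJobId(fresh.getJobId().toHexString());
            // old.getSavepointLocation() still returns the recorded savepoint, which is
            // exactly what savepoint-based restores in JobReconciler rely on.
        }
    }
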
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java
new file mode 100644
index 0000000..6c9ea2b
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java
@@ -0,0 +1,184 @@
+package org.apache.flink.kubernetes.operator.controller.reconciler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.cli.ApplicationDeployer;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reconciler responsible for handling the job lifecycle according to the desired and current
+ * states.
+ */
+public class JobReconciler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobReconciler.class);
+
+    private final KubernetesClient kubernetesClient;
+
+    public JobReconciler(KubernetesClient kubernetesClient) {
+        this.kubernetesClient = kubernetesClient;
+    }
+
+    public boolean reconcile(FlinkDeployment flinkApp, Configuration effectiveConfig)
+            throws Exception {
+
+        if (flinkApp.getStatus() == null) {
+            flinkApp.setStatus(new FlinkDeploymentStatus());
+            if (!flinkApp.getSpec().getJob().getState().equals(JobState.RUNNING)) {
+                throw new RuntimeException("Job must start in running state");
+            }
+            try {
+                deployFlinkJob(flinkApp, effectiveConfig, Optional.empty());
+                return true;
+            } catch (Exception e) {
+                LOG.error("Error while deploying " + flinkApp.getMetadata().getName());
+                return false;
+            }
+        }
+
+        boolean specChanged = !flinkApp.getSpec().equals(flinkApp.getStatus().getSpec());
+        if (specChanged) {
+            if (flinkApp.getStatus().getSpec().getJob() == null) {
+                throw new RuntimeException("Cannot switch from session to job cluster");
+            }
+            JobState currentJobState = flinkApp.getStatus().getSpec().getJob().getState();
+            JobState desiredJobState = flinkApp.getSpec().getJob().getState();
+
+            UpgradeMode upgradeMode = flinkApp.getSpec().getJob().getUpgradeMode();
+            if (currentJobState == JobState.RUNNING) {
+                if (desiredJobState == JobState.RUNNING) {
+                    upgradeFlinkJob(flinkApp, effectiveConfig);
+                }
+                if (desiredJobState.equals(JobState.SUSPENDED)) {
+                    if (upgradeMode == UpgradeMode.STATELESS) {
+                        cancelJob(flinkApp, effectiveConfig);
+                    } else {
+                        suspendJob(flinkApp, effectiveConfig);
+                    }
+                }
+            }
+            if (currentJobState == JobState.SUSPENDED) {
+                if (desiredJobState == JobState.RUNNING) {
+                    if (upgradeMode == UpgradeMode.STATELESS) {
+                        deployFlinkJob(flinkApp, effectiveConfig, Optional.empty());
+                    } else if (upgradeMode == UpgradeMode.SAVEPOINT) {
+                        restoreFromLastSavepoint(flinkApp, effectiveConfig);
+                    } else {
+                        throw new UnsupportedOperationException(
+                                "Only savepoint and stateless strategies are supported at the moment.");
+                    }
+                }
+            }
+        }
+        return true;
+    }
+
+    private void deployFlinkJob(
+            FlinkDeployment flinkApp, Configuration effectiveConfig, Optional<String> savepoint)
+            throws Exception {
+        LOG.info("Deploying {}", flinkApp.getMetadata().getName());
+        if (savepoint.isPresent()) {
+            effectiveConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, savepoint.get());
+        } else {
+            effectiveConfig.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH);
+        }
+        final ClusterClientServiceLoader clusterClientServiceLoader =
+                new DefaultClusterClientServiceLoader();
+        final ApplicationDeployer deployer =
+                new ApplicationClusterDeployer(clusterClientServiceLoader);
+
+        final ApplicationConfiguration applicationConfiguration =
+                new ApplicationConfiguration(
+                        flinkApp.getSpec().getJob().getArgs(),
+                        flinkApp.getSpec().getJob().getEntryClass());
+
+        deployer.run(effectiveConfig, applicationConfiguration);
+        LOG.info("{} deployed", flinkApp.getMetadata().getName());
+    }
+
+    private void upgradeFlinkJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
+            throws Exception {
+        LOG.info("Upgrading running job");
+        Optional<String> savepoint = cancelJob(flinkApp, effectiveConfig);
+        deployFlinkJob(flinkApp, effectiveConfig, savepoint);
+    }
+
+    private void restoreFromLastSavepoint(FlinkDeployment flinkApp, Configuration effectiveConfig)
+            throws Exception {
+        JobStatus jobStatus = flinkApp.getStatus().getJobStatuses().get(0);
+
+        String savepointLocation = jobStatus.getSavepointLocation();
+        if (savepointLocation == null) {
+            throw new RuntimeException("Cannot perform stateful restore without a valid savepoint");
+        }
+        deployFlinkJob(flinkApp, effectiveConfig, Optional.of(savepointLocation));
+    }
+
+    private Optional<String> suspendJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
+            throws Exception {
+        LOG.info("Suspending {}", flinkApp.getMetadata().getName());
+        JobID jobID = JobID.fromHexString(flinkApp.getStatus().getJobStatuses().get(0).getJobId());
+        return cancelJob(flinkApp, jobID, UpgradeMode.SAVEPOINT, effectiveConfig);
+    }
+
+    private Optional<String> cancelJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
+            throws Exception {
+        LOG.info("Cancelling {}", flinkApp.getMetadata().getName());
+        UpgradeMode upgradeMode = flinkApp.getSpec().getJob().getUpgradeMode();
+        JobID jobID = JobID.fromHexString(flinkApp.getStatus().getJobStatuses().get(0).getJobId());
+        return cancelJob(flinkApp, jobID, upgradeMode, effectiveConfig);
+    }
+
+    private Optional<String> cancelJob(
+            FlinkDeployment flinkApp,
+            JobID jobID,
+            UpgradeMode upgradeMode,
+            Configuration effectiveConfig)
+            throws Exception {
+        Optional<String> ret = Optional.empty();
+        try (ClusterClient<String> clusterClient =
+                FlinkUtils.getRestClusterClient(effectiveConfig)) {
+            switch (upgradeMode) {
+                case STATELESS:
+                    clusterClient.cancel(jobID).get(1, TimeUnit.MINUTES);
+                    break;
+                case SAVEPOINT:
+                    String savepoint =
+                            clusterClient
+                                    .stopWithSavepoint(jobID, false, null)
+                                    .get(1, TimeUnit.MINUTES);
+                    ret = Optional.of(savepoint);
+                    break;
+                default:
+                    throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
+            }
+        }
+        FlinkUtils.waitForClusterShutdown(kubernetesClient, effectiveConfig);
+        JobStatus jobStatus =
+                flinkApp.getStatus().getJobStatuses().stream()
+                        .filter(j -> j.getJobId().equals(jobID.toHexString()))
+                        .findFirst()
+                        .get();
+        jobStatus.setState("suspended");
+        jobStatus.setSavepointLocation(ret.orElse(null));
+        return ret;
+    }
+}
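
The nested conditionals in reconcile() above encode a small state machine. A hypothetical helper, not part of the commit, that just names the transitions it handles:

    import org.apache.flink.kubernetes.operator.crd.spec.JobState;
    import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;

    class TransitionSketch {
        enum Action {
            UPGRADE, CANCEL, STOP_WITH_SAVEPOINT, DEPLOY_FRESH, RESTORE_FROM_SAVEPOINT, NO_OP
        }

        static Action plan(JobState current, JobState desired, UpgradeMode mode) {
            if (current == JobState.RUNNING && desired == JobState.RUNNING) {
                // Cancel according to the mode, then redeploy (with the savepoint if taken).
                return Action.UPGRADE;
            }
            if (current == JobState.RUNNING && desired == JobState.SUSPENDED) {
                return mode == UpgradeMode.STATELESS
                        ? Action.CANCEL
                        : Action.STOP_WITH_SAVEPOINT;
            }
            if (current == JobState.SUSPENDED && desired == JobState.RUNNING) {
                if (mode == UpgradeMode.STATELESS) {
                    return Action.DEPLOY_FRESH;
                }
                if (mode == UpgradeMode.SAVEPOINT) {
                    return Action.RESTORE_FROM_SAVEPOINT;
                }
                throw new UnsupportedOperationException(
                        "Only savepoint and stateless strategies are supported at the moment.");
            }
            return Action.NO_OP; // unchanged state: reconcile() does nothing
        }
    }
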
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java
new file mode 100644
index 0000000..4984f08
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java
@@ -0,0 +1,75 @@
+package org.apache.flink.kubernetes.operator.controller.reconciler;
+
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reconciler responsible for handling the session cluster lifecycle according to the desired and
+ * current states.
+ */
+public class SessionReconciler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SessionReconciler.class);
+
+    private final KubernetesClient kubernetesClient;
+
+    public SessionReconciler(KubernetesClient kubernetesClient) {
+        this.kubernetesClient = kubernetesClient;
+    }
+
+    public boolean reconcile(FlinkDeployment flinkApp, Configuration effectiveConfig)
+            throws Exception {
+        if (flinkApp.getStatus() == null) {
+            flinkApp.setStatus(new FlinkDeploymentStatus());
+            try {
+                deployFlinkSession(flinkApp, effectiveConfig);
+                return true;
+            } catch (Exception e) {
+                LOG.error("Error while deploying " + flinkApp.getMetadata().getName());
+                return false;
+            }
+        }
+
+        boolean specChanged = !flinkApp.getSpec().equals(flinkApp.getStatus().getSpec());
+
+        if (specChanged) {
+            if (flinkApp.getStatus().getSpec().getJob() != null) {
+                throw new RuntimeException("Cannot switch from job to session cluster");
+            }
+            upgradeSessionCluster(flinkApp, effectiveConfig);
+        }
+        return true;
+    }
+
+    private void deployFlinkSession(FlinkDeployment flinkApp, Configuration effectiveConfig)
+            throws Exception {
+        LOG.info("Deploying session cluster {}", flinkApp.getMetadata().getName());
+        final ClusterClientServiceLoader clusterClientServiceLoader =
+                new DefaultClusterClientServiceLoader();
+        final ClusterClientFactory<String> kubernetesClusterClientFactory =
+                clusterClientServiceLoader.getClusterClientFactory(effectiveConfig);
+        try (final ClusterDescriptor<String> kubernetesClusterDescriptor =
+                kubernetesClusterClientFactory.createClusterDescriptor(effectiveConfig)) {
+            kubernetesClusterDescriptor.deploySessionCluster(
+                    kubernetesClusterClientFactory.getClusterSpecification(effectiveConfig));
+        }
+        LOG.info("Session cluster {} deployed", flinkApp.getMetadata().getName());
+    }
+
+    private void upgradeSessionCluster(FlinkDeployment flinkApp, Configuration effectiveConfig)
+            throws Exception {
+        FlinkUtils.deleteCluster(flinkApp, kubernetesClient);
+        FlinkUtils.waitForClusterShutdown(kubernetesClient, effectiveConfig);
+        deployFlinkSession(flinkApp, effectiveConfig);
+    }
+}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index f31c4d4..007b5af 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -13,6 +13,8 @@ import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
+import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.runtime.client.JobStatusMessage;
@@ -20,6 +22,8 @@ import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClie
 import org.apache.flink.util.StringUtils;
 
 import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
 import io.fabric8.kubernetes.client.internal.SerializationUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,23 +33,24 @@ import java.io.IOException;
 import java.net.URI;
 import java.nio.file.Files;
 import java.util.Collections;
+import java.util.concurrent.Executors;
 
 /** Flink Utility methods used by the operator. */
 public class FlinkUtils {
 
     private static final Logger LOG = LoggerFactory.getLogger(FlinkUtils.class);
 
-    public static Configuration getEffectiveConfig(
-            String namespace, String clusterId, FlinkDeploymentSpec spec) {
-        try {
-            final String flinkConfDir = System.getenv().get(ConfigConstants.ENV_FLINK_CONF_DIR);
-            final Configuration effectiveConfig;
+    public static Configuration getEffectiveConfig(FlinkDeployment flinkApp) {
+        String namespace = flinkApp.getMetadata().getNamespace();
+        String clusterId = flinkApp.getMetadata().getName();
+        FlinkDeploymentSpec spec = flinkApp.getSpec();
 
-            if (flinkConfDir != null) {
-                effectiveConfig = GlobalConfiguration.loadConfiguration(flinkConfDir);
-            } else {
-                effectiveConfig = new Configuration();
-            }
+        try {
+            String flinkConfDir = System.getenv().get(ConfigConstants.ENV_FLINK_CONF_DIR);
+            Configuration effectiveConfig =
+                    flinkConfDir != null
+                            ? GlobalConfiguration.loadConfiguration(flinkConfDir)
+                            : new Configuration();
 
             effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace);
             effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
@@ -164,4 +169,34 @@ public class FlinkUtils {
         jobStatus.setState(message.getJobState().name());
         return jobStatus;
     }
+
+    public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) {
+        kubernetesClient
+                .apps()
+                .deployments()
+                .inNamespace(flinkApp.getMetadata().getNamespace())
+                .withName(flinkApp.getMetadata().getName())
+                .cascading(true)
+                .delete();
+    }
+
+    /** We need this due to the buggy flink kube cluster client behaviour for now. */
+    public static void waitForClusterShutdown(
+            KubernetesClient kubernetesClient, Configuration effectiveConfig)
+            throws InterruptedException {
+        Fabric8FlinkKubeClient flinkKubeClient =
+                new Fabric8FlinkKubeClient(
+                        effectiveConfig,
+                        (NamespacedKubernetesClient) kubernetesClient,
+                        Executors.newSingleThreadExecutor());
+        for (int i = 0; i < 60; i++) {
+            if (!flinkKubeClient
+                    .getRestEndpoint(effectiveConfig.get(KubernetesConfigOptions.CLUSTER_ID))
+                    .isPresent()) {
+                break;
+            }
+            LOG.info("Waiting for cluster shutdown... ({})", i);
+            Thread.sleep(1000);
+        }
+    }
 }
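
Taken together, the helpers moved into FlinkUtils give both reconcilers one shared tear-down sequence. A minimal sketch mirroring SessionReconciler.upgradeSessionCluster above (the method name recreateCluster is illustrative):

    import io.fabric8.kubernetes.client.KubernetesClient;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
    import org.apache.flink.kubernetes.operator.utils.FlinkUtils;

    class SharedHelperSketch {
        static void recreateCluster(FlinkDeployment flinkApp, KubernetesClient client)
                throws Exception {
            // The CR is now passed whole; namespace and cluster-id come from its metadata.
            Configuration conf = FlinkUtils.getEffectiveConfig(flinkApp);
            FlinkUtils.deleteCluster(flinkApp, client);      // cascading Deployment delete
            FlinkUtils.waitForClusterShutdown(client, conf); // poll the REST endpoint, up to 60s
            // ...then redeploy through the appropriate reconciler.
        }
    }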