Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/16 10:34:06 UTC

[flink-kubernetes-operator] 13/23: Introduce FlinkService for cluster interactions

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 5ea10e385ed3a9b16e60cef3d012f4c8ee5a008c
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Mon Feb 7 14:43:04 2022 +0100

    Introduce FlinkService for cluster interactions
    
    * Extract FlinkService for cluster interactions
    
    * Update the fabric8, lombok and log4j dependency versions
---
 pom.xml                                            |   8 +-
 .../flink/kubernetes/operator/FlinkOperator.java   |  20 ++-
 .../controller/FlinkDeploymentController.java      |  22 +--
 .../observer/JobStatusObserver.java                |  17 ++-
 .../{controller => }/reconciler/JobReconciler.java |  64 ++-------
 .../reconciler/SessionReconciler.java              |  34 ++---
 .../kubernetes/operator/service/FlinkService.java  | 155 +++++++++++++++++++++
 .../kubernetes/operator/utils/FlinkUtils.java      |  41 ------
 8 files changed, 222 insertions(+), 139 deletions(-)
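
For orientation, a minimal usage sketch (not part of the commit) of the FlinkService API added below in FlinkService.java. The Kubernetes client construction and the cluster id/namespace values are illustrative; inside the operator the effective Configuration is built from the FlinkDeployment spec, as the controller and reconcilers below show.

import java.util.Collection;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.runtime.client.JobStatusMessage;

import io.fabric8.kubernetes.client.DefaultKubernetesClient;

/** Hypothetical sketch class, not part of this commit. */
public class FlinkServiceSketch {
    public static void main(String[] args) throws Exception {
        // FlinkOperator below hands the same DefaultKubernetesClient to both the
        // operator SDK and the new FlinkService.
        DefaultKubernetesClient client = new DefaultKubernetesClient();
        FlinkService flinkService = new FlinkService(client);

        // Illustrative values; the operator derives these from the deployment spec.
        Configuration conf = new Configuration();
        conf.set(KubernetesConfigOptions.CLUSTER_ID, "example-session");
        conf.set(KubernetesConfigOptions.NAMESPACE, "default");

        // listJobs() opens a short-lived RestClusterClient against the cluster's
        // rest endpoint and waits up to 10 seconds for the job listing.
        Collection<JobStatusMessage> jobs = flinkService.listJobs(conf);
        jobs.forEach(job -> System.out.println(job.getJobId() + " -> " + job.getJobState()));
    }
}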

diff --git a/pom.xml b/pom.xml
index ee19b02..4838678 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,14 +32,14 @@ under the License.
         <maven-failsafe-plugin.version>3.0.0-M4</maven-failsafe-plugin.version>
 
         <operator.sdk.version>2.0.1</operator.sdk.version>
-        <fabric8.version>5.11.2</fabric8.version>
-        <lombok.version>1.18.10</lombok.version>
+        <fabric8.version>5.12.1</fabric8.version>
+        <lombok.version>1.18.22</lombok.version>
 
         <scala.version>2.12</scala.version>
         <flink.version>1.14.3</flink.version>
 
         <slf4j.version>1.7.15</slf4j.version>
-        <log4j.version>2.17.0</log4j.version>
+        <log4j.version>2.17.1</log4j.version>
 
         <spotless.version>2.4.2</spotless.version>
         <awaitility.version>4.1.0</awaitility.version>
@@ -311,7 +311,7 @@ under the License.
       						<exclude>**/target/**</exclude>
       						<exclude>apache-maven-3.2.5/**</exclude>
       						<!-- PyCharm -->
-      						<exclude>**/.idea/**</exclude>      
+      						<exclude>**/.idea/**</exclude>
       					</excludes>
       				</configuration>
       			</plugin>
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index a930d09..3a52f82 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -18,9 +18,12 @@
 package org.apache.flink.kubernetes.operator;
 
 import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
+import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
+import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
+import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
 
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
-import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.Operator;
 import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
 import io.javaoperatorsdk.operator.config.runtime.DefaultConfigurationService;
@@ -41,7 +44,7 @@ public class FlinkOperator {
 
         LOG.info("Starting Flink Kubernetes Operator");
 
-        KubernetesClient client = new DefaultKubernetesClient();
+        DefaultKubernetesClient client = new DefaultKubernetesClient();
         String namespace = client.getNamespace();
         if (namespace == null) {
             namespace = "default";
@@ -51,7 +54,18 @@ public class FlinkOperator {
                         client,
                         new ConfigurationServiceOverrider(DefaultConfigurationService.instance())
                                 .build());
-        operator.register(new FlinkDeploymentController(client, namespace));
+
+        FlinkService flinkService = new FlinkService(client);
+
+        JobStatusObserver observer = new JobStatusObserver(flinkService);
+        JobReconciler jobReconciler = new JobReconciler(client, flinkService);
+        SessionReconciler sessionReconciler = new SessionReconciler(client, flinkService);
+
+        FlinkDeploymentController controller =
+                new FlinkDeploymentController(
+                        client, namespace, observer, jobReconciler, sessionReconciler);
+
+        operator.register(controller);
         operator.installShutdownHook();
         operator.start();
 
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 94d5b4d..5ab08e5 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -18,10 +18,10 @@
 package org.apache.flink.kubernetes.operator.controller;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.controller.observer.JobStatusObserver;
-import org.apache.flink.kubernetes.operator.controller.reconciler.JobReconciler;
-import org.apache.flink.kubernetes.operator.controller.reconciler.SessionReconciler;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
+import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
+import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -56,15 +56,21 @@ public class FlinkDeploymentController
 
     private final String operatorNamespace;
 
-    private final JobStatusObserver observer = new JobStatusObserver();
+    private final JobStatusObserver observer;
     private final JobReconciler jobReconciler;
     private final SessionReconciler sessionReconciler;
 
-    public FlinkDeploymentController(KubernetesClient kubernetesClient, String namespace) {
+    public FlinkDeploymentController(
+            KubernetesClient kubernetesClient,
+            String operatorNamespace,
+            JobStatusObserver observer,
+            JobReconciler jobReconciler,
+            SessionReconciler sessionReconciler) {
         this.kubernetesClient = kubernetesClient;
-        this.operatorNamespace = namespace;
-        this.jobReconciler = new JobReconciler(kubernetesClient);
-        this.sessionReconciler = new SessionReconciler(kubernetesClient);
+        this.operatorNamespace = operatorNamespace;
+        this.observer = observer;
+        this.jobReconciler = jobReconciler;
+        this.sessionReconciler = sessionReconciler;
     }
 
     @Override
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserver.java b/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
similarity index 91%
rename from src/main/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserver.java
rename to src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index e1e6504..115d7f5 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserver.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -15,16 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.controller.observer;
+package org.apache.flink.kubernetes.operator.observer;
 
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
-import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
 import org.slf4j.Logger;
@@ -34,13 +33,18 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 
 /** Observes the actual state of the running jobs on the Flink cluster. */
 public class JobStatusObserver {
 
     private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
 
+    private final FlinkService flinkService;
+
+    public JobStatusObserver(FlinkService flinkService) {
+        this.flinkService = flinkService;
+    }
+
     public boolean observeFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) {
         if (flinkApp.getStatus() == null) {
             // This is the first run, nothing to observe
@@ -60,10 +64,9 @@ public class JobStatusObserver {
         }
         LOG.info("Getting job statuses for {}", flinkApp.getMetadata().getName());
         FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
-        try (ClusterClient<String> clusterClient =
-                FlinkUtils.getRestClusterClient(effectiveConfig)) {
+        try {
             Collection<JobStatusMessage> clusterJobStatuses =
-                    clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+                    flinkService.listJobs(effectiveConfig);
             if (clusterJobStatuses.isEmpty()) {
                 LOG.info("No jobs found on {} yet, retrying...", flinkApp.getMetadata().getName());
                 return false;
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java b/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
similarity index 70%
rename from src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java
rename to src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index 2ae431d..1dc7184 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -15,15 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.controller.reconciler;
+package org.apache.flink.kubernetes.operator.reconciler;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.cli.ApplicationDeployer;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.client.deployment.application.ApplicationConfiguration;
-import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
@@ -31,7 +25,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
-import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.KubernetesUtils;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 
@@ -40,7 +34,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Optional;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Reconciler responsible for handling the job lifecycle according to the desired and current
@@ -51,9 +44,11 @@ public class JobReconciler {
     private static final Logger LOG = LoggerFactory.getLogger(JobReconciler.class);
 
     private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
 
-    public JobReconciler(KubernetesClient kubernetesClient) {
+    public JobReconciler(KubernetesClient kubernetesClient, FlinkService flinkService) {
         this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
     }
 
     public boolean reconcile(FlinkDeployment flinkApp, Configuration effectiveConfig)
@@ -118,24 +113,12 @@ public class JobReconciler {
     private void deployFlinkJob(
             FlinkDeployment flinkApp, Configuration effectiveConfig, Optional<String> savepoint)
             throws Exception {
-        LOG.info("Deploying {}", flinkApp.getMetadata().getName());
         if (savepoint.isPresent()) {
             effectiveConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, savepoint.get());
         } else {
             effectiveConfig.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH);
         }
-        final ClusterClientServiceLoader clusterClientServiceLoader =
-                new DefaultClusterClientServiceLoader();
-        final ApplicationDeployer deployer =
-                new ApplicationClusterDeployer(clusterClientServiceLoader);
-
-        final ApplicationConfiguration applicationConfiguration =
-                new ApplicationConfiguration(
-                        flinkApp.getSpec().getJob().getArgs(),
-                        flinkApp.getSpec().getJob().getEntryClass());
-
-        deployer.run(effectiveConfig, applicationConfiguration);
-        LOG.info("{} deployed", flinkApp.getMetadata().getName());
+        flinkService.submitApplicationCluster(flinkApp, effectiveConfig);
     }
 
     private void upgradeFlinkJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
@@ -159,43 +142,24 @@ public class JobReconciler {
     private Optional<String> suspendJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
             throws Exception {
         LOG.info("Suspending {}", flinkApp.getMetadata().getName());
-        JobID jobID = JobID.fromHexString(flinkApp.getStatus().getJobStatus().getJobId());
-        return cancelJob(flinkApp, jobID, UpgradeMode.SAVEPOINT, effectiveConfig);
+        return cancelJob(flinkApp, UpgradeMode.SAVEPOINT, effectiveConfig);
     }
 
     private Optional<String> cancelJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
             throws Exception {
         LOG.info("Cancelling {}", flinkApp.getMetadata().getName());
         UpgradeMode upgradeMode = flinkApp.getSpec().getJob().getUpgradeMode();
-        JobID jobID = JobID.fromHexString(flinkApp.getStatus().getJobStatus().getJobId());
-        return cancelJob(flinkApp, jobID, upgradeMode, effectiveConfig);
+        return cancelJob(flinkApp, upgradeMode, effectiveConfig);
     }
 
     private Optional<String> cancelJob(
-            FlinkDeployment flinkApp,
-            JobID jobID,
-            UpgradeMode upgradeMode,
-            Configuration effectiveConfig)
+            FlinkDeployment flinkApp, UpgradeMode upgradeMode, Configuration effectiveConfig)
             throws Exception {
-        Optional<String> savepointOpt = Optional.empty();
-        try (ClusterClient<String> clusterClient =
-                FlinkUtils.getRestClusterClient(effectiveConfig)) {
-            switch (upgradeMode) {
-                case STATELESS:
-                    clusterClient.cancel(jobID).get(1, TimeUnit.MINUTES);
-                    break;
-                case SAVEPOINT:
-                    String savepoint =
-                            clusterClient
-                                    .stopWithSavepoint(jobID, false, null)
-                                    .get(1, TimeUnit.MINUTES);
-                    savepointOpt = Optional.of(savepoint);
-                    break;
-                default:
-                    throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
-            }
-        }
-        FlinkUtils.waitForClusterShutdown(kubernetesClient, effectiveConfig);
+        Optional<String> savepointOpt =
+                flinkService.cancelJob(
+                        JobID.fromHexString(flinkApp.getStatus().getJobStatus().getJobId()),
+                        upgradeMode,
+                        effectiveConfig);
         JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
         jobStatus.setState("suspended");
         savepointOpt.ifPresent(jobStatus::setSavepointLocation);
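
The JobReconciler diff above now routes both suspend and cancel through a single FlinkService.cancelJob call. A hedged sketch of that calling convention (the helper and job id below are illustrative, not part of the commit): with UpgradeMode.SAVEPOINT the returned Optional carries the savepoint path that is stored on the JobStatus, while UpgradeMode.STATELESS cancels without one and the Optional stays empty.

import java.util.Optional;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.service.FlinkService;

/** Hypothetical sketch class, not part of this commit. */
class SuspendSketch {
    static Optional<String> suspendWithSavepoint(
            FlinkService flinkService, String jobIdHex, Configuration effectiveConfig)
            throws Exception {
        // SAVEPOINT stops the job via stopWithSavepoint() and returns the savepoint
        // path; STATELESS cancels the job and the Optional comes back empty.
        return flinkService.cancelJob(
                JobID.fromHexString(jobIdHex), UpgradeMode.SAVEPOINT, effectiveConfig);
    }
}
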
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java b/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
similarity index 61%
rename from src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java
rename to src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
index 04a0568..374ee24 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
@@ -15,16 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.controller.reconciler;
+package org.apache.flink.kubernetes.operator.reconciler;
 
-import org.apache.flink.client.deployment.ClusterClientFactory;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.ClusterDescriptor;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
-import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.KubernetesUtils;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -40,9 +36,11 @@ public class SessionReconciler {
     private static final Logger LOG = LoggerFactory.getLogger(SessionReconciler.class);
 
     private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
 
-    public SessionReconciler(KubernetesClient kubernetesClient) {
+    public SessionReconciler(KubernetesClient kubernetesClient, FlinkService flinkService) {
         this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
     }
 
     public boolean reconcile(FlinkDeployment flinkApp, Configuration effectiveConfig)
@@ -50,7 +48,7 @@ public class SessionReconciler {
         if (flinkApp.getStatus() == null) {
             flinkApp.setStatus(new FlinkDeploymentStatus());
             try {
-                deployFlinkSession(flinkApp, effectiveConfig);
+                flinkService.submitSessionCluster(flinkApp, effectiveConfig);
                 KubernetesUtils.deployIngress(flinkApp, effectiveConfig, kubernetesClient);
                 return true;
             } catch (Exception e) {
@@ -70,25 +68,9 @@ public class SessionReconciler {
         return true;
     }
 
-    private void deployFlinkSession(FlinkDeployment flinkApp, Configuration effectiveConfig)
-            throws Exception {
-        LOG.info("Deploying session cluster {}", flinkApp.getMetadata().getName());
-        final ClusterClientServiceLoader clusterClientServiceLoader =
-                new DefaultClusterClientServiceLoader();
-        final ClusterClientFactory<String> kubernetesClusterClientFactory =
-                clusterClientServiceLoader.getClusterClientFactory(effectiveConfig);
-        try (final ClusterDescriptor<String> kubernetesClusterDescriptor =
-                kubernetesClusterClientFactory.createClusterDescriptor(effectiveConfig)) {
-            kubernetesClusterDescriptor.deploySessionCluster(
-                    kubernetesClusterClientFactory.getClusterSpecification(effectiveConfig));
-        }
-        LOG.info("Session cluster {} deployed", flinkApp.getMetadata().getName());
-    }
-
     private void upgradeSessionCluster(FlinkDeployment flinkApp, Configuration effectiveConfig)
             throws Exception {
-        FlinkUtils.deleteCluster(flinkApp, kubernetesClient);
-        FlinkUtils.waitForClusterShutdown(kubernetesClient, effectiveConfig);
-        deployFlinkSession(flinkApp, effectiveConfig);
+        flinkService.stopSessionCluster(flinkApp, effectiveConfig);
+        flinkService.submitSessionCluster(flinkApp, effectiveConfig);
     }
 }
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
new file mode 100644
index 0000000..d17ce1e
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.service;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.cli.ApplicationDeployer;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/** Service for submitting and interacting with Flink clusters and jobs. */
+public class FlinkService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkService.class);
+
+    private final NamespacedKubernetesClient kubernetesClient;
+
+    public FlinkService(NamespacedKubernetesClient kubernetesClient) {
+        this.kubernetesClient = kubernetesClient;
+    }
+
+    public void submitApplicationCluster(FlinkDeployment deployment, Configuration conf)
+            throws Exception {
+        LOG.info("Deploying application cluster {}", deployment.getMetadata().getName());
+        final ClusterClientServiceLoader clusterClientServiceLoader =
+                new DefaultClusterClientServiceLoader();
+        final ApplicationDeployer deployer =
+                new ApplicationClusterDeployer(clusterClientServiceLoader);
+
+        final ApplicationConfiguration applicationConfiguration =
+                new ApplicationConfiguration(
+                        deployment.getSpec().getJob().getArgs(),
+                        deployment.getSpec().getJob().getEntryClass());
+
+        deployer.run(conf, applicationConfiguration);
+        LOG.info("Application cluster {} deployed", deployment.getMetadata().getName());
+    }
+
+    public void submitSessionCluster(FlinkDeployment deployment, Configuration conf)
+            throws Exception {
+        LOG.info("Deploying session cluster {}", deployment.getMetadata().getName());
+        final ClusterClientServiceLoader clusterClientServiceLoader =
+                new DefaultClusterClientServiceLoader();
+        final ClusterClientFactory<String> kubernetesClusterClientFactory =
+                clusterClientServiceLoader.getClusterClientFactory(conf);
+        try (final ClusterDescriptor<String> kubernetesClusterDescriptor =
+                kubernetesClusterClientFactory.createClusterDescriptor(conf)) {
+            kubernetesClusterDescriptor.deploySessionCluster(
+                    kubernetesClusterClientFactory.getClusterSpecification(conf));
+        }
+        LOG.info("Session cluster {} deployed", deployment.getMetadata().getName());
+    }
+
+    public Collection<JobStatusMessage> listJobs(Configuration conf) throws Exception {
+        try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
+            return clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+        }
+    }
+
+    private ClusterClient<String> getClusterClient(Configuration config) throws Exception {
+        final String clusterId = config.get(KubernetesConfigOptions.CLUSTER_ID);
+        final String namespace = config.get(KubernetesConfigOptions.NAMESPACE);
+        final int port = config.getInteger(RestOptions.PORT);
+        final String host =
+                config.getString(
+                        RestOptions.ADDRESS, String.format("%s-rest.%s", clusterId, namespace));
+        final String restServerAddress = String.format("http://%s:%s", host, port);
+        LOG.debug("Creating RestClusterClient({})", restServerAddress);
+        return new RestClusterClient<>(
+                config, clusterId, (c, e) -> new StandaloneClientHAServices(restServerAddress));
+    }
+
+    public Optional<String> cancelJob(JobID jobID, UpgradeMode upgradeMode, Configuration conf)
+            throws Exception {
+        Optional<String> savepointOpt = Optional.empty();
+        try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
+            switch (upgradeMode) {
+                case STATELESS:
+                    clusterClient.cancel(jobID).get(1, TimeUnit.MINUTES);
+                    break;
+                case SAVEPOINT:
+                    String savepoint =
+                            clusterClient
+                                    .stopWithSavepoint(jobID, false, null)
+                                    .get(1, TimeUnit.MINUTES);
+                    savepointOpt = Optional.of(savepoint);
+                    break;
+                default:
+                    throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
+            }
+        }
+        waitForClusterShutdown(conf);
+        return savepointOpt;
+    }
+
+    public void stopSessionCluster(FlinkDeployment deployment, Configuration conf)
+            throws Exception {
+        FlinkUtils.deleteCluster(deployment, kubernetesClient);
+        waitForClusterShutdown(conf);
+    }
+
+    /** We need this due to the buggy flink kube cluster client behaviour for now. */
+    private void waitForClusterShutdown(Configuration conf) throws Exception {
+        Fabric8FlinkKubeClient flinkKubeClient =
+                new Fabric8FlinkKubeClient(
+                        conf, kubernetesClient, Executors.newSingleThreadExecutor());
+        for (int i = 0; i < 60; i++) {
+            if (!flinkKubeClient
+                    .getRestEndpoint(conf.get(KubernetesConfigOptions.CLUSTER_ID))
+                    .isPresent()) {
+                break;
+            }
+            LOG.info("Waiting for cluster shutdown... ({})", i);
+            Thread.sleep(1000);
+        }
+    }
+}
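
The new FlinkService#getClusterClient keeps the REST address derivation that FlinkUtils.getRestClusterClient used to perform (removed in the next file). A small sketch of the address it resolves when RestOptions.ADDRESS is unset; the cluster id and namespace values are illustrative:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;

/** Hypothetical sketch class, not part of this commit. */
class RestAddressSketch {
    static String restAddress(Configuration conf) {
        String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID); // e.g. "example-session"
        String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);  // e.g. "default"
        int port = conf.getInteger(RestOptions.PORT);                    // defaults to 8081
        String host =
                conf.getString(
                        RestOptions.ADDRESS, String.format("%s-rest.%s", clusterId, namespace));
        // For the example values this yields "http://example-session-rest.default:8081",
        // i.e. the "<cluster-id>-rest" service created by Flink's native Kubernetes deployment.
        return String.format("http://%s:%s", host, port);
    }
}
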
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 18fd3c1..e9fbac0 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
@@ -26,14 +24,11 @@ import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.PipelineOptions;
-import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
-import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
-import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
 import org.apache.flink.util.StringUtils;
 
 import com.fasterxml.jackson.databind.JsonNode;
@@ -42,7 +37,6 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
 import io.fabric8.kubernetes.client.internal.SerializationUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,7 +47,6 @@ import java.net.URI;
 import java.nio.file.Files;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.concurrent.Executors;
 
 /** Flink Utility methods used by the operator. */
 public class FlinkUtils {
@@ -224,20 +217,6 @@ public class FlinkUtils {
         }
     }
 
-    public static ClusterClient<String> getRestClusterClient(Configuration config)
-            throws Exception {
-        final String clusterId = config.get(KubernetesConfigOptions.CLUSTER_ID);
-        final String namespace = config.get(KubernetesConfigOptions.NAMESPACE);
-        final int port = config.getInteger(RestOptions.PORT);
-        final String host =
-                config.getString(
-                        RestOptions.ADDRESS, String.format("%s-rest.%s", clusterId, namespace));
-        final String restServerAddress = String.format("http://%s:%s", host, port);
-        LOG.info("Creating RestClusterClient({})", restServerAddress);
-        return new RestClusterClient<>(
-                config, clusterId, (c, e) -> new StandaloneClientHAServices(restServerAddress));
-    }
-
     public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) {
         kubernetesClient
                 .apps()
@@ -247,24 +226,4 @@ public class FlinkUtils {
                 .cascading(true)
                 .delete();
     }
-
-    /** We need this due to the buggy flink kube cluster client behaviour for now. */
-    public static void waitForClusterShutdown(
-            KubernetesClient kubernetesClient, Configuration effectiveConfig)
-            throws InterruptedException {
-        Fabric8FlinkKubeClient flinkKubeClient =
-                new Fabric8FlinkKubeClient(
-                        effectiveConfig,
-                        (NamespacedKubernetesClient) kubernetesClient,
-                        Executors.newSingleThreadExecutor());
-        for (int i = 0; i < 60; i++) {
-            if (!flinkKubeClient
-                    .getRestEndpoint(effectiveConfig.get(KubernetesConfigOptions.CLUSTER_ID))
-                    .isPresent()) {
-                break;
-            }
-            LOG.info("Waiting for cluster shutdown... ({})", i);
-            Thread.sleep(1000);
-        }
-    }
 }