You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/20 15:59:10 UTC

[flink-kubernetes-operator] 04/05: [FLINK-27445] Create Flink Standalone Service

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 0d05d0ab514387a314179785bb850d41c7dd1e85
Author: Usamah Jassat <us...@amazon.com>
AuthorDate: Fri Jun 17 18:14:38 2022 +0100

    [FLINK-27445] Create Flink Standalone Service
---
 .../flink/kubernetes/operator/FlinkOperator.java   |  16 +-
 .../operator/config/FlinkConfigBuilder.java        |   2 +-
 .../operator/crd/spec/FlinkDeploymentSpec.java     |   3 +
 .../crd/spec/KubernetesDeploymentMode.java         |  47 ++
 .../observer/deployment/ObserverFactory.java       |  20 +-
 .../observer/sessionjob/SessionJobObserver.java    |  85 +--
 .../AbstractFlinkResourceReconciler.java           |  34 +-
 .../deployment/AbstractJobReconciler.java          |  28 +-
 .../deployment/ApplicationReconciler.java          |  38 +-
 .../reconciler/deployment/ReconcilerFactory.java   |  12 +-
 .../reconciler/deployment/SessionReconciler.java   |  31 +-
 .../sessionjob/SessionJobReconciler.java           |  27 +-
 ...FlinkService.java => AbstractFlinkService.java} | 507 +++++++-------
 .../kubernetes/operator/service/FlinkService.java  | 729 +--------------------
 .../operator/service/FlinkServiceFactory.java      |  66 ++
 .../operator/service/NativeFlinkService.java       | 178 +++++
 .../operator/service/StandaloneFlinkService.java   | 163 +++++
 .../kubernetes/operator/utils/FlinkUtils.java      | 135 ----
 .../kubernetes/operator/TestingFlinkService.java   |  22 +-
 .../operator/TestingFlinkServiceFactory.java       |  52 ++
 .../TestingFlinkDeploymentController.java          |   7 +-
 .../sessionjob/SessionJobObserverTest.java         |   9 +-
 .../deployment/ApplicationReconcilerTest.java      |   6 +-
 .../sessionjob/SessionJobReconcilerTest.java       |  13 +-
 ...erviceTest.java => NativeFlinkServiceTest.java} |  12 +-
 .../service/StandaloneFlinkServiceTest.java        | 138 ++++
 .../crds/flinkdeployments.flink.apache.org-v1.yml  |   5 +
 27 files changed, 1157 insertions(+), 1228 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 8933a6c7..4d2f4674 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -37,7 +37,7 @@ import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
 import org.apache.flink.kubernetes.operator.observer.sessionjob.SessionJobObserver;
 import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
 import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
 import org.apache.flink.kubernetes.operator.utils.EnvUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
@@ -68,7 +68,7 @@ public class FlinkOperator {
     private final Operator operator;
 
     private final KubernetesClient client;
-    private final FlinkService flinkService;
+    private final FlinkServiceFactory flinkServiceFactory;
     private final FlinkConfigManager configManager;
     private final Set<FlinkResourceValidator> validators;
     @VisibleForTesting final Set<RegisteredController> registeredControllers = new HashSet<>();
@@ -86,7 +86,7 @@ public class FlinkOperator {
                 KubernetesClientUtils.getKubernetesClient(
                         configManager.getOperatorConfiguration(), this.metricGroup);
         this.operator = new Operator(client, this::overrideOperatorConfigs);
-        this.flinkService = new FlinkService(client, configManager);
+        this.flinkServiceFactory = new FlinkServiceFactory(client, configManager);
         this.validators = ValidatorUtils.discoverValidators(configManager);
         this.listeners = ListenerUtils.discoverListeners(configManager);
         PluginManager pluginManager =
@@ -126,9 +126,10 @@ public class FlinkOperator {
         var eventRecorder = EventRecorder.create(client, listeners);
         var reconcilerFactory =
                 new ReconcilerFactory(
-                        client, flinkService, configManager, eventRecorder, statusRecorder);
+                        client, flinkServiceFactory, configManager, eventRecorder, statusRecorder);
         var observerFactory =
-                new ObserverFactory(flinkService, configManager, statusRecorder, eventRecorder);
+                new ObserverFactory(
+                        flinkServiceFactory, configManager, statusRecorder, eventRecorder);
 
         var controller =
                 new FlinkDeploymentController(
@@ -149,9 +150,10 @@ public class FlinkOperator {
                         client, new MetricManager<>(metricGroup, configManager), listeners);
         var reconciler =
                 new SessionJobReconciler(
-                        client, flinkService, configManager, eventRecorder, statusRecorder);
+                        client, flinkServiceFactory, configManager, eventRecorder, statusRecorder);
         var observer =
-                new SessionJobObserver(flinkService, configManager, statusRecorder, eventRecorder);
+                new SessionJobObserver(
+                        flinkServiceFactory, configManager, statusRecorder, eventRecorder);
         var controller =
                 new FlinkSessionJobController(
                         configManager, validators, reconciler, observer, statusRecorder);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index 2dfd9fd9..8941dd51 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -260,7 +260,7 @@ public class FlinkConfigBuilder {
         return effectiveConfig;
     }
 
-    protected static Configuration buildFrom(
+    public static Configuration buildFrom(
             String namespace, String clusterId, FlinkDeploymentSpec spec, Configuration flinkConfig)
             throws IOException, URISyntaxException {
         return new FlinkConfigBuilder(namespace, clusterId, spec, flinkConfig)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
index 14f50afa..4b81f1b8 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
@@ -70,4 +70,7 @@ public class FlinkDeploymentSpec extends AbstractFlinkSpec {
      * configContent.
      */
     private Map<String, String> logConfiguration;
+
+    /** Deployment mode of the Flink cluster, native or standalone. */
+    private KubernetesDeploymentMode mode;
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/KubernetesDeploymentMode.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/KubernetesDeploymentMode.java
new file mode 100644
index 00000000..5537be61
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/KubernetesDeploymentMode.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.crd.spec;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/** Enum to control Flink deployment mode on Kubernetes. */
+@Experimental
+public enum KubernetesDeploymentMode {
+
+    /**
+     * Deploys Flink using Flink's native Kubernetes support. Only supported for newer versions
+     * of Flink.
+     */
+    @JsonProperty("native")
+    NATIVE,
+
+    /** Deploys Flink on top of Kubernetes in standalone mode. */
+    @JsonProperty("standalone")
+    STANDALONE;
+
+    public static KubernetesDeploymentMode getDeploymentMode(FlinkDeployment flinkDeployment) {
+        return getDeploymentMode(flinkDeployment.getSpec());
+    }
+
+    public static KubernetesDeploymentMode getDeploymentMode(FlinkDeploymentSpec spec) {
+        return spec.getMode() == null ? KubernetesDeploymentMode.NATIVE : spec.getMode();
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
index 605c2769..6c2c203d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
@@ -22,28 +22,28 @@ import org.apache.flink.kubernetes.operator.config.Mode;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.observer.Observer;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-/** The factory to create the observer based ob the {@link FlinkDeployment} mode. */
+/** The factory to create the observer based on the {@link FlinkDeployment} mode. */
 public class ObserverFactory {
 
-    private final FlinkService flinkService;
+    private final FlinkServiceFactory flinkServiceFactory;
     private final FlinkConfigManager configManager;
     private final StatusRecorder<FlinkDeploymentStatus> statusRecorder;
     private final EventRecorder eventRecorder;
     private final Map<Mode, Observer<FlinkDeployment>> observerMap;
 
     public ObserverFactory(
-            FlinkService flinkService,
+            FlinkServiceFactory flinkServiceFactory,
             FlinkConfigManager configManager,
             StatusRecorder<FlinkDeploymentStatus> statusRecorder,
             EventRecorder eventRecorder) {
-        this.flinkService = flinkService;
+        this.flinkServiceFactory = flinkServiceFactory;
         this.configManager = configManager;
         this.statusRecorder = statusRecorder;
         this.eventRecorder = eventRecorder;
@@ -56,10 +56,16 @@ public class ObserverFactory {
                 mode -> {
                     switch (mode) {
                         case SESSION:
-                            return new SessionObserver(flinkService, configManager, eventRecorder);
+                            return new SessionObserver(
+                                    flinkServiceFactory.getOrCreate(flinkApp),
+                                    configManager,
+                                    eventRecorder);
                         case APPLICATION:
                             return new ApplicationObserver(
-                                    flinkService, configManager, statusRecorder, eventRecorder);
+                                    flinkServiceFactory.getOrCreate(flinkApp),
+                                    configManager,
+                                    statusRecorder,
+                                    eventRecorder);
                         default:
                             throw new UnsupportedOperationException(
                                     String.format("Unsupported running mode: %s", mode));
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
index 5ba397c4..c3aeec46 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
@@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.operator.observer.context.VoidObserverContext
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
@@ -52,52 +53,51 @@ import java.util.stream.Collectors;
 public class SessionJobObserver implements Observer<FlinkSessionJob> {
 
     private static final Logger LOG = LoggerFactory.getLogger(SessionJobObserver.class);
+    private final FlinkServiceFactory flinkServiceFactory;
     private final FlinkConfigManager configManager;
     private final EventRecorder eventRecorder;
-    private final SavepointObserver<FlinkSessionJobStatus> savepointObserver;
-    private final JobStatusObserver<VoidObserverContext> jobStatusObserver;
-    private final FlinkService flinkService;
+    private final StatusRecorder<FlinkSessionJobStatus> statusRecorder;
 
     public SessionJobObserver(
-            FlinkService flinkService,
+            FlinkServiceFactory flinkServiceFactory,
             FlinkConfigManager configManager,
             StatusRecorder<FlinkSessionJobStatus> statusRecorder,
             EventRecorder eventRecorder) {
+        this.flinkServiceFactory = flinkServiceFactory;
         this.configManager = configManager;
         this.eventRecorder = eventRecorder;
-        this.flinkService = flinkService;
-        this.savepointObserver =
-                new SavepointObserver<>(flinkService, configManager, statusRecorder, eventRecorder);
-        this.jobStatusObserver =
-                new JobStatusObserver<>(flinkService, eventRecorder) {
-                    @Override
-                    protected void onTimeout(VoidObserverContext sessionJobObserverContext) {}
-
-                    @Override
-                    protected Optional<JobStatusMessage> filterTargetJob(
-                            JobStatus status, List<JobStatusMessage> clusterJobStatuses) {
-                        var jobId =
-                                Preconditions.checkNotNull(
-                                        status.getJobId(),
-                                        "The jobID to be observed should not be null");
-                        var matchedList =
-                                clusterJobStatuses.stream()
-                                        .filter(job -> job.getJobId().toHexString().equals(jobId))
-                                        .collect(Collectors.toList());
-                        Preconditions.checkArgument(
-                                matchedList.size() <= 1,
-                                String.format(
-                                        "Expected one job for JobID: %s, but %d founded",
-                                        status.getJobId(), matchedList.size()));
-
-                        if (matchedList.size() == 0) {
-                            LOG.info("No job found for JobID: {}", jobId);
-                            return Optional.empty();
-                        } else {
-                            return Optional.of(matchedList.get(0));
-                        }
-                    }
-                };
+        this.statusRecorder = statusRecorder;
+    }
+
+    private JobStatusObserver<VoidObserverContext> getJobStatusObserver(FlinkService flinkService) {
+        return new JobStatusObserver<>(flinkService, eventRecorder) {
+            @Override
+            protected void onTimeout(VoidObserverContext sessionJobObserverContext) {}
+
+            @Override
+            protected Optional<JobStatusMessage> filterTargetJob(
+                    JobStatus status, List<JobStatusMessage> clusterJobStatuses) {
+                var jobId =
+                        Preconditions.checkNotNull(
+                                status.getJobId(), "The jobID to be observed should not be null");
+                var matchedList =
+                        clusterJobStatuses.stream()
+                                .filter(job -> job.getJobId().toHexString().equals(jobId))
+                                .collect(Collectors.toList());
+                Preconditions.checkArgument(
+                        matchedList.size() <= 1,
+                        String.format(
+                                "Expected one job for JobID: %s, but %d founded",
+                                status.getJobId(), matchedList.size()));
+
+                if (matchedList.size() == 0) {
+                    LOG.info("No job found for JobID: {}", jobId);
+                    return Optional.empty();
+                } else {
+                    return Optional.of(matchedList.get(0));
+                }
+            }
+        };
     }
 
     @Override
@@ -112,11 +112,13 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
             return;
         }
 
+        FlinkService flinkService = flinkServiceFactory.getOrCreate(flinkDepOpt.get());
+        var jobStatusObserver = getJobStatusObserver(flinkService);
         var deployedConfig =
                 configManager.getSessionJobConfig(flinkDepOpt.get(), flinkSessionJob.getSpec());
         var reconciliationStatus = flinkSessionJob.getStatus().getReconciliationStatus();
         if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
-            checkIfAlreadyUpgraded(flinkSessionJob, deployedConfig);
+            checkIfAlreadyUpgraded(flinkSessionJob, deployedConfig, flinkService);
             if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
                 return;
             }
@@ -127,13 +129,18 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
                         flinkSessionJob, deployedConfig, VoidObserverContext.INSTANCE);
 
         if (jobFound) {
+            SavepointObserver savepointObserver =
+                    new SavepointObserver<>(
+                            flinkService, configManager, statusRecorder, eventRecorder);
             savepointObserver.observeSavepointStatus(flinkSessionJob, deployedConfig);
         }
         SavepointUtils.resetTriggerIfJobNotRunning(flinkSessionJob, eventRecorder);
     }
 
     private void checkIfAlreadyUpgraded(
-            FlinkSessionJob flinkSessionJob, Configuration deployedConfig) {
+            FlinkSessionJob flinkSessionJob,
+            Configuration deployedConfig,
+            FlinkService flinkService) {
         var uid = flinkSessionJob.getMetadata().getUid();
         Collection<JobStatusMessage> jobStatusMessages;
         try {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 7758ac6f..3d458432 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -65,7 +65,6 @@ public abstract class AbstractFlinkResourceReconciler<
     protected final EventRecorder eventRecorder;
     protected final StatusRecorder<STATUS> statusRecorder;
     protected final KubernetesClient kubernetesClient;
-    protected final FlinkService flinkService;
 
     public static final String MSG_SUSPENDED = "Suspending existing deployment.";
     public static final String MSG_SPEC_CHANGED = "Detected spec change, starting reconciliation.";
@@ -74,12 +73,10 @@ public abstract class AbstractFlinkResourceReconciler<
 
     public AbstractFlinkResourceReconciler(
             KubernetesClient kubernetesClient,
-            FlinkService flinkService,
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
             StatusRecorder<STATUS> statusRecorder) {
         this.kubernetesClient = kubernetesClient;
-        this.flinkService = flinkService;
         this.configManager = configManager;
         this.eventRecorder = eventRecorder;
         this.statusRecorder = statusRecorder;
@@ -112,6 +109,7 @@ public abstract class AbstractFlinkResourceReconciler<
                     cr,
                     spec,
                     status,
+                    ctx,
                     deployConfig,
                     Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
                     false);
@@ -128,7 +126,7 @@ public abstract class AbstractFlinkResourceReconciler<
                 reconciliationStatus.getState() == ReconciliationState.UPGRADING
                         || !currentDeploySpec.equals(lastReconciledSpec);
         var observeConfig = getObserveConfig(cr, ctx);
-
+        var flinkService = getFlinkService(cr, ctx);
         if (specChanged) {
             if (checkNewSpecAlreadyDeployed(cr, deployConfig)) {
                 return;
@@ -142,8 +140,8 @@ public abstract class AbstractFlinkResourceReconciler<
                         EventRecorder.Component.JobManagerDeployment,
                         MSG_SPEC_CHANGED);
             }
-            reconcileSpecChange(cr, observeConfig, deployConfig);
-        } else if (shouldRollBack(cr, observeConfig)) {
+            reconcileSpecChange(cr, ctx, observeConfig, deployConfig);
+        } else if (shouldRollBack(cr, observeConfig, flinkService)) {
             // Rollbacks are executed in two steps, we initiate it first then return
             if (initiateRollBack(status)) {
                 return;
@@ -156,7 +154,7 @@ public abstract class AbstractFlinkResourceReconciler<
                     EventRecorder.Component.JobManagerDeployment,
                     MSG_ROLLBACK);
             rollback(cr, ctx, observeConfig);
-        } else if (!reconcileOtherChanges(cr, observeConfig)) {
+        } else if (!reconcileOtherChanges(cr, ctx, observeConfig)) {
             LOG.info("Resource fully reconciled, nothing to do...");
         }
     }
@@ -202,7 +200,8 @@ public abstract class AbstractFlinkResourceReconciler<
      * @throws Exception Error during spec upgrade.
      */
     protected abstract void reconcileSpecChange(
-            CR cr, Configuration observeConfig, Configuration deployConfig) throws Exception;
+            CR cr, Context ctx, Configuration observeConfig, Configuration deployConfig)
+            throws Exception;
 
     /**
      * Rollback deployed resource to the last stable spec.
@@ -224,8 +223,8 @@ public abstract class AbstractFlinkResourceReconciler<
      * @return True if any further reconciliation action was taken.
      * @throws Exception Error during reconciliation.
      */
-    protected abstract boolean reconcileOtherChanges(CR cr, Configuration observeConfig)
-            throws Exception;
+    protected abstract boolean reconcileOtherChanges(
+            CR cr, Context context, Configuration observeConfig) throws Exception;
 
     @Override
     public final DeleteControl cleanup(CR resource, Context context) {
@@ -239,6 +238,7 @@ public abstract class AbstractFlinkResourceReconciler<
      * @param spec Spec that should be deployed to Kubernetes.
      * @param status Status object of the resource
      * @param deployConfig Flink conf for the deployment.
+     * @param ctx Reconciliation context.
      * @param savepoint Optional savepoint path for applications and session jobs.
      * @param requireHaMetadata Flag used by application deployments to validate HA metadata
      * @throws Exception Error during deployment.
@@ -247,6 +247,7 @@ public abstract class AbstractFlinkResourceReconciler<
             CR relatedResource,
             SPEC spec,
             STATUS status,
+            Context ctx,
             Configuration deployConfig,
             Optional<String> savepoint,
             boolean requireHaMetadata)
@@ -261,6 +262,15 @@ public abstract class AbstractFlinkResourceReconciler<
      */
     protected abstract DeleteControl cleanupInternal(CR resource, Context context);
 
+    /**
+     * Get the Flink service related to the resource and context.
+     *
+     * @param resource the Flink resource being reconciled
+     * @param context the current reconciliation context
+     * @return the {@link FlinkService} to use for this resource
+     */
+    protected abstract FlinkService getFlinkService(CR resource, Context context);
+
     /**
      * Checks whether the desired spec already matches the currently deployed spec. If they match
      * the resource status is updated to reflect successful reconciliation.
@@ -296,7 +306,9 @@ public abstract class AbstractFlinkResourceReconciler<
      * @return True if the resource should be rolled back.
      */
     private boolean shouldRollBack(
-            AbstractFlinkResource<SPEC, STATUS> resource, Configuration configuration) {
+            AbstractFlinkResource<SPEC, STATUS> resource,
+            Configuration configuration,
+            FlinkService flinkService) {
 
         var reconciliationStatus = resource.getStatus().getReconciliationStatus();
         if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 7f4cbdf1..e3a72a05 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -28,7 +28,6 @@ import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
@@ -54,11 +53,10 @@ public abstract class AbstractJobReconciler<
 
     public AbstractJobReconciler(
             KubernetesClient kubernetesClient,
-            FlinkService flinkService,
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
             StatusRecorder<STATUS> statusRecorder) {
-        super(kubernetesClient, flinkService, configManager, eventRecorder, statusRecorder);
+        super(kubernetesClient, configManager, eventRecorder, statusRecorder);
     }
 
     @Override
@@ -80,8 +78,8 @@ public abstract class AbstractJobReconciler<
 
     @Override
     protected void reconcileSpecChange(
-            CR resource, Configuration observeConfig, Configuration deployConfig) throws Exception {
-        var deployMeta = resource.getMetadata();
+            CR resource, Context ctx, Configuration observeConfig, Configuration deployConfig)
+            throws Exception {
         STATUS status = resource.getStatus();
         var reconciliationStatus = status.getReconciliationStatus();
         SPEC lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec();
@@ -107,7 +105,7 @@ public abstract class AbstractJobReconciler<
                     MSG_SUSPENDED);
             // We must record the upgrade mode used to the status later
             currentDeploySpec.getJob().setUpgradeMode(availableUpgradeMode.get());
-            cancelJob(resource, availableUpgradeMode.get(), observeConfig);
+            cancelJob(resource, ctx, availableUpgradeMode.get(), observeConfig);
             if (desiredJobState == JobState.RUNNING) {
                 ReconciliationUtils.updateStatusBeforeDeploymentAttempt(resource, deployConfig);
             } else {
@@ -123,6 +121,7 @@ public abstract class AbstractJobReconciler<
                     resource,
                     currentDeploySpec,
                     status,
+                    ctx,
                     deployConfig,
                     // We decide to enforce HA based on how job was previously suspended
                     lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE);
@@ -174,6 +173,7 @@ public abstract class AbstractJobReconciler<
             CR resource,
             SPEC spec,
             STATUS status,
+            Context ctx,
             Configuration deployConfig,
             boolean requireHaMetadata)
             throws Exception {
@@ -185,11 +185,11 @@ public abstract class AbstractJobReconciler<
                             .flatMap(s -> Optional.ofNullable(s.getLocation()));
         }
 
-        deploy(resource, spec, status, deployConfig, savepointOpt, requireHaMetadata);
+        deploy(resource, spec, status, ctx, deployConfig, savepointOpt, requireHaMetadata);
     }
 
     @Override
-    protected void rollback(CR resource, Context context, Configuration observeConfig)
+    protected void rollback(CR resource, Context ctx, Configuration observeConfig)
             throws Exception {
         var reconciliationStatus = resource.getStatus().getReconciliationStatus();
         var rollbackSpec = reconciliationStatus.deserializeLastStableSpec();
@@ -198,6 +198,7 @@ public abstract class AbstractJobReconciler<
 
         cancelJob(
                 resource,
+                ctx,
                 upgradeMode == UpgradeMode.STATELESS
                         ? UpgradeMode.STATELESS
                         : UpgradeMode.LAST_STATE,
@@ -207,16 +208,18 @@ public abstract class AbstractJobReconciler<
                 resource,
                 rollbackSpec,
                 resource.getStatus(),
-                getDeployConfig(resource.getMetadata(), rollbackSpec, context),
+                ctx,
+                getDeployConfig(resource.getMetadata(), rollbackSpec, ctx),
                 upgradeMode != UpgradeMode.STATELESS);
 
         reconciliationStatus.setState(ReconciliationState.ROLLED_BACK);
     }
 
     @Override
-    public boolean reconcileOtherChanges(CR resource, Configuration observeConfig)
+    public boolean reconcileOtherChanges(CR resource, Context context, Configuration observeConfig)
             throws Exception {
-        return SavepointUtils.triggerSavepointIfNeeded(flinkService, resource, observeConfig);
+        return SavepointUtils.triggerSavepointIfNeeded(
+                getFlinkService(resource, context), resource, observeConfig);
     }
 
     /**
@@ -228,5 +231,6 @@ public abstract class AbstractJobReconciler<
      * @throws Exception Error during cancellation.
      */
     protected abstract void cancelJob(
-            CR resource, UpgradeMode upgradeMode, Configuration observeConfig) throws Exception;
+            CR resource, Context ctx, UpgradeMode upgradeMode, Configuration observeConfig)
+            throws Exception;
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index df0efe37..c03ddc98 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -33,6 +33,7 @@ import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
+import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -53,6 +54,7 @@ public class ApplicationReconciler
         extends AbstractJobReconciler<FlinkDeployment, FlinkDeploymentSpec, FlinkDeploymentStatus> {
 
     private static final Logger LOG = LoggerFactory.getLogger(ApplicationReconciler.class);
+    protected final FlinkService flinkService;
 
     public ApplicationReconciler(
             KubernetesClient kubernetesClient,
@@ -60,7 +62,13 @@ public class ApplicationReconciler
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
             StatusRecorder<FlinkDeploymentStatus> statusRecorder) {
-        super(kubernetesClient, flinkService, configManager, eventRecorder, statusRecorder);
+        super(kubernetesClient, configManager, eventRecorder, statusRecorder);
+        this.flinkService = flinkService;
+    }
+
+    @Override
+    protected FlinkService getFlinkService(FlinkDeployment resource, Context context) {
+        return flinkService;
     }
 
     @Override
@@ -118,6 +126,7 @@ public class ApplicationReconciler
             FlinkDeployment relatedResource,
             FlinkDeploymentSpec spec,
             FlinkDeploymentStatus status,
+            Context ctx,
             Configuration deployConfig,
             Optional<String> savepoint,
             boolean requireHaMetadata)
@@ -138,13 +147,7 @@ public class ApplicationReconciler
             }
             LOG.info("Deleting deployment with terminated application before new deployment");
             flinkService.deleteClusterDeployment(relatedResource.getMetadata(), status, true);
-            FlinkUtils.waitForClusterShutdown(
-                    kubernetesClient,
-                    deployConfig,
-                    configManager
-                            .getOperatorConfiguration()
-                            .getFlinkShutdownClusterTimeout()
-                            .toSeconds());
+            flinkService.waitForClusterShutdown(deployConfig);
         }
         eventRecorder.triggerEvent(
                 relatedResource,
@@ -162,7 +165,10 @@ public class ApplicationReconciler
 
     @Override
     protected void cancelJob(
-            FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration observeConfig)
+            FlinkDeployment deployment,
+            Context ctx,
+            UpgradeMode upgradeMode,
+            Configuration observeConfig)
             throws Exception {
         flinkService.cancelJob(deployment, upgradeMode, observeConfig);
     }
@@ -186,24 +192,24 @@ public class ApplicationReconciler
     }
 
     @Override
-    public boolean reconcileOtherChanges(FlinkDeployment deployment, Configuration observeConfig)
-            throws Exception {
-        if (super.reconcileOtherChanges(deployment, observeConfig)) {
+    public boolean reconcileOtherChanges(
+            FlinkDeployment deployment, Context ctx, Configuration observeConfig) throws Exception {
+        if (SavepointUtils.triggerSavepointIfNeeded(flinkService, deployment, observeConfig)) {
             return true;
         }
 
         if (shouldRecoverDeployment(observeConfig, deployment)) {
-            recoverJmDeployment(deployment, observeConfig);
+            recoverJmDeployment(deployment, ctx, observeConfig);
             return true;
         }
         return false;
     }
 
-    private void recoverJmDeployment(FlinkDeployment deployment, Configuration observeConfig)
-            throws Exception {
+    private void recoverJmDeployment(
+            FlinkDeployment deployment, Context ctx, Configuration observeConfig) throws Exception {
         LOG.info("Missing Flink Cluster deployment, trying to recover...");
         FlinkDeploymentSpec specToRecover = ReconciliationUtils.getDeployedSpec(deployment);
-        restoreJob(deployment, specToRecover, deployment.getStatus(), observeConfig, true);
+        restoreJob(deployment, specToRecover, deployment.getStatus(), ctx, observeConfig, true);
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
index 2e556fd1..419e3e41 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
@@ -22,7 +22,7 @@ import org.apache.flink.kubernetes.operator.config.Mode;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
@@ -35,7 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
 public class ReconcilerFactory {
 
     private final KubernetesClient kubernetesClient;
-    private final FlinkService flinkService;
+    private final FlinkServiceFactory flinkServiceFactory;
     private final FlinkConfigManager configManager;
     private final EventRecorder eventRecorder;
     private final StatusRecorder<FlinkDeploymentStatus> deploymentStatusRecorder;
@@ -43,12 +43,12 @@ public class ReconcilerFactory {
 
     public ReconcilerFactory(
             KubernetesClient kubernetesClient,
-            FlinkService flinkService,
+            FlinkServiceFactory flinkServiceFactory,
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
             StatusRecorder<FlinkDeploymentStatus> deploymentStatusRecorder) {
         this.kubernetesClient = kubernetesClient;
-        this.flinkService = flinkService;
+        this.flinkServiceFactory = flinkServiceFactory;
         this.configManager = configManager;
         this.eventRecorder = eventRecorder;
         this.deploymentStatusRecorder = deploymentStatusRecorder;
@@ -63,14 +63,14 @@ public class ReconcilerFactory {
                         case SESSION:
                             return new SessionReconciler(
                                     kubernetesClient,
-                                    flinkService,
+                                    flinkServiceFactory.getOrCreate(flinkApp),
                                     configManager,
                                     eventRecorder,
                                     deploymentStatusRecorder);
                         case APPLICATION:
                             return new ApplicationReconciler(
                                     kubernetesClient,
-                                    flinkService,
+                                    flinkServiceFactory.getOrCreate(flinkApp),
                                     configManager,
                                     eventRecorder,
                                     deploymentStatusRecorder);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 791db2fe..527cb8b5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -29,7 +29,6 @@ import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
@@ -52,6 +51,8 @@ public class SessionReconciler
         extends AbstractFlinkResourceReconciler<
                 FlinkDeployment, FlinkDeploymentSpec, FlinkDeploymentStatus> {
 
+    protected final FlinkService flinkService;
+
     private static final Logger LOG = LoggerFactory.getLogger(SessionReconciler.class);
 
     public SessionReconciler(
@@ -60,7 +61,13 @@ public class SessionReconciler
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
             StatusRecorder<FlinkDeploymentStatus> statusRecorder) {
-        super(kubernetesClient, flinkService, configManager, eventRecorder, statusRecorder);
+        super(kubernetesClient, configManager, eventRecorder, statusRecorder);
+        this.flinkService = flinkService;
+    }
+
+    @Override
+    protected FlinkService getFlinkService(FlinkDeployment resource, Context context) {
+        return flinkService;
     }
 
     @Override
@@ -82,7 +89,10 @@ public class SessionReconciler
 
     @Override
     protected void reconcileSpecChange(
-            FlinkDeployment deployment, Configuration observeConfig, Configuration deployConfig)
+            FlinkDeployment deployment,
+            Context ctx,
+            Configuration observeConfig,
+            Configuration deployConfig)
             throws Exception {
         deleteSessionCluster(deployment, observeConfig);
 
@@ -94,6 +104,7 @@ public class SessionReconciler
                 deployment,
                 deployment.getSpec(),
                 deployment.getStatus(),
+                ctx,
                 deployConfig,
                 Optional.empty(),
                 false);
@@ -103,13 +114,7 @@ public class SessionReconciler
     private void deleteSessionCluster(FlinkDeployment deployment, Configuration effectiveConfig) {
         flinkService.deleteClusterDeployment(
                 deployment.getMetadata(), deployment.getStatus(), false);
-        FlinkUtils.waitForClusterShutdown(
-                kubernetesClient,
-                effectiveConfig,
-                configManager
-                        .getOperatorConfiguration()
-                        .getFlinkShutdownClusterTimeout()
-                        .toSeconds());
+        flinkService.waitForClusterShutdown(effectiveConfig);
     }
 
     @Override
@@ -117,6 +122,7 @@ public class SessionReconciler
             FlinkDeployment cr,
             FlinkDeploymentSpec spec,
             FlinkDeploymentStatus status,
+            Context ctx,
             Configuration deployConfig,
             Optional<String> savepoint,
             boolean requireHaMetadata)
@@ -141,6 +147,7 @@ public class SessionReconciler
                 deployment,
                 rollbackSpec,
                 deployment.getStatus(),
+                ctx,
                 rollbackConfig,
                 Optional.empty(),
                 false);
@@ -149,8 +156,8 @@ public class SessionReconciler
     }
 
     @Override
-    public boolean reconcileOtherChanges(FlinkDeployment flinkApp, Configuration observeConfig)
-            throws Exception {
+    public boolean reconcileOtherChanges(
+            FlinkDeployment flinkApp, Context ctx, Configuration observeConfig) throws Exception {
         if (shouldRecoverDeployment(observeConfig, flinkApp)) {
             recoverSession(flinkApp, observeConfig);
             return true;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index e04d3bd1..f0dd2520 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -28,6 +28,7 @@ import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatu
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
@@ -45,14 +46,27 @@ public class SessionJobReconciler
         extends AbstractJobReconciler<FlinkSessionJob, FlinkSessionJobSpec, FlinkSessionJobStatus> {
 
     private static final Logger LOG = LoggerFactory.getLogger(SessionJobReconciler.class);
+    private final FlinkServiceFactory flinkServiceFactory;
 
     public SessionJobReconciler(
             KubernetesClient kubernetesClient,
-            FlinkService flinkService,
+            FlinkServiceFactory flinkServiceFactory,
             FlinkConfigManager configManager,
             EventRecorder eventRecorder,
             StatusRecorder<FlinkSessionJobStatus> statusRecorder) {
-        super(kubernetesClient, flinkService, configManager, eventRecorder, statusRecorder);
+        super(kubernetesClient, configManager, eventRecorder, statusRecorder);
+        this.flinkServiceFactory = flinkServiceFactory;
+    }
+
+    @Override
+    protected FlinkService getFlinkService(FlinkSessionJob resource, Context context) {
+        Optional<FlinkDeployment> deploymentOpt =
+                context.getSecondaryResource(FlinkDeployment.class);
+
+        if (sessionClusterReady(deploymentOpt)) {
+            return flinkServiceFactory.getOrCreate(deploymentOpt.get());
+        }
+        return null;
     }
 
     @Override
@@ -84,10 +98,12 @@ public class SessionJobReconciler
             FlinkSessionJob cr,
             FlinkSessionJobSpec sessionJobSpec,
             FlinkSessionJobStatus status,
+            Context ctx,
             Configuration deployConfig,
             Optional<String> savepoint,
             boolean requireHaMetadata)
             throws Exception {
+        FlinkService flinkService = getFlinkService(cr, ctx);
         var jobID =
                 flinkService.submitJobToSessionCluster(
                         cr.getMetadata(), sessionJobSpec, deployConfig, savepoint.orElse(null));
@@ -101,8 +117,12 @@ public class SessionJobReconciler
 
     @Override
     protected void cancelJob(
-            FlinkSessionJob resource, UpgradeMode upgradeMode, Configuration observeConfig)
+            FlinkSessionJob resource,
+            Context ctx,
+            UpgradeMode upgradeMode,
+            Configuration observeConfig)
             throws Exception {
+        FlinkService flinkService = getFlinkService(resource, ctx);
         flinkService.cancelSessionJob(resource, upgradeMode, observeConfig);
     }
 
@@ -117,6 +137,7 @@ public class SessionJobReconciler
                 try {
                     cancelJob(
                             sessionJob,
+                            context,
                             UpgradeMode.STATELESS,
                             getObserveConfig(sessionJob, context));
                 } catch (Exception e) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
similarity index 87%
copy from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
copy to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 2deccaee..afb7bf27 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -20,13 +20,6 @@ package org.apache.flink.kubernetes.operator.service;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.client.cli.ApplicationDeployer;
-import org.apache.flink.client.deployment.ClusterClientFactory;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.ClusterDescriptor;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.client.deployment.application.ApplicationConfiguration;
-import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
@@ -43,7 +36,6 @@ import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
@@ -94,6 +86,7 @@ import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.api.model.Service;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import org.apache.commons.lang3.ObjectUtils;
 import org.slf4j.Logger;
@@ -106,6 +99,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
@@ -123,112 +117,42 @@ import java.util.stream.Collectors;
 
 import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
 
-/** Service for submitting and interacting with Flink clusters and jobs. */
-public class FlinkService {
+/**
+ * An abstract {@link FlinkService} containing some common implementations for the native and
+ * standalone Flink Services.
+ */
+public abstract class AbstractFlinkService implements FlinkService {
 
-    private static final Logger LOG = LoggerFactory.getLogger(FlinkService.class);
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractFlinkService.class);
 
-    private final KubernetesClient kubernetesClient;
-    private final ArtifactManager artifactManager;
-    private final FlinkConfigManager configManager;
+    protected final KubernetesClient kubernetesClient;
+    protected final FlinkConfigManager configManager;
     private final ExecutorService executorService;
+    protected final ArtifactManager artifactManager;
 
-    public FlinkService(KubernetesClient kubernetesClient, FlinkConfigManager configManager) {
+    public AbstractFlinkService(
+            KubernetesClient kubernetesClient, FlinkConfigManager configManager) {
         this.kubernetesClient = kubernetesClient;
-        this.artifactManager = new ArtifactManager(configManager);
         this.configManager = configManager;
+        this.artifactManager = new ArtifactManager(configManager);
         this.executorService =
                 Executors.newFixedThreadPool(
                         4, new ExecutorThreadFactory("Flink-RestClusterClient-IO"));
     }
 
-    private static List<JobStatusMessage> toJobStatusMessage(
-            MultipleJobsDetails multipleJobsDetails) {
-        return multipleJobsDetails.getJobs().stream()
-                .map(
-                        details ->
-                                new JobStatusMessage(
-                                        details.getJobId(),
-                                        details.getJobName(),
-                                        getEffectiveStatus(details),
-                                        details.getStartTime()))
-                .collect(Collectors.toList());
-    }
-
-    @VisibleForTesting
-    protected static JobStatus getEffectiveStatus(JobDetails details) {
-        int numRunning = details.getTasksPerState()[ExecutionState.RUNNING.ordinal()];
-        int numFinished = details.getTasksPerState()[ExecutionState.FINISHED.ordinal()];
-        boolean allRunningOrFinished = details.getNumTasks() == (numRunning + numFinished);
-        JobStatus effectiveStatus = details.getStatus();
-        if (JobStatus.RUNNING.equals(effectiveStatus) && !allRunningOrFinished) {
-            effectiveStatus = JobStatus.CREATED;
-            LOG.debug("Adjusting job state from {} to {}", JobStatus.RUNNING, effectiveStatus);
-        }
-        return effectiveStatus;
-    }
+    protected abstract PodList getJmPodList(String namespace, String clusterId);
 
+    @Override
     public KubernetesClient getKubernetesClient() {
         return kubernetesClient;
     }
 
-    public void submitApplicationCluster(
-            JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception {
-        LOG.info(
-                "Deploying application cluster{}",
-                requireHaMetadata ? " requiring last-state from HA metadata" : "");
-        if (FlinkUtils.isKubernetesHAActivated(conf)) {
-            final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
-            final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
-            // Delete the job graph in the HA ConfigMaps so that the newly changed job config(e.g.
-            // parallelism) could take effect
-            FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, kubernetesClient);
-        }
-        if (requireHaMetadata) {
-            validateHaMetadataExists(conf);
-        }
-        final ClusterClientServiceLoader clusterClientServiceLoader =
-                new DefaultClusterClientServiceLoader();
-        final ApplicationDeployer deployer =
-                new ApplicationClusterDeployer(clusterClientServiceLoader);
-
-        final ApplicationConfiguration applicationConfiguration =
-                new ApplicationConfiguration(
-                        jobSpec.getArgs() != null ? jobSpec.getArgs() : new String[0],
-                        jobSpec.getEntryClass());
-
-        deployer.run(conf, applicationConfiguration);
-        LOG.info("Application cluster successfully deployed");
-    }
-
+    @Override
     public boolean isHaMetadataAvailable(Configuration conf) {
         return FlinkUtils.isHaMetadataAvailable(conf, kubernetesClient);
     }
 
-    protected void validateHaMetadataExists(Configuration conf) {
-        if (!isHaMetadataAvailable(conf)) {
-            throw new DeploymentFailedException(
-                    "HA metadata not available to restore from last state. "
-                            + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
-                            + "Manual restore required.",
-                    "RestoreFailed");
-        }
-    }
-
-    public void submitSessionCluster(Configuration conf) throws Exception {
-        LOG.info("Deploying session cluster");
-        final ClusterClientServiceLoader clusterClientServiceLoader =
-                new DefaultClusterClientServiceLoader();
-        final ClusterClientFactory<String> kubernetesClusterClientFactory =
-                clusterClientServiceLoader.getClusterClientFactory(conf);
-        try (final ClusterDescriptor<String> kubernetesClusterDescriptor =
-                kubernetesClusterClientFactory.createClusterDescriptor(conf)) {
-            kubernetesClusterDescriptor.deploySessionCluster(
-                    kubernetesClusterClientFactory.getClusterSpecification(conf));
-        }
-        LOG.info("Session cluster successfully deployed");
-    }
-
+    @Override
     public JobID submitJobToSessionCluster(
             ObjectMeta meta,
             FlinkSessionJobSpec spec,
@@ -242,108 +166,7 @@ public class FlinkService {
         return jobID;
     }
 
-    private JarRunResponseBody runJar(
-            JobSpec job,
-            JobID jobID,
-            JarUploadResponseBody response,
-            Configuration conf,
-            String savepoint) {
-        String jarId =
-                response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
-        try (RestClusterClient<String> clusterClient =
-                (RestClusterClient<String>) getClusterClient(conf)) {
-            JarRunHeaders headers = JarRunHeaders.getInstance();
-            JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
-            parameters.jarIdPathParameter.resolve(jarId);
-            JarRunRequestBody runRequestBody =
-                    new JarRunRequestBody(
-                            job.getEntryClass(),
-                            null,
-                            job.getArgs() == null ? null : Arrays.asList(job.getArgs()),
-                            job.getParallelism() > 0 ? job.getParallelism() : null,
-                            jobID,
-                            job.getAllowNonRestoredState(),
-                            savepoint,
-                            conf.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_14)
-                                    ? RestoreMode.DEFAULT
-                                    : null);
-            LOG.info("Submitting job: {} to session cluster.", jobID.toHexString());
-            return clusterClient
-                    .sendRequest(headers, parameters, runRequestBody)
-                    .get(
-                            configManager
-                                    .getOperatorConfiguration()
-                                    .getFlinkClientTimeout()
-                                    .toSeconds(),
-                            TimeUnit.SECONDS);
-        } catch (Exception e) {
-            LOG.error("Failed to submit job to session cluster.", e);
-            throw new FlinkRuntimeException(e);
-        } finally {
-            deleteJar(conf, jarId);
-        }
-    }
-
-    private JarUploadResponseBody uploadJar(
-            ObjectMeta objectMeta, FlinkSessionJobSpec spec, Configuration conf) throws Exception {
-        String targetDir = artifactManager.generateJarDir(objectMeta, spec);
-        File jarFile = artifactManager.fetch(spec.getJob().getJarURI(), conf, targetDir);
-        Preconditions.checkArgument(
-                jarFile.exists(),
-                String.format("The jar file %s not exists", jarFile.getAbsolutePath()));
-        JarUploadHeaders headers = JarUploadHeaders.getInstance();
-        String clusterId = spec.getDeploymentName();
-        String namespace = objectMeta.getNamespace();
-        int port = conf.getInteger(RestOptions.PORT);
-        String host =
-                ObjectUtils.firstNonNull(
-                        configManager.getOperatorConfiguration().getFlinkServiceHostOverride(),
-                        ExternalServiceDecorator.getNamespacedExternalServiceName(
-                                clusterId, namespace));
-        try (RestClient restClient = new RestClient(conf, executorService)) {
-            // TODO add method in flink#RestClusterClient to support upload jar.
-            return restClient
-                    .sendRequest(
-                            host,
-                            port,
-                            headers,
-                            EmptyMessageParameters.getInstance(),
-                            EmptyRequestBody.getInstance(),
-                            Collections.singletonList(
-                                    new FileUpload(
-                                            jarFile.toPath(), RestConstants.CONTENT_TYPE_JAR)))
-                    .get(
-                            configManager
-                                    .getOperatorConfiguration()
-                                    .getFlinkClientTimeout()
-                                    .toSeconds(),
-                            TimeUnit.SECONDS);
-        } finally {
-            LOG.debug("Deleting the jar file {}", jarFile);
-            FileUtils.deleteFileOrDirectory(jarFile);
-        }
-    }
-
-    private void deleteJar(Configuration conf, String jarId) {
-        LOG.debug("Deleting the jar: {}", jarId);
-        try (RestClusterClient<String> clusterClient =
-                (RestClusterClient<String>) getClusterClient(conf)) {
-            JarDeleteHeaders headers = JarDeleteHeaders.getInstance();
-            JarDeleteMessageParameters parameters = headers.getUnresolvedMessageParameters();
-            parameters.jarIdPathParameter.resolve(jarId);
-            clusterClient
-                    .sendRequest(headers, parameters, EmptyRequestBody.getInstance())
-                    .get(
-                            configManager
-                                    .getOperatorConfiguration()
-                                    .getFlinkClientTimeout()
-                                    .toSeconds(),
-                            TimeUnit.SECONDS);
-        } catch (Exception e) {
-            LOG.error("Failed to delete the jar: {}.", jarId, e);
-        }
-    }
-
+    @Override
     public boolean isJobManagerPortReady(Configuration config) {
         final URI uri;
         try (ClusterClient<String> clusterClient = getClusterClient(config)) {
@@ -357,11 +180,13 @@ public class FlinkService {
             socket.connect(socketAddress, 1000);
             socket.close();
             return true;
+        } catch (SocketTimeoutException ste) {
         } catch (IOException e) {
         }
         return false;
     }
 
+    @Override
     public Collection<JobStatusMessage> listJobs(Configuration conf) throws Exception {
         try (RestClusterClient<String> clusterClient =
                 (RestClusterClient<String>) getClusterClient(conf)) {
@@ -370,7 +195,7 @@ public class FlinkService {
                             JobsOverviewHeaders.getInstance(),
                             EmptyMessageParameters.getInstance(),
                             EmptyRequestBody.getInstance())
-                    .thenApply(FlinkService::toJobStatusMessage)
+                    .thenApply(AbstractFlinkService::toJobStatusMessage)
                     .get(
                             configManager
                                     .getOperatorConfiguration()
@@ -380,6 +205,7 @@ public class FlinkService {
         }
     }
 
+    @Override
     public JobResult requestJobResult(Configuration conf, JobID jobID) throws Exception {
         try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
             return clusterClient
@@ -393,22 +219,7 @@ public class FlinkService {
         }
     }
 
-    @VisibleForTesting
-    protected ClusterClient<String> getClusterClient(Configuration config) throws Exception {
-        final String clusterId = config.get(KubernetesConfigOptions.CLUSTER_ID);
-        final String namespace = config.get(KubernetesConfigOptions.NAMESPACE);
-        final int port = config.getInteger(RestOptions.PORT);
-        final String host =
-                ObjectUtils.firstNonNull(
-                        configManager.getOperatorConfiguration().getFlinkServiceHostOverride(),
-                        ExternalServiceDecorator.getNamespacedExternalServiceName(
-                                clusterId, namespace));
-        final String restServerAddress = String.format("http://%s:%s", host, port);
-        LOG.debug("Creating RestClusterClient({})", restServerAddress);
-        return new RestClusterClient<>(
-                config, clusterId, (c, e) -> new StandaloneClientHAServices(restServerAddress));
-    }
-
+    @Override
     public void cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration conf)
             throws Exception {
         var deploymentStatus = deployment.getStatus();
@@ -504,30 +315,7 @@ public class FlinkService {
         }
     }
 
-    @VisibleForTesting
-    protected void waitForClusterShutdown(Configuration conf) {
-        FlinkUtils.waitForClusterShutdown(
-                kubernetesClient,
-                conf,
-                configManager
-                        .getOperatorConfiguration()
-                        .getFlinkShutdownClusterTimeout()
-                        .toSeconds());
-    }
-
-    public void deleteClusterDeployment(
-            ObjectMeta meta, FlinkDeploymentStatus status, boolean deleteHaData) {
-        FlinkUtils.deleteCluster(
-                status,
-                meta,
-                kubernetesClient,
-                deleteHaData,
-                configManager
-                        .getOperatorConfiguration()
-                        .getFlinkShutdownClusterTimeout()
-                        .toSeconds());
-    }
-
+    @Override
     public void cancelSessionJob(
             FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration conf)
             throws Exception {
@@ -536,7 +324,6 @@ public class FlinkService {
         var jobIdString = jobStatus.getJobId();
         Preconditions.checkNotNull(jobIdString, "The job to be suspend should not be null");
         var jobId = JobID.fromHexString(jobIdString);
-
         Optional<String> savepointOpt = Optional.empty();
         try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
             final String clusterId = clusterClient.getClusterId();
@@ -592,7 +379,6 @@ public class FlinkService {
                     throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
             }
         }
-
         jobStatus.setState(JobStatus.FINISHED.name());
         savepointOpt.ifPresent(
                 location -> {
@@ -601,6 +387,7 @@ public class FlinkService {
                 });
     }
 
+    @Override
     public void triggerSavepoint(
             String jobId,
             SavepointTriggerType triggerType,
@@ -639,6 +426,7 @@ public class FlinkService {
         }
     }
 
+    @Override
     public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) throws Exception {
         try (RestClusterClient<String> clusterClient =
                 (RestClusterClient<String>) getClusterClient(conf)) {
@@ -675,6 +463,25 @@ public class FlinkService {
         }
     }
 
+    @Override
+    public void disposeSavepoint(String savepointPath, Configuration conf) throws Exception {
+        try (RestClusterClient<String> clusterClient =
+                (RestClusterClient<String>) getClusterClient(conf)) {
+            clusterClient
+                    .sendRequest(
+                            SavepointDisposalTriggerHeaders.getInstance(),
+                            EmptyMessageParameters.getInstance(),
+                            new SavepointDisposalRequest(savepointPath))
+                    .get(
+                            configManager
+                                    .getOperatorConfiguration()
+                                    .getFlinkClientTimeout()
+                                    .getSeconds(),
+                            TimeUnit.SECONDS);
+        }
+    }
+
+    @Override
     public SavepointFetchResult fetchSavepointInfo(
             String triggerId, String jobId, Configuration conf) {
         LOG.info("Fetching savepoint result with triggerId: " + triggerId);
@@ -716,23 +523,7 @@ public class FlinkService {
         }
     }
 
-    public void disposeSavepoint(String savepointPath, Configuration conf) throws Exception {
-        try (RestClusterClient<String> clusterClient =
-                (RestClusterClient<String>) getClusterClient(conf)) {
-            clusterClient
-                    .sendRequest(
-                            SavepointDisposalTriggerHeaders.getInstance(),
-                            EmptyMessageParameters.getInstance(),
-                            new SavepointDisposalRequest(savepointPath))
-                    .get(
-                            configManager
-                                    .getOperatorConfiguration()
-                                    .getFlinkClientTimeout()
-                                    .getSeconds(),
-                            TimeUnit.SECONDS);
-        }
-    }
-
+    @Override
     public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
         Map<String, String> runtimeVersion = new HashMap<>();
 
@@ -762,6 +553,7 @@ public class FlinkService {
         return runtimeVersion;
     }
 
+    @Override
     public PodList getJmPodList(FlinkDeployment deployment, Configuration conf) {
         final String namespace = conf.getString(KubernetesConfigOptions.NAMESPACE);
         final String clusterId;
@@ -770,6 +562,207 @@ public class FlinkService {
         } catch (Exception ex) {
             throw new RuntimeException(ex);
         }
-        return FlinkUtils.getJmPodList(kubernetesClient, namespace, clusterId);
+        return getJmPodList(namespace, clusterId);
+    }
+
+    @Override
+    public void waitForClusterShutdown(Configuration conf) {
+        waitForClusterShutdown(
+                conf.getString(KubernetesConfigOptions.NAMESPACE),
+                conf.getString(KubernetesConfigOptions.CLUSTER_ID),
+                configManager
+                        .getOperatorConfiguration()
+                        .getFlinkShutdownClusterTimeout()
+                        .toSeconds());
+    }
+
+    @VisibleForTesting
+    protected ClusterClient<String> getClusterClient(Configuration conf) throws Exception {
+        final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
+        final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
+        final int port = conf.getInteger(RestOptions.PORT);
+        final String host =
+                ObjectUtils.firstNonNull(
+                        configManager.getOperatorConfiguration().getFlinkServiceHostOverride(),
+                        ExternalServiceDecorator.getNamespacedExternalServiceName(
+                                clusterId, namespace));
+        final String restServerAddress = String.format("http://%s:%s", host, port);
+        LOG.debug("Creating RestClusterClient({})", restServerAddress);
+        return new RestClusterClient<>(
+                conf, clusterId, (c, e) -> new StandaloneClientHAServices(restServerAddress));
+    }
+
+    private JarRunResponseBody runJar(
+            JobSpec job,
+            JobID jobID,
+            JarUploadResponseBody response,
+            Configuration conf,
+            String savepoint) {
+        String jarId =
+                response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
+        try (RestClusterClient<String> clusterClient =
+                (RestClusterClient<String>) getClusterClient(conf)) {
+            JarRunHeaders headers = JarRunHeaders.getInstance();
+            JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
+            parameters.jarIdPathParameter.resolve(jarId);
+            JarRunRequestBody runRequestBody =
+                    new JarRunRequestBody(
+                            job.getEntryClass(),
+                            null,
+                            job.getArgs() == null ? null : Arrays.asList(job.getArgs()),
+                            job.getParallelism() > 0 ? job.getParallelism() : null,
+                            jobID,
+                            job.getAllowNonRestoredState(),
+                            savepoint,
+                            conf.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_14)
+                                    ? RestoreMode.DEFAULT
+                                    : null);
+            LOG.info("Submitting job: {} to session cluster.", jobID.toHexString());
+            return clusterClient
+                    .sendRequest(headers, parameters, runRequestBody)
+                    .get(
+                            configManager
+                                    .getOperatorConfiguration()
+                                    .getFlinkClientTimeout()
+                                    .toSeconds(),
+                            TimeUnit.SECONDS);
+        } catch (Exception e) {
+            LOG.error("Failed to submit job to session cluster.", e);
+            throw new FlinkRuntimeException(e);
+        } finally {
+            deleteJar(conf, jarId);
+        }
+    }
+
+    private JarUploadResponseBody uploadJar(
+            ObjectMeta objectMeta, FlinkSessionJobSpec spec, Configuration conf) throws Exception {
+        String targetDir = artifactManager.generateJarDir(objectMeta, spec);
+        File jarFile = artifactManager.fetch(spec.getJob().getJarURI(), conf, targetDir);
+        Preconditions.checkArgument(
+                jarFile.exists(),
+                String.format("The jar file %s not exists", jarFile.getAbsolutePath()));
+        JarUploadHeaders headers = JarUploadHeaders.getInstance();
+        String clusterId = spec.getDeploymentName();
+        String namespace = objectMeta.getNamespace();
+        int port = conf.getInteger(RestOptions.PORT);
+        String host =
+                ObjectUtils.firstNonNull(
+                        configManager.getOperatorConfiguration().getFlinkServiceHostOverride(),
+                        ExternalServiceDecorator.getNamespacedExternalServiceName(
+                                clusterId, namespace));
+        try (RestClient restClient = new RestClient(conf, executorService)) {
+            // TODO: add a method to flink#RestClusterClient to support jar upload.
+            return restClient
+                    .sendRequest(
+                            host,
+                            port,
+                            headers,
+                            EmptyMessageParameters.getInstance(),
+                            EmptyRequestBody.getInstance(),
+                            Collections.singletonList(
+                                    new FileUpload(
+                                            jarFile.toPath(), RestConstants.CONTENT_TYPE_JAR)))
+                    .get(
+                            configManager
+                                    .getOperatorConfiguration()
+                                    .getFlinkClientTimeout()
+                                    .toSeconds(),
+                            TimeUnit.SECONDS);
+        } finally {
+            LOG.debug("Deleting the jar file {}", jarFile);
+            FileUtils.deleteFileOrDirectory(jarFile);
+        }
+    }
+
+    private void deleteJar(Configuration conf, String jarId) {
+        LOG.debug("Deleting the jar: {}", jarId);
+        try (RestClusterClient<String> clusterClient =
+                (RestClusterClient<String>) getClusterClient(conf)) {
+            JarDeleteHeaders headers = JarDeleteHeaders.getInstance();
+            JarDeleteMessageParameters parameters = headers.getUnresolvedMessageParameters();
+            parameters.jarIdPathParameter.resolve(jarId);
+            clusterClient
+                    .sendRequest(headers, parameters, EmptyRequestBody.getInstance())
+                    .get(
+                            configManager
+                                    .getOperatorConfiguration()
+                                    .getFlinkClientTimeout()
+                                    .toSeconds(),
+                            TimeUnit.SECONDS);
+        } catch (Exception e) {
+            LOG.error("Failed to delete the jar: {}.", jarId, e);
+        }
+    }
+
+    /** Wait until the Flink cluster has completely shut down. */
+    @VisibleForTesting
+    void waitForClusterShutdown(String namespace, String clusterId, long shutdownTimeout) {
+
+        boolean jobManagerRunning = true;
+        boolean serviceRunning = true;
+
+        for (int i = 0; i < shutdownTimeout; i++) {
+            if (jobManagerRunning) {
+                PodList jmPodList = getJmPodList(namespace, clusterId);
+
+                if (jmPodList == null || jmPodList.getItems().isEmpty()) {
+                    jobManagerRunning = false;
+                }
+            }
+
+            if (serviceRunning) {
+                Service service =
+                        kubernetesClient
+                                .services()
+                                .inNamespace(namespace)
+                                .withName(
+                                        ExternalServiceDecorator.getExternalServiceName(clusterId))
+                                .fromServer()
+                                .get();
+                if (service == null) {
+                    serviceRunning = false;
+                }
+            }
+
+            if (!jobManagerRunning && !serviceRunning) {
+                break;
+            }
+            // Log progress every 5 seconds while waiting for the Flink cluster to shut down.
+            if ((i + 1) % 5 == 0) {
+                LOG.info("Waiting for cluster shutdown... ({}s)", i + 1);
+            }
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        LOG.info("Cluster shutdown completed.");
+    }
+
+    private static List<JobStatusMessage> toJobStatusMessage(
+            MultipleJobsDetails multipleJobsDetails) {
+        return multipleJobsDetails.getJobs().stream()
+                .map(
+                        details ->
+                                new JobStatusMessage(
+                                        details.getJobId(),
+                                        details.getJobName(),
+                                        getEffectiveStatus(details),
+                                        details.getStartTime()))
+                .collect(Collectors.toList());
+    }
+
+    @VisibleForTesting
+    protected static JobStatus getEffectiveStatus(JobDetails details) {
+        int numRunning = details.getTasksPerState()[ExecutionState.RUNNING.ordinal()];
+        int numFinished = details.getTasksPerState()[ExecutionState.FINISHED.ordinal()];
+        boolean allRunningOrFinished = details.getNumTasks() == (numRunning + numFinished);
+        JobStatus effectiveStatus = details.getStatus();
+        if (JobStatus.RUNNING.equals(effectiveStatus) && !allRunningOrFinished) {
+            effectiveStatus = JobStatus.CREATED;
+            LOG.debug("Adjusting job state from {} to {}", JobStatus.RUNNING, effectiveStatus);
+        }
+        return effectiveStatus;
     }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index 2deccaee..d7aa1b98 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -17,759 +17,80 @@
 
 package org.apache.flink.kubernetes.operator.service;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.client.cli.ApplicationDeployer;
-import org.apache.flink.client.deployment.ClusterClientFactory;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.ClusterDescriptor;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.client.deployment.application.ApplicationConfiguration;
-import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.core.execution.SavepointFormatType;
-import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
-import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
-import org.apache.flink.kubernetes.operator.artifact.ArtifactManager;
-import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
-import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
-import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
-import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
-import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
-import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
-import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.rest.FileUpload;
-import org.apache.flink.runtime.rest.RestClient;
-import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
-import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
-import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
-import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
-import org.apache.flink.runtime.rest.messages.TriggerId;
-import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
-import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
-import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
-import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
-import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
-import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
-import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
-import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
-import org.apache.flink.runtime.rest.util.RestConstants;
-import org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;
-import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHeaders;
-import org.apache.flink.runtime.webmonitor.handlers.JarDeleteMessageParameters;
-import org.apache.flink.runtime.webmonitor.handlers.JarRunHeaders;
-import org.apache.flink.runtime.webmonitor.handlers.JarRunMessageParameters;
-import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody;
-import org.apache.flink.runtime.webmonitor.handlers.JarRunResponseBody;
-import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
-import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody;
-import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
-import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.api.model.PodList;
 import io.fabric8.kubernetes.client.KubernetesClient;
-import org.apache.commons.lang3.ObjectUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.URI;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
 
 /** Service for submitting and interacting with Flink clusters and jobs. */
-public class FlinkService {
-
-    private static final Logger LOG = LoggerFactory.getLogger(FlinkService.class);
-
-    private final KubernetesClient kubernetesClient;
-    private final ArtifactManager artifactManager;
-    private final FlinkConfigManager configManager;
-    private final ExecutorService executorService;
-
-    public FlinkService(KubernetesClient kubernetesClient, FlinkConfigManager configManager) {
-        this.kubernetesClient = kubernetesClient;
-        this.artifactManager = new ArtifactManager(configManager);
-        this.configManager = configManager;
-        this.executorService =
-                Executors.newFixedThreadPool(
-                        4, new ExecutorThreadFactory("Flink-RestClusterClient-IO"));
-    }
-
-    private static List<JobStatusMessage> toJobStatusMessage(
-            MultipleJobsDetails multipleJobsDetails) {
-        return multipleJobsDetails.getJobs().stream()
-                .map(
-                        details ->
-                                new JobStatusMessage(
-                                        details.getJobId(),
-                                        details.getJobName(),
-                                        getEffectiveStatus(details),
-                                        details.getStartTime()))
-                .collect(Collectors.toList());
-    }
-
-    @VisibleForTesting
-    protected static JobStatus getEffectiveStatus(JobDetails details) {
-        int numRunning = details.getTasksPerState()[ExecutionState.RUNNING.ordinal()];
-        int numFinished = details.getTasksPerState()[ExecutionState.FINISHED.ordinal()];
-        boolean allRunningOrFinished = details.getNumTasks() == (numRunning + numFinished);
-        JobStatus effectiveStatus = details.getStatus();
-        if (JobStatus.RUNNING.equals(effectiveStatus) && !allRunningOrFinished) {
-            effectiveStatus = JobStatus.CREATED;
-            LOG.debug("Adjusting job state from {} to {}", JobStatus.RUNNING, effectiveStatus);
-        }
-        return effectiveStatus;
-    }
-
-    public KubernetesClient getKubernetesClient() {
-        return kubernetesClient;
-    }
-
-    public void submitApplicationCluster(
-            JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception {
-        LOG.info(
-                "Deploying application cluster{}",
-                requireHaMetadata ? " requiring last-state from HA metadata" : "");
-        if (FlinkUtils.isKubernetesHAActivated(conf)) {
-            final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
-            final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
-            // Delete the job graph in the HA ConfigMaps so that the newly changed job config(e.g.
-            // parallelism) could take effect
-            FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, kubernetesClient);
-        }
-        if (requireHaMetadata) {
-            validateHaMetadataExists(conf);
-        }
-        final ClusterClientServiceLoader clusterClientServiceLoader =
-                new DefaultClusterClientServiceLoader();
-        final ApplicationDeployer deployer =
-                new ApplicationClusterDeployer(clusterClientServiceLoader);
+public interface FlinkService {
 
-        final ApplicationConfiguration applicationConfiguration =
-                new ApplicationConfiguration(
-                        jobSpec.getArgs() != null ? jobSpec.getArgs() : new String[0],
-                        jobSpec.getEntryClass());
+    KubernetesClient getKubernetesClient();
 
-        deployer.run(conf, applicationConfiguration);
-        LOG.info("Application cluster successfully deployed");
-    }
+    void submitApplicationCluster(JobSpec jobSpec, Configuration conf, boolean requireHaMetadata)
+            throws Exception;
 
-    public boolean isHaMetadataAvailable(Configuration conf) {
-        return FlinkUtils.isHaMetadataAvailable(conf, kubernetesClient);
-    }
+    boolean isHaMetadataAvailable(Configuration conf);
 
-    protected void validateHaMetadataExists(Configuration conf) {
-        if (!isHaMetadataAvailable(conf)) {
-            throw new DeploymentFailedException(
-                    "HA metadata not available to restore from last state. "
-                            + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
-                            + "Manual restore required.",
-                    "RestoreFailed");
-        }
-    }
+    void submitSessionCluster(Configuration conf) throws Exception;
 
-    public void submitSessionCluster(Configuration conf) throws Exception {
-        LOG.info("Deploying session cluster");
-        final ClusterClientServiceLoader clusterClientServiceLoader =
-                new DefaultClusterClientServiceLoader();
-        final ClusterClientFactory<String> kubernetesClusterClientFactory =
-                clusterClientServiceLoader.getClusterClientFactory(conf);
-        try (final ClusterDescriptor<String> kubernetesClusterDescriptor =
-                kubernetesClusterClientFactory.createClusterDescriptor(conf)) {
-            kubernetesClusterDescriptor.deploySessionCluster(
-                    kubernetesClusterClientFactory.getClusterSpecification(conf));
-        }
-        LOG.info("Session cluster successfully deployed");
-    }
-
-    public JobID submitJobToSessionCluster(
+    JobID submitJobToSessionCluster(
             ObjectMeta meta,
             FlinkSessionJobSpec spec,
             Configuration conf,
             @Nullable String savepoint)
-            throws Exception {
-        // we generate jobID in advance to help deduplicate job submission.
-        var jobID = FlinkUtils.generateSessionJobFixedJobID(meta);
-        runJar(spec.getJob(), jobID, uploadJar(meta, spec, conf), conf, savepoint);
-        LOG.info("Submitted job: {} to session cluster.", jobID);
-        return jobID;
-    }
-
-    private JarRunResponseBody runJar(
-            JobSpec job,
-            JobID jobID,
-            JarUploadResponseBody response,
-            Configuration conf,
-            String savepoint) {
-        String jarId =
-                response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
-        try (RestClusterClient<String> clusterClient =
-                (RestClusterClient<String>) getClusterClient(conf)) {
-            JarRunHeaders headers = JarRunHeaders.getInstance();
-            JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
-            parameters.jarIdPathParameter.resolve(jarId);
-            JarRunRequestBody runRequestBody =
-                    new JarRunRequestBody(
-                            job.getEntryClass(),
-                            null,
-                            job.getArgs() == null ? null : Arrays.asList(job.getArgs()),
-                            job.getParallelism() > 0 ? job.getParallelism() : null,
-                            jobID,
-                            job.getAllowNonRestoredState(),
-                            savepoint,
-                            conf.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_14)
-                                    ? RestoreMode.DEFAULT
-                                    : null);
-            LOG.info("Submitting job: {} to session cluster.", jobID.toHexString());
-            return clusterClient
-                    .sendRequest(headers, parameters, runRequestBody)
-                    .get(
-                            configManager
-                                    .getOperatorConfiguration()
-                                    .getFlinkClientTimeout()
-                                    .toSeconds(),
-                            TimeUnit.SECONDS);
-        } catch (Exception e) {
-            LOG.error("Failed to submit job to session cluster.", e);
-            throw new FlinkRuntimeException(e);
-        } finally {
-            deleteJar(conf, jarId);
-        }
-    }
-
-    private JarUploadResponseBody uploadJar(
-            ObjectMeta objectMeta, FlinkSessionJobSpec spec, Configuration conf) throws Exception {
-        String targetDir = artifactManager.generateJarDir(objectMeta, spec);
-        File jarFile = artifactManager.fetch(spec.getJob().getJarURI(), conf, targetDir);
-        Preconditions.checkArgument(
-                jarFile.exists(),
-                String.format("The jar file %s not exists", jarFile.getAbsolutePath()));
-        JarUploadHeaders headers = JarUploadHeaders.getInstance();
-        String clusterId = spec.getDeploymentName();
-        String namespace = objectMeta.getNamespace();
-        int port = conf.getInteger(RestOptions.PORT);
-        String host =
-                ObjectUtils.firstNonNull(
-                        configManager.getOperatorConfiguration().getFlinkServiceHostOverride(),
-                        ExternalServiceDecorator.getNamespacedExternalServiceName(
-                                clusterId, namespace));
-        try (RestClient restClient = new RestClient(conf, executorService)) {
-            // TODO add method in flink#RestClusterClient to support upload jar.
-            return restClient
-                    .sendRequest(
-                            host,
-                            port,
-                            headers,
-                            EmptyMessageParameters.getInstance(),
-                            EmptyRequestBody.getInstance(),
-                            Collections.singletonList(
-                                    new FileUpload(
-                                            jarFile.toPath(), RestConstants.CONTENT_TYPE_JAR)))
-                    .get(
-                            configManager
-                                    .getOperatorConfiguration()
-                                    .getFlinkClientTimeout()
-                                    .toSeconds(),
-                            TimeUnit.SECONDS);
-        } finally {
-            LOG.debug("Deleting the jar file {}", jarFile);
-            FileUtils.deleteFileOrDirectory(jarFile);
-        }
-    }
-
-    private void deleteJar(Configuration conf, String jarId) {
-        LOG.debug("Deleting the jar: {}", jarId);
-        try (RestClusterClient<String> clusterClient =
-                (RestClusterClient<String>) getClusterClient(conf)) {
-            JarDeleteHeaders headers = JarDeleteHeaders.getInstance();
-            JarDeleteMessageParameters parameters = headers.getUnresolvedMessageParameters();
-            parameters.jarIdPathParameter.resolve(jarId);
-            clusterClient
-                    .sendRequest(headers, parameters, EmptyRequestBody.getInstance())
-                    .get(
-                            configManager
-                                    .getOperatorConfiguration()
-                                    .getFlinkClientTimeout()
-                                    .toSeconds(),
-                            TimeUnit.SECONDS);
-        } catch (Exception e) {
-            LOG.error("Failed to delete the jar: {}.", jarId, e);
-        }
-    }
-
-    public boolean isJobManagerPortReady(Configuration config) {
-        final URI uri;
-        try (ClusterClient<String> clusterClient = getClusterClient(config)) {
-            uri = URI.create(clusterClient.getWebInterfaceURL());
-        } catch (Exception ex) {
-            throw new RuntimeException(ex);
-        }
-        SocketAddress socketAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
-        Socket socket = new Socket();
-        try {
-            socket.connect(socketAddress, 1000);
-            socket.close();
-            return true;
-        } catch (IOException e) {
-        }
-        return false;
-    }
-
-    public Collection<JobStatusMessage> listJobs(Configuration conf) throws Exception {
-        try (RestClusterClient<String> clusterClient =
-                (RestClusterClient<String>) getClusterClient(conf)) {
-            return clusterClient
-                    .sendRequest(
-                            JobsOverviewHeaders.getInstance(),
-                            EmptyMessageParameters.getInstance(),
-                            EmptyRequestBody.getInstance())
-                    .thenApply(FlinkService::toJobStatusMessage)
-                    .get(
-                            configManager
-                                    .getOperatorConfiguration()
-                                    .getFlinkClientTimeout()
-                                    .toSeconds(),
-                            TimeUnit.SECONDS);
-        }
-    }
-
-    public JobResult requestJobResult(Configuration conf, JobID jobID) throws Exception {
-        try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
-            return clusterClient
-                    .requestJobResult(jobID)
-                    .get(
-                            configManager
-                                    .getOperatorConfiguration()
-                                    .getFlinkClientTimeout()
-                                    .getSeconds(),
-                            TimeUnit.SECONDS);
-        }
-    }
+            throws Exception;
 
-    @VisibleForTesting
-    protected ClusterClient<String> getClusterClient(Configuration config) throws Exception {
-        final String clusterId = config.get(KubernetesConfigOptions.CLUSTER_ID);
-        final String namespace = config.get(KubernetesConfigOptions.NAMESPACE);
-        final int port = config.getInteger(RestOptions.PORT);
-        final String host =
-                ObjectUtils.firstNonNull(
-                        configManager.getOperatorConfiguration().getFlinkServiceHostOverride(),
-                        ExternalServiceDecorator.getNamespacedExternalServiceName(
-                                clusterId, namespace));
-        final String restServerAddress = String.format("http://%s:%s", host, port);
-        LOG.debug("Creating RestClusterClient({})", restServerAddress);
-        return new RestClusterClient<>(
-                config, clusterId, (c, e) -> new StandaloneClientHAServices(restServerAddress));
-    }
+    boolean isJobManagerPortReady(Configuration config);
 
-    public void cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration conf)
-            throws Exception {
-        var deploymentStatus = deployment.getStatus();
-        var jobIdString = deploymentStatus.getJobStatus().getJobId();
-        var jobId = jobIdString != null ? JobID.fromHexString(jobIdString) : null;
+    Collection<JobStatusMessage> listJobs(Configuration conf) throws Exception;
 
-        Optional<String> savepointOpt = Optional.empty();
-        try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
-            var clusterId = clusterClient.getClusterId();
-            switch (upgradeMode) {
-                case STATELESS:
-                    if (ReconciliationUtils.isJobRunning(deployment.getStatus())) {
-                        LOG.info("Job is running, cancelling job.");
-                        try {
-                            clusterClient
-                                    .cancel(Preconditions.checkNotNull(jobId))
-                                    .get(
-                                            configManager
-                                                    .getOperatorConfiguration()
-                                                    .getFlinkCancelJobTimeout()
-                                                    .toSeconds(),
-                                            TimeUnit.SECONDS);
-                            LOG.info("Job successfully cancelled.");
-                        } catch (Exception e) {
-                            LOG.error("Could not shut down cluster gracefully, deleting...", e);
-                        }
-                    }
-                    deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, true);
-                    break;
-                case SAVEPOINT:
-                    final String savepointDirectory =
-                            Preconditions.checkNotNull(
-                                    conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY));
-                    final long timeout =
-                            conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT)
-                                    .getSeconds();
-                    try {
-                        if (ReconciliationUtils.isJobRunning(deploymentStatus)) {
-                            LOG.info("Suspending job with savepoint.");
-                            String savepoint =
-                                    clusterClient
-                                            .stopWithSavepoint(
-                                                    Preconditions.checkNotNull(jobId),
-                                                    false,
-                                                    savepointDirectory,
-                                                    conf.get(FLINK_VERSION)
-                                                                    .isNewerVersionThan(
-                                                                            FlinkVersion.v1_14)
-                                                            ? SavepointFormatType.DEFAULT
-                                                            : null)
-                                            .get(timeout, TimeUnit.SECONDS);
-                            savepointOpt = Optional.of(savepoint);
-                            LOG.info("Job successfully suspended with savepoint {}.", savepoint);
-                        } else if (ReconciliationUtils.isJobInTerminalState(deploymentStatus)) {
-                            LOG.info(
-                                    "Job is already in terminal state skipping cancel-with-savepoint operation.");
-                        } else {
-                            throw new RuntimeException(
-                                    "Unexpected non-terminal status: " + deploymentStatus);
-                        }
-                    } catch (TimeoutException exception) {
-                        throw new FlinkException(
-                                String.format(
-                                        "Timed out stopping the job %s in Flink cluster %s with savepoint, "
-                                                + "please configure a larger timeout via '%s'",
-                                        jobId,
-                                        clusterId,
-                                        ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.key()),
-                                exception);
-                    }
-                    break;
-                case LAST_STATE:
-                    deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, false);
-                    break;
-                default:
-                    throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
-            }
-        }
-        deploymentStatus.getJobStatus().setState(JobStatus.FINISHED.name());
-        savepointOpt.ifPresent(
-                location -> {
-                    Savepoint sp = Savepoint.of(location, SavepointTriggerType.UPGRADE);
-                    deploymentStatus.getJobStatus().getSavepointInfo().updateLastSavepoint(sp);
-                });
+    JobResult requestJobResult(Configuration conf, JobID jobID) throws Exception;
 
-        var shutdownDisabled =
-                upgradeMode != UpgradeMode.LAST_STATE
-                        && FlinkUtils.clusterShutdownDisabled(
-                                ReconciliationUtils.getDeployedSpec(deployment));
-        if (!shutdownDisabled) {
-            waitForClusterShutdown(conf);
-            deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
-        }
-    }
+    void cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration conf)
+            throws Exception;
 
-    @VisibleForTesting
-    protected void waitForClusterShutdown(Configuration conf) {
-        FlinkUtils.waitForClusterShutdown(
-                kubernetesClient,
-                conf,
-                configManager
-                        .getOperatorConfiguration()
-                        .getFlinkShutdownClusterTimeout()
-                        .toSeconds());
-    }
+    void deleteClusterDeployment(
+            ObjectMeta meta, FlinkDeploymentStatus status, boolean deleteHaData);
 
-    public void deleteClusterDeployment(
-            ObjectMeta meta, FlinkDeploymentStatus status, boolean deleteHaData) {
-        FlinkUtils.deleteCluster(
-                status,
-                meta,
-                kubernetesClient,
-                deleteHaData,
-                configManager
-                        .getOperatorConfiguration()
-                        .getFlinkShutdownClusterTimeout()
-                        .toSeconds());
-    }
+    void cancelSessionJob(FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration conf)
+            throws Exception;
 
-    public void cancelSessionJob(
-            FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration conf)
-            throws Exception {
-
-        var jobStatus = sessionJob.getStatus().getJobStatus();
-        var jobIdString = jobStatus.getJobId();
-        Preconditions.checkNotNull(jobIdString, "The job to be suspend should not be null");
-        var jobId = JobID.fromHexString(jobIdString);
-
-        Optional<String> savepointOpt = Optional.empty();
-        try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
-            final String clusterId = clusterClient.getClusterId();
-            switch (upgradeMode) {
-                case STATELESS:
-                    LOG.info("Cancelling job.");
-                    clusterClient
-                            .cancel(jobId)
-                            .get(
-                                    configManager
-                                            .getOperatorConfiguration()
-                                            .getFlinkCancelJobTimeout()
-                                            .toSeconds(),
-                                    TimeUnit.SECONDS);
-                    LOG.info("Job successfully cancelled.");
-                    break;
-                case SAVEPOINT:
-                    LOG.info("Suspending job with savepoint.");
-                    final String savepointDirectory =
-                            Preconditions.checkNotNull(
-                                    conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY));
-                    final long timeout =
-                            conf.get(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT)
-                                    .getSeconds();
-                    try {
-                        String savepoint =
-                                clusterClient
-                                        .stopWithSavepoint(
-                                                jobId,
-                                                false,
-                                                savepointDirectory,
-                                                conf.get(FLINK_VERSION)
-                                                                .isNewerVersionThan(
-                                                                        FlinkVersion.v1_14)
-                                                        ? SavepointFormatType.DEFAULT
-                                                        : null)
-                                        .get(timeout, TimeUnit.SECONDS);
-                        savepointOpt = Optional.of(savepoint);
-                        LOG.info("Job successfully suspended with savepoint {}.", savepoint);
-                    } catch (TimeoutException exception) {
-                        throw new FlinkException(
-                                String.format(
-                                        "Timed out stopping the job %s in Flink cluster %s with savepoint, "
-                                                + "please configure a larger timeout via '%s'",
-                                        jobId,
-                                        clusterId,
-                                        ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.key()),
-                                exception);
-                    }
-                    break;
-                case LAST_STATE:
-                default:
-                    throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
-            }
-        }
-
-        jobStatus.setState(JobStatus.FINISHED.name());
-        savepointOpt.ifPresent(
-                location -> {
-                    Savepoint sp = Savepoint.of(location, SavepointTriggerType.UPGRADE);
-                    jobStatus.getSavepointInfo().updateLastSavepoint(sp);
-                });
-    }
-
-    public void triggerSavepoint(
+    void triggerSavepoint(
             String jobId,
             SavepointTriggerType triggerType,
             org.apache.flink.kubernetes.operator.crd.status.SavepointInfo savepointInfo,
             Configuration conf)
-            throws Exception {
-        LOG.info("Triggering new savepoint");
-        try (RestClusterClient<String> clusterClient =
-                (RestClusterClient<String>) getClusterClient(conf)) {
-            SavepointTriggerHeaders savepointTriggerHeaders = SavepointTriggerHeaders.getInstance();
-            SavepointTriggerMessageParameters savepointTriggerMessageParameters =
-                    savepointTriggerHeaders.getUnresolvedMessageParameters();
-            savepointTriggerMessageParameters.jobID.resolve(JobID.fromHexString(jobId));
-
-            final String savepointDirectory =
-                    Preconditions.checkNotNull(conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY));
-            final long timeout =
-                    configManager.getOperatorConfiguration().getFlinkClientTimeout().getSeconds();
-            TriggerResponse response =
-                    clusterClient
-                            .sendRequest(
-                                    savepointTriggerHeaders,
-                                    savepointTriggerMessageParameters,
-                                    new SavepointTriggerRequestBody(
-                                            savepointDirectory,
-                                            false,
-                                            conf.get(FLINK_VERSION)
-                                                            .isNewerVersionThan(FlinkVersion.v1_14)
-                                                    ? SavepointFormatType.DEFAULT
-                                                    : null,
-                                            null))
-                            .get(timeout, TimeUnit.SECONDS);
-            LOG.info("Savepoint successfully triggered: " + response.getTriggerId().toHexString());
-
-            savepointInfo.setTrigger(response.getTriggerId().toHexString(), triggerType);
-        }
-    }
-
-    public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) throws Exception {
-        try (RestClusterClient<String> clusterClient =
-                (RestClusterClient<String>) getClusterClient(conf)) {
-
-            var headers = CustomCheckpointingStatisticsHeaders.getInstance();
-            var params = headers.getUnresolvedMessageParameters();
-            params.jobPathParameter.resolve(jobId);
-
-            CompletableFuture<CheckpointHistoryWrapper> response =
-                    clusterClient.sendRequest(headers, params, EmptyRequestBody.getInstance());
-
-            var checkpoints =
-                    response.get(
-                            configManager
-                                    .getOperatorConfiguration()
-                                    .getFlinkClientTimeout()
-                                    .getSeconds(),
-                            TimeUnit.SECONDS);
-
-            var latestCheckpointOpt = checkpoints.getLatestCheckpointPath();
-
-            if (latestCheckpointOpt.isPresent()
-                    && latestCheckpointOpt
-                            .get()
-                            .equals(
-                                    NonPersistentMetadataCheckpointStorageLocation
-                                            .EXTERNAL_POINTER)) {
-                throw new DeploymentFailedException(
-                        "Latest checkpoint not externally addressable, manual recovery required.",
-                        "CheckpointNotFound");
-            }
-            return latestCheckpointOpt.map(
-                    pointer -> Savepoint.of(pointer, SavepointTriggerType.UNKNOWN));
-        }
-    }
-
-    public SavepointFetchResult fetchSavepointInfo(
-            String triggerId, String jobId, Configuration conf) {
-        LOG.info("Fetching savepoint result with triggerId: " + triggerId);
-        try (RestClusterClient<String> clusterClient =
-                (RestClusterClient<String>) getClusterClient(conf)) {
-            SavepointStatusHeaders savepointStatusHeaders = SavepointStatusHeaders.getInstance();
-            SavepointStatusMessageParameters savepointStatusMessageParameters =
-                    savepointStatusHeaders.getUnresolvedMessageParameters();
-            savepointStatusMessageParameters.jobIdPathParameter.resolve(JobID.fromHexString(jobId));
-            savepointStatusMessageParameters.triggerIdPathParameter.resolve(
-                    TriggerId.fromHexString(triggerId));
-            CompletableFuture<AsynchronousOperationResult<SavepointInfo>> response =
-                    clusterClient.sendRequest(
-                            savepointStatusHeaders,
-                            savepointStatusMessageParameters,
-                            EmptyRequestBody.getInstance());
-
-            if (response.get() == null || response.get().resource() == null) {
-                return SavepointFetchResult.pending();
-            }
-
-            if (response.get().resource().getLocation() == null) {
-                if (response.get().resource().getFailureCause() != null) {
-                    LOG.error(
-                            "Failure occurred while fetching the savepoint result",
-                            response.get().resource().getFailureCause());
-                    return SavepointFetchResult.error(
-                            response.get().resource().getFailureCause().toString());
-                } else {
-                    return SavepointFetchResult.pending();
-                }
-            }
-            String location = response.get().resource().getLocation();
-            LOG.info("Savepoint result: {}", location);
-            return SavepointFetchResult.completed(location);
-        } catch (Exception e) {
-            LOG.error("Exception while fetching the savepoint result", e);
-            return SavepointFetchResult.error(e.getMessage());
-        }
-    }
+            throws Exception;
 
-    public void disposeSavepoint(String savepointPath, Configuration conf) throws Exception {
-        try (RestClusterClient<String> clusterClient =
-                (RestClusterClient<String>) getClusterClient(conf)) {
-            clusterClient
-                    .sendRequest(
-                            SavepointDisposalTriggerHeaders.getInstance(),
-                            EmptyMessageParameters.getInstance(),
-                            new SavepointDisposalRequest(savepointPath))
-                    .get(
-                            configManager
-                                    .getOperatorConfiguration()
-                                    .getFlinkClientTimeout()
-                                    .getSeconds(),
-                            TimeUnit.SECONDS);
-        }
-    }
+    Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) throws Exception;
 
-    public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
-        Map<String, String> runtimeVersion = new HashMap<>();
+    SavepointFetchResult fetchSavepointInfo(String triggerId, String jobId, Configuration conf);
 
-        try (RestClusterClient<String> clusterClient =
-                (RestClusterClient<String>) getClusterClient(conf)) {
+    void disposeSavepoint(String savepointPath, Configuration conf) throws Exception;
 
-            CustomDashboardConfiguration dashboardConfiguration =
-                    clusterClient
-                            .sendRequest(
-                                    CustomDashboardConfigurationHeaders.getInstance(),
-                                    EmptyMessageParameters.getInstance(),
-                                    EmptyRequestBody.getInstance())
-                            .get(
-                                    configManager
-                                            .getOperatorConfiguration()
-                                            .getFlinkClientTimeout()
-                                            .toSeconds(),
-                                    TimeUnit.SECONDS);
+    Map<String, String> getClusterInfo(Configuration conf) throws Exception;
 
-            runtimeVersion.put(
-                    DashboardConfiguration.FIELD_NAME_FLINK_VERSION,
-                    dashboardConfiguration.getFlinkVersion());
-            runtimeVersion.put(
-                    DashboardConfiguration.FIELD_NAME_FLINK_REVISION,
-                    dashboardConfiguration.getFlinkRevision());
-        }
-        return runtimeVersion;
-    }
+    PodList getJmPodList(FlinkDeployment deployment, Configuration conf);
 
-    public PodList getJmPodList(FlinkDeployment deployment, Configuration conf) {
-        final String namespace = conf.getString(KubernetesConfigOptions.NAMESPACE);
-        final String clusterId;
-        try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
-            clusterId = clusterClient.getClusterId();
-        } catch (Exception ex) {
-            throw new RuntimeException(ex);
-        }
-        return FlinkUtils.getJmPodList(kubernetesClient, namespace, clusterId);
-    }
+    void waitForClusterShutdown(Configuration conf);
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkServiceFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkServiceFactory.java
new file mode 100644
index 00000000..761f1e68
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkServiceFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.service;
+
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The factory to create the FlinkService based on the {@link FlinkDeployment} mode. */
+public class FlinkServiceFactory {
+
+    private final KubernetesClient kubernetesClient;
+    private final FlinkConfigManager configManager;
+    private final Map<KubernetesDeploymentMode, FlinkService> serviceMap;
+
+    public FlinkServiceFactory(
+            KubernetesClient kubernetesClient, FlinkConfigManager configManager) {
+        this.kubernetesClient = kubernetesClient;
+        this.configManager = configManager;
+        this.serviceMap = new ConcurrentHashMap<>();
+    }
+
+    public FlinkService getOrCreate(KubernetesDeploymentMode deploymentMode) {
+        return serviceMap.computeIfAbsent(
+                deploymentMode,
+                mode -> {
+                    switch (mode) {
+                        case NATIVE:
+                            return new NativeFlinkService(kubernetesClient, configManager);
+                        case STANDALONE:
+                            return new StandaloneFlinkService(kubernetesClient, configManager);
+                        default:
+                            throw new UnsupportedOperationException(
+                                    String.format("Unsupported deployment mode: %s", mode));
+                    }
+                });
+    }
+
+    public FlinkService getOrCreate(FlinkDeployment deployment) {
+        return getOrCreate(getDeploymentMode(deployment));
+    }
+
+    private KubernetesDeploymentMode getDeploymentMode(FlinkDeployment deployment) {
+        return KubernetesDeploymentMode.getDeploymentMode(deployment);
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
new file mode 100644
index 00000000..60b5ee3b
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.service;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.cli.ApplicationDeployer;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+
+/**
+ * Implementation of {@link FlinkService} submitting and interacting with Native Kubernetes Flink
+ * clusters and jobs.
+ */
+public class NativeFlinkService extends AbstractFlinkService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NativeFlinkService.class);
+
+    public NativeFlinkService(KubernetesClient kubernetesClient, FlinkConfigManager configManager) {
+        super(kubernetesClient, configManager);
+    }
+
+    /**
+     * Deploys an application cluster through Flink's native Kubernetes integration.
+     *
+     * <p>When Kubernetes HA is activated, the job graph stored in the HA ConfigMaps is deleted
+     * first so that changed job config (e.g. parallelism) takes effect on the new cluster. When
+     * {@code requireHaMetadata} is set, deployment fails fast if no HA metadata is available to
+     * restore from.
+     *
+     * @param jobSpec job spec supplying the program arguments and entry class
+     * @param conf effective Flink configuration for the cluster
+     * @param requireHaMetadata whether last-state HA metadata must exist before deploying
+     * @throws Exception on deployment failure
+     */
+    @Override
+    public void submitApplicationCluster(
+            JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception {
+        LOG.info(
+                "Deploying application cluster{}",
+                requireHaMetadata ? " requiring last-state from HA metadata" : "");
+        if (FlinkUtils.isKubernetesHAActivated(conf)) {
+            final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
+            final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
+            // Delete the job graph in the HA ConfigMaps so that the newly changed job config(e.g.
+            // parallelism) could take effect
+            FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, kubernetesClient);
+        }
+        if (requireHaMetadata) {
+            validateHaMetadataExists(conf);
+        }
+        final ClusterClientServiceLoader clusterClientServiceLoader =
+                new DefaultClusterClientServiceLoader();
+        final ApplicationDeployer deployer =
+                new ApplicationClusterDeployer(clusterClientServiceLoader);
+
+        // Missing args are normalized to an empty array; entry class may be null (jar manifest).
+        final ApplicationConfiguration applicationConfiguration =
+                new ApplicationConfiguration(
+                        jobSpec.getArgs() != null ? jobSpec.getArgs() : new String[0],
+                        jobSpec.getEntryClass());
+
+        deployer.run(conf, applicationConfiguration);
+        LOG.info("Application cluster successfully deployed");
+    }
+
+    /** Throws a {@link DeploymentFailedException} if no HA metadata is available to restore. */
+    private void validateHaMetadataExists(Configuration conf) {
+        if (!isHaMetadataAvailable(conf)) {
+            throw new DeploymentFailedException(
+                    "HA metadata not available to restore from last state. "
+                            + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
+                            + "Manual restore required.",
+                    "RestoreFailed");
+        }
+    }
+
+    /**
+     * Deploys a session cluster via the native Kubernetes {@link ClusterDescriptor}.
+     *
+     * @param conf effective Flink configuration for the cluster
+     * @throws Exception on deployment failure
+     */
+    @Override
+    public void submitSessionCluster(Configuration conf) throws Exception {
+        LOG.info("Deploying session cluster");
+        final ClusterClientServiceLoader clusterClientServiceLoader =
+                new DefaultClusterClientServiceLoader();
+        final ClusterClientFactory<String> kubernetesClusterClientFactory =
+                clusterClientServiceLoader.getClusterClientFactory(conf);
+        try (final ClusterDescriptor<String> kubernetesClusterDescriptor =
+                kubernetesClusterClientFactory.createClusterDescriptor(conf)) {
+            kubernetesClusterDescriptor.deploySessionCluster(
+                    kubernetesClusterClientFactory.getClusterSpecification(conf));
+        }
+        LOG.info("Session cluster successfully deployed");
+    }
+
+    /**
+     * Deletes the JobManager deployment of the cluster and, optionally, its HA metadata, using
+     * the operator-configured shutdown timeout.
+     *
+     * @param meta ObjectMeta identifying the cluster (namespace + name)
+     * @param status deployment status updated once deletion completes
+     * @param deleteHaData whether the Kubernetes HA ConfigMaps should be removed as well
+     */
+    @Override
+    public void deleteClusterDeployment(
+            ObjectMeta meta, FlinkDeploymentStatus status, boolean deleteHaData) {
+        deleteCluster(
+                status,
+                meta,
+                deleteHaData,
+                configManager
+                        .getOperatorConfiguration()
+                        .getFlinkShutdownClusterTimeout()
+                        .toSeconds());
+    }
+
+    /** Lists the JobManager pods of the cluster using the native-mode JM label selectors. */
+    @Override
+    protected PodList getJmPodList(String namespace, String clusterId) {
+        return kubernetesClient
+                .pods()
+                .inNamespace(namespace)
+                .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId))
+                .list();
+    }
+
+    /**
+     * Delete Flink kubernetes cluster by deleting the kubernetes resources directly. Optionally
+     * allows deleting the native kubernetes HA resources as well.
+     *
+     * <p>Ordering matters: the HA ConfigMaps may only be deleted after the cluster has fully shut
+     * down, otherwise the still-running JobManager could recreate them.
+     *
+     * @param status Deployment status object
+     * @param meta ObjectMeta of the deployment
+     * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata should be removed as well
+     * @param shutdownTimeout maximum time allowed for cluster shutdown
+     */
+    private void deleteCluster(
+            FlinkDeploymentStatus status,
+            ObjectMeta meta,
+            boolean deleteHaConfigmaps,
+            long shutdownTimeout) {
+
+        String namespace = meta.getNamespace();
+        String clusterId = meta.getName();
+
+        LOG.info(
+                "Deleting JobManager deployment {}.",
+                deleteHaConfigmaps ? "and HA metadata" : "while preserving HA metadata");
+        kubernetesClient
+                .apps()
+                .deployments()
+                .inNamespace(namespace)
+                .withName(KubernetesUtils.getDeploymentName(clusterId))
+                .cascading(true)
+                .delete();
+
+        if (deleteHaConfigmaps) {
+            // We need to wait for cluster shutdown otherwise HA configmaps might be recreated
+            waitForClusterShutdown(namespace, clusterId, shutdownTimeout);
+            kubernetesClient
+                    .configMaps()
+                    .inNamespace(namespace)
+                    .withLabels(
+                            KubernetesUtils.getConfigMapLabels(
+                                    clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
+                    .delete();
+        }
+        // Reflect the deletion in the CR status: deployment gone, job considered finished.
+        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+        status.getJobStatus().setState(JobStatus.FINISHED.name());
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
new file mode 100644
index 00000000..5a0b85eb
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.service;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.client.deployment.ClusterDeploymentException;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient;
+import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient;
+import org.apache.flink.kubernetes.operator.standalone.KubernetesStandaloneClusterDescriptor;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+
+/**
+ * Implementation of {@link FlinkService} submitting and interacting with Standalone Kubernetes
+ * Flink clusters and jobs.
+ */
+public class StandaloneFlinkService extends AbstractFlinkService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StandaloneFlinkService.class);
+
+    public StandaloneFlinkService(
+            KubernetesClient kubernetesClient, FlinkConfigManager configManager) {
+        super(kubernetesClient, configManager);
+    }
+
+    /**
+     * Deploys an application cluster as standalone JM/TM Kubernetes deployments.
+     *
+     * <p>NOTE(review): {@code jobSpec} and {@code requireHaMetadata} are currently ignored here —
+     * the job is presumably derived from the cluster configuration/image, and HA handling is still
+     * marked TODO below; confirm before relying on last-state restores in standalone mode.
+     *
+     * @param jobSpec job spec (not consumed by this implementation yet)
+     * @param conf effective Flink configuration for the cluster
+     * @param requireHaMetadata HA-restore flag (not consumed by this implementation yet)
+     * @throws Exception on deployment failure
+     */
+    @Override
+    public void submitApplicationCluster(
+            JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception {
+        LOG.info("Deploying application cluster");
+        // TODO some HA stuff?
+        submitClusterInternal(conf);
+        LOG.info("Application cluster successfully deployed");
+    }
+
+    /**
+     * Deploys a standalone session cluster from the given configuration.
+     *
+     * @param conf effective Flink configuration for the cluster
+     * @throws Exception on deployment failure
+     */
+    @Override
+    public void submitSessionCluster(Configuration conf) throws Exception {
+        LOG.info("Deploying session cluster");
+        // TODO some HA stuff?
+        submitClusterInternal(conf);
+        LOG.info("Session cluster successfully deployed");
+    }
+
+    /**
+     * Deletes the standalone TM and JM deployments and, optionally, the HA ConfigMaps.
+     *
+     * <p>NOTE(review): unlike the native implementation, the {@code status} argument is not
+     * updated here — verify callers do not depend on the status being set after deletion.
+     */
+    @Override
+    public void deleteClusterDeployment(
+            ObjectMeta meta, FlinkDeploymentStatus status, boolean deleteHaData) {
+        deleteClusterInternal(meta, deleteHaData);
+    }
+
+    /** Lists the JobManager pods using the standalone-mode JM label selectors. */
+    @Override
+    protected PodList getJmPodList(String namespace, String clusterId) {
+        return kubernetesClient
+                .pods()
+                .inNamespace(namespace)
+                .withLabels(StandaloneKubernetesUtils.getJobManagerSelectors(clusterId))
+                .list();
+    }
+
+    /**
+     * Creates a standalone kube client scoped to the given namespace, backed by a dedicated
+     * IO thread pool sized from the Flink client configuration. Overridable for testing.
+     */
+    @VisibleForTesting
+    protected FlinkStandaloneKubeClient createNamespacedKubeClient(
+            Configuration configuration, String namespace) {
+        final int poolSize =
+                configuration.get(KubernetesConfigOptions.KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE);
+
+        ExecutorService executorService =
+                Executors.newFixedThreadPool(
+                        poolSize,
+                        new ExecutorThreadFactory("flink-kubeclient-io-for-standalone-service"));
+
+        return new Fabric8FlinkStandaloneKubeClient(
+                configuration,
+                Fabric8FlinkStandaloneKubeClient.createNamespacedKubeClient(namespace),
+                executorService);
+    }
+
+    /** Deploys JM/TM deployments for the cluster via the standalone cluster descriptor. */
+    private void submitClusterInternal(Configuration conf) throws ClusterDeploymentException {
+        final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
+
+        FlinkStandaloneKubeClient client = createNamespacedKubeClient(conf, namespace);
+        try (final KubernetesStandaloneClusterDescriptor kubernetesClusterDescriptor =
+                new KubernetesStandaloneClusterDescriptor(conf, client)) {
+            kubernetesClusterDescriptor.deploySessionCluster(getClusterSpecification(conf));
+        }
+    }
+
+    /** Derives the cluster sizing (memory/slots) from the Flink configuration. */
+    private ClusterSpecification getClusterSpecification(Configuration conf) {
+        return new KubernetesClusterClientFactory().getClusterSpecification(conf);
+    }
+
+    /**
+     * Deletes the TM deployment, then the JM deployment, and finally — only after waiting for the
+     * cluster to fully shut down — the HA ConfigMaps, so a still-running JM cannot recreate them.
+     */
+    private void deleteClusterInternal(ObjectMeta meta, boolean deleteHaConfigmaps) {
+        final String clusterId = meta.getName();
+        final String namespace = meta.getNamespace();
+
+        LOG.info("Deleting Flink Standalone cluster TM resources");
+        kubernetesClient
+                .apps()
+                .deployments()
+                .inNamespace(namespace)
+                .withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId))
+                .cascading(true)
+                .delete();
+
+        LOG.info("Deleting Flink Standalone cluster JM resources");
+        kubernetesClient
+                .apps()
+                .deployments()
+                .inNamespace(namespace)
+                .withName(StandaloneKubernetesUtils.getJobManagerDeploymentName(clusterId))
+                .cascading(true)
+                .delete();
+
+        if (deleteHaConfigmaps) {
+            // We need to wait for cluster shutdown otherwise HA configmaps might be recreated
+            waitForClusterShutdown(
+                    namespace,
+                    clusterId,
+                    configManager
+                            .getOperatorConfiguration()
+                            .getFlinkShutdownClusterTimeout()
+                            .toSeconds());
+            kubernetesClient
+                    .configMaps()
+                    .inNamespace(namespace)
+                    .withLabels(
+                            KubernetesUtils.getConfigMapLabels(
+                                    clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
+                    .delete();
+        }
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index c1de80e6..05cb7e03 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -18,18 +18,14 @@
 package org.apache.flink.kubernetes.operator.utils;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
-import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
-import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
 import org.apache.flink.util.Preconditions;
@@ -42,8 +38,6 @@ import io.fabric8.kubernetes.api.model.ConfigMap;
 import io.fabric8.kubernetes.api.model.ConfigMapList;
 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.api.model.PodList;
-import io.fabric8.kubernetes.api.model.Service;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,8 +48,6 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.function.Predicate;
 
-import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
-
 /** Flink Utility methods used by the operator. */
 public class FlinkUtils {
 
@@ -107,133 +99,6 @@ public class FlinkUtils {
         }
     }
 
-    /**
-     * Delete Flink kubernetes cluster by deleting the kubernetes resources directly. Optionally
-     * allows deleting the native kubernetes HA resources as well.
-     *
-     * @param status Deployment status object
-     * @param meta ObjectMeta of the deployment
-     * @param kubernetesClient Kubernetes client
-     * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata should be removed as well
-     * @param shutdownTimeout maximum time allowed for cluster shutdown
-     */
-    public static void deleteCluster(
-            FlinkDeploymentStatus status,
-            ObjectMeta meta,
-            KubernetesClient kubernetesClient,
-            boolean deleteHaConfigmaps,
-            long shutdownTimeout) {
-
-        String namespace = meta.getNamespace();
-        String clusterId = meta.getName();
-
-        LOG.info(
-                "Deleting JobManager deployment {}.",
-                deleteHaConfigmaps ? "and HA metadata" : "while preserving HA metadata");
-        kubernetesClient
-                .apps()
-                .deployments()
-                .inNamespace(namespace)
-                .withName(KubernetesUtils.getDeploymentName(clusterId))
-                .cascading(true)
-                .delete();
-
-        if (deleteHaConfigmaps) {
-            // We need to wait for cluster shutdown otherwise HA configmaps might be recreated
-            waitForClusterShutdown(kubernetesClient, namespace, clusterId, shutdownTimeout);
-            kubernetesClient
-                    .configMaps()
-                    .inNamespace(namespace)
-                    .withLabels(
-                            KubernetesUtils.getConfigMapLabels(
-                                    clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
-                    .delete();
-        }
-        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
-        status.getJobStatus().setState(JobStatus.FINISHED.name());
-    }
-
-    /**
-     * Wait until the FLink cluster has completely shut down.
-     *
-     * @param kubernetesClient Kubernetes client.
-     * @param namespace Resource namespace.
-     * @param clusterId Flink clusterId.
-     * @param shutdownTimeout Max time allowed for shutdown.
-     */
-    public static void waitForClusterShutdown(
-            KubernetesClient kubernetesClient,
-            String namespace,
-            String clusterId,
-            long shutdownTimeout) {
-
-        boolean jobManagerRunning = true;
-        boolean serviceRunning = true;
-
-        for (int i = 0; i < shutdownTimeout; i++) {
-            if (jobManagerRunning) {
-                PodList jmPodList = getJmPodList(kubernetesClient, namespace, clusterId);
-
-                if (jmPodList == null || jmPodList.getItems().isEmpty()) {
-                    jobManagerRunning = false;
-                }
-            }
-
-            if (serviceRunning) {
-                Service service =
-                        kubernetesClient
-                                .services()
-                                .inNamespace(namespace)
-                                .withName(
-                                        ExternalServiceDecorator.getExternalServiceName(clusterId))
-                                .fromServer()
-                                .get();
-                if (service == null) {
-                    serviceRunning = false;
-                }
-            }
-
-            if (!jobManagerRunning && !serviceRunning) {
-                break;
-            }
-            // log a message waiting to shutdown Flink cluster every 5 seconds.
-            if ((i + 1) % 5 == 0) {
-                LOG.info("Waiting for cluster shutdown... ({}s)", i + 1);
-            }
-            try {
-                Thread.sleep(1000);
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-        }
-        LOG.info("Cluster shutdown completed.");
-    }
-
-    /**
-     * Wait until the FLink cluster has completely shut down.
-     *
-     * @param kubernetesClient Kubernetes client.
-     * @param conf Flink configuration.
-     * @param shutdownTimeout Max time allowed for shutdown.
-     */
-    public static void waitForClusterShutdown(
-            KubernetesClient kubernetesClient, Configuration conf, long shutdownTimeout) {
-        FlinkUtils.waitForClusterShutdown(
-                kubernetesClient,
-                conf.getString(KubernetesConfigOptions.NAMESPACE),
-                conf.getString(KubernetesConfigOptions.CLUSTER_ID),
-                shutdownTimeout);
-    }
-
-    public static PodList getJmPodList(
-            KubernetesClient kubernetesClient, String namespace, String clusterId) {
-        return kubernetesClient
-                .pods()
-                .inNamespace(namespace)
-                .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId))
-                .list();
-    }
-
     public static void deleteJobGraphInKubernetesHA(
             String clusterId, String namespace, KubernetesClient kubernetesClient) {
         // The HA ConfigMap names have been changed from 1.15, so we use the labels to filter out
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 64a0b6de..2e99c53f 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -36,8 +36,9 @@ import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatu
 import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
+import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.service.AbstractFlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -75,7 +76,7 @@ import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 /** Flink service mock for tests. */
-public class TestingFlinkService extends FlinkService {
+public class TestingFlinkService extends AbstractFlinkService {
 
     public static final Map<String, String> CLUSTER_INFO =
             Map.of(
@@ -172,6 +173,16 @@ public class TestingFlinkService extends FlinkService {
         jobs.add(Tuple2.of(conf.get(SavepointConfigOptions.SAVEPOINT_PATH), jobStatusMessage));
     }
 
+    protected void validateHaMetadataExists(Configuration conf) {
+        if (!isHaMetadataAvailable(conf)) {
+            throw new DeploymentFailedException(
+                    "HA metadata not available to restore from last state. "
+                            + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
+                            + "Manual restore required.",
+                    "RestoreFailed");
+        }
+    }
+
     @Override
     public boolean isHaMetadataAvailable(Configuration conf) {
         return FlinkUtils.isKubernetesHAActivated(conf) && haDataAvailable;
@@ -384,7 +395,7 @@ public class TestingFlinkService extends FlinkService {
     }
 
     @Override
-    protected void waitForClusterShutdown(Configuration conf) {}
+    public void waitForClusterShutdown(Configuration conf) {}
 
     @Override
     public void disposeSavepoint(String savepointPath, Configuration conf) {
@@ -430,6 +441,11 @@ public class TestingFlinkService extends FlinkService {
         return podList;
     }
 
+    @Override
+    protected PodList getJmPodList(String namespace, String clusterId) {
+        return podList;
+    }
+
     public void setJmPodList(PodList podList) {
         this.podList = podList;
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkServiceFactory.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkServiceFactory.java
new file mode 100644
index 00000000..e4e4d5e0
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkServiceFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
+
+/** Flink service factory mock for tests: always hands out a single stubbed service. */
+public class TestingFlinkServiceFactory extends FlinkServiceFactory {
+
+    // The one service instance returned for every lookup, regardless of deployment mode.
+    private final FlinkService flinkService;
+
+    public TestingFlinkServiceFactory() {
+        super(null, null);
+        flinkService = new TestingFlinkService();
+    }
+
+    public TestingFlinkServiceFactory(FlinkService flinkService) {
+        super(null, null);
+        this.flinkService = flinkService;
+    }
+
+    @Override
+    public FlinkService getOrCreate(KubernetesDeploymentMode deploymentMode) {
+        return flinkService;
+    }
+
+    @Override
+    public FlinkService getOrCreate(FlinkDeployment deployment) {
+        return flinkService;
+    }
+
+    // Extra convenience overload for tests; no matching method on the parent factory, so no
+    // @Override. Uses Flink's Configuration (the original mistakenly imported
+    // java.lang.module.Configuration, the JPMS module-layer type).
+    public FlinkService getOrCreate(Configuration configuration) {
+        return flinkService;
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index 2def2b4a..c169e1d9 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.controller;
 
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.TestingFlinkServiceFactory;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
@@ -26,6 +27,7 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
+import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
@@ -67,6 +69,7 @@ public class TestingFlinkDeploymentController
             FlinkConfigManager configManager,
             KubernetesClient kubernetesClient,
             TestingFlinkService flinkService) {
+        FlinkServiceFactory flinkServiceFactory = new TestingFlinkServiceFactory(flinkService);
         eventRecorder = new EventRecorder(kubernetesClient, eventCollector);
         statusRecorder =
                 new StatusRecorder<>(
@@ -79,12 +82,12 @@ public class TestingFlinkDeploymentController
                         ValidatorUtils.discoverValidators(configManager),
                         new ReconcilerFactory(
                                 kubernetesClient,
-                                flinkService,
+                                flinkServiceFactory,
                                 configManager,
                                 eventRecorder,
                                 statusRecorder),
                         new ObserverFactory(
-                                flinkService, configManager, statusRecorder, eventRecorder),
+                                flinkServiceFactory, configManager, statusRecorder, eventRecorder),
                         statusRecorder,
                         eventRecorder);
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
index e7270bd7..f801bfbc 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
@@ -24,12 +24,14 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.TestingFlinkServiceFactory;
 import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
 import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
+import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
@@ -52,6 +54,7 @@ public class SessionJobObserverTest {
     private TestingFlinkService flinkService;
     private SessionJobObserver observer;
     private SessionJobReconciler reconciler;
+    private FlinkServiceFactory flinkServiceFactory;
 
     @BeforeEach
     public void before() {
@@ -59,12 +62,14 @@ public class SessionJobObserverTest {
         var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
         var statusRecorder = new TestingStatusRecorder<FlinkSessionJobStatus>();
         flinkService = new TestingFlinkService();
+        flinkServiceFactory = new TestingFlinkServiceFactory(flinkService);
         observer =
-                new SessionJobObserver(flinkService, configManager, statusRecorder, eventRecorder);
+                new SessionJobObserver(
+                        flinkServiceFactory, configManager, statusRecorder, eventRecorder);
         reconciler =
                 new SessionJobReconciler(
                         kubernetesClient,
-                        flinkService,
+                        flinkServiceFactory,
                         configManager,
                         eventRecorder,
                         statusRecorder);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index b7122159..2bb1f089 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -488,6 +488,8 @@ public class ApplicationReconcilerTest {
     @Test
     public void testRandomJobResultStorePath() throws Exception {
         FlinkDeployment flinkApp = TestUtils.buildApplicationCluster();
+        Context context = flinkService.getContext();
+
         final String haStoragePath = "file:///flink-data/ha";
         flinkApp.getSpec()
                 .getFlinkConfiguration()
@@ -500,14 +502,14 @@ public class ApplicationReconcilerTest {
 
         status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
         status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
-        reconciler.deploy(flinkApp, spec, status, deployConfig, Optional.empty(), false);
+        reconciler.deploy(flinkApp, spec, status, context, deployConfig, Optional.empty(), false);
 
         String path1 = deployConfig.get(JobResultStoreOptions.STORAGE_PATH);
         Assertions.assertTrue(path1.startsWith(haStoragePath));
 
         status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
         status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
-        reconciler.deploy(flinkApp, spec, status, deployConfig, Optional.empty(), false);
+        reconciler.deploy(flinkApp, spec, status, context, deployConfig, Optional.empty(), false);
         String path2 = deployConfig.get(JobResultStoreOptions.STORAGE_PATH);
         Assertions.assertTrue(path2.startsWith(haStoragePath));
         assertNotEquals(path1, path2);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index a28e2f19..998fa34d 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.TestingFlinkServiceFactory;
 import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
@@ -33,6 +34,7 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
@@ -69,6 +71,8 @@ public class SessionJobReconcilerTest {
 
     @BeforeEach
     public void before() {
+        flinkService = new TestingFlinkService();
+        flinkServiceFactory = new TestingFlinkServiceFactory(flinkService);
         eventRecorder =
                 new EventRecorder(null, (r, e) -> {}) {
                     @Override
@@ -83,14 +87,17 @@ public class SessionJobReconcilerTest {
                 };
         reconciler =
                 new SessionJobReconciler(
-                        null, flinkService, configManager, eventRecorder, statusRecoder);
+                        null, flinkServiceFactory, configManager, eventRecorder, statusRecoder);
         kubernetesClient.resource(TestUtils.buildSessionJob()).createOrReplace();
         statusRecoder = new TestingStatusRecorder<>();
         reconciler =
                 new SessionJobReconciler(
-                        null, flinkService, configManager, eventRecorder, statusRecoder);
+                        null, flinkServiceFactory, configManager, eventRecorder, statusRecoder);
     }
 
+    private FlinkServiceFactory flinkServiceFactory;
+    private FlinkSessionJob sessionJob;
+
     @Test
     public void testSubmitAndCleanUp() throws Exception {
         FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
@@ -460,7 +467,7 @@ public class SessionJobReconcilerTest {
         // Force upgrade when savepoint is in progress.
         reconciler =
                 new SessionJobReconciler(
-                        null, flinkService, configManager, eventRecorder, statusRecoder);
+                        null, flinkServiceFactory, configManager, eventRecorder, statusRecoder);
         spSessionJob.getSpec().getJob().setParallelism(100);
         reconciler.reconcile(spSessionJob, readyContext);
         assertEquals(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
similarity index 98%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
rename to flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index aebd5749..8e151f04 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -68,7 +68,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 /** @link FlinkService unit tests */
 @EnableKubernetesMockClient(crud = true)
-public class FlinkServiceTest {
+public class NativeFlinkServiceTest {
     KubernetesClient client;
     private final Configuration configuration = new Configuration();
     private final FlinkConfigManager configManager = new FlinkConfigManager(configuration);
@@ -308,7 +308,7 @@ public class FlinkServiceTest {
                         Tuple2.of(ExecutionState.RUNNING, 4));
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RUNNING,
-                FlinkService.getEffectiveStatus(allRunning));
+                AbstractFlinkService.getEffectiveStatus(allRunning));
 
         JobDetails allRunningOrFinished =
                 getJobDetails(
@@ -317,7 +317,7 @@ public class FlinkServiceTest {
                         Tuple2.of(ExecutionState.FINISHED, 2));
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RUNNING,
-                FlinkService.getEffectiveStatus(allRunningOrFinished));
+                AbstractFlinkService.getEffectiveStatus(allRunningOrFinished));
 
         JobDetails allRunningOrScheduled =
                 getJobDetails(
@@ -326,7 +326,7 @@ public class FlinkServiceTest {
                         Tuple2.of(ExecutionState.SCHEDULED, 2));
         assertEquals(
                 org.apache.flink.api.common.JobStatus.CREATED,
-                FlinkService.getEffectiveStatus(allRunningOrScheduled));
+                AbstractFlinkService.getEffectiveStatus(allRunningOrScheduled));
 
         JobDetails allFinished =
                 getJobDetails(
@@ -334,7 +334,7 @@ public class FlinkServiceTest {
                         Tuple2.of(ExecutionState.FINISHED, 4));
         assertEquals(
                 org.apache.flink.api.common.JobStatus.FINISHED,
-                FlinkService.getEffectiveStatus(allFinished));
+                AbstractFlinkService.getEffectiveStatus(allFinished));
     }
 
     private JobDetails getJobDetails(
@@ -358,7 +358,7 @@ public class FlinkServiceTest {
     }
 
     private FlinkService createFlinkService(ClusterClient<String> clusterClient) {
-        return new FlinkService(client, new FlinkConfigManager(configuration)) {
+        return new NativeFlinkService(client, new FlinkConfigManager(configuration)) {
             @Override
             protected ClusterClient<String> getClusterClient(Configuration config) {
                 return clusterClient;
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
new file mode 100644
index 00000000..bbffabbd
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.service;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertEquals;
+
+/** {@link StandaloneFlinkService} unit tests. */
+@EnableKubernetesMockClient(crud = true)
+public class StandaloneFlinkServiceTest {
+    KubernetesMockServer mockServer;
+
+    private NamespacedKubernetesClient kubernetesClient;
+    StandaloneFlinkService flinkStandaloneService;
+    Configuration configuration = new Configuration();
+
+    @BeforeEach
+    public void setup() {
+        configuration.set(KubernetesConfigOptions.CLUSTER_ID, TestUtils.TEST_DEPLOYMENT_NAME);
+        configuration.set(KubernetesConfigOptions.NAMESPACE, TestUtils.TEST_NAMESPACE);
+
+        kubernetesClient = mockServer.createClient().inAnyNamespace();
+        flinkStandaloneService =
+                new StandaloneFlinkService(kubernetesClient, new FlinkConfigManager(configuration));
+
+        ExecutorService executorService =
+                Executors.newFixedThreadPool(
+                        1, new ExecutorThreadFactory("flink-kubeclient-io-for-standalone-service"));
+    }
+
+    @Test
+    public void testDeleteClusterDeployment() throws Exception {
+        FlinkDeployment flinkDeployment = TestUtils.buildSessionCluster();
+        configuration = buildConfig(flinkDeployment, configuration);
+
+        createDeployments();
+
+        List<Deployment> deployments = kubernetesClient.apps().deployments().list().getItems();
+
+        assertEquals(2, deployments.size());
+
+        flinkStandaloneService.deleteClusterDeployment(
+                flinkDeployment.getMetadata(), flinkDeployment.getStatus(), false);
+
+        deployments = kubernetesClient.apps().deployments().list().getItems();
+
+        assertEquals(0, deployments.size());
+    }
+
+    @Test
+    public void testDeleteClusterDeploymentWithHADelete() throws Exception {
+        FlinkDeployment flinkDeployment = TestUtils.buildSessionCluster();
+        configuration = buildConfig(flinkDeployment, configuration);
+
+        createDeployments();
+
+        List<Deployment> deployments = kubernetesClient.apps().deployments().list().getItems();
+        assertEquals(2, deployments.size());
+
+        flinkStandaloneService.deleteClusterDeployment(
+                flinkDeployment.getMetadata(), flinkDeployment.getStatus(), true);
+
+        deployments = kubernetesClient.apps().deployments().list().getItems();
+
+        assertEquals(0, deployments.size());
+    }
+
+    private Configuration buildConfig(FlinkDeployment flinkDeployment, Configuration configuration)
+            throws Exception {
+        return FlinkConfigBuilder.buildFrom(
+                FlinkConfigBuilder.buildFrom(
+                        flinkDeployment.getMetadata().getNamespace(),
+                        flinkDeployment.getMetadata().getName(),
+                        flinkDeployment.getSpec(),
+                        configuration);
+    }
+
+    private void createDeployments() {
+        Deployment jmDeployment = new Deployment();
+        ObjectMeta jmMetadata = new ObjectMeta();
+        jmMetadata.setName(
+                StandaloneKubernetesUtils.getJobManagerDeploymentName(
+                        TestUtils.TEST_DEPLOYMENT_NAME));
+        jmDeployment.setMetadata(jmMetadata);
+        kubernetesClient
+                .apps()
+                .deployments()
+                .inNamespace(TestUtils.TEST_NAMESPACE)
+                .create(jmDeployment);
+
+        Deployment tmDeployment = new Deployment();
+        ObjectMeta tmMetadata = new ObjectMeta();
+        tmMetadata.setName(
+                StandaloneKubernetesUtils.getTaskManagerDeploymentName(
+                        TestUtils.TEST_DEPLOYMENT_NAME));
+        tmDeployment.setMetadata(tmMetadata);
+        kubernetesClient
+                .apps()
+                .deployments()
+                .inNamespace(TestUtils.TEST_NAMESPACE)
+                .create(tmDeployment);
+    }
+}
diff --git a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 83f3b9e3..86d3a97a 100644
--- a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -9047,6 +9047,11 @@ spec:
                 additionalProperties:
                   type: string
                 type: object
+              mode:
+                enum:
+                - native
+                - standalone
+                type: string
               job:
                 properties:
                   jarURI: