You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/20 15:59:11 UTC

[flink-kubernetes-operator] 05/05: [FLINK-27446] Add standalone mode validation and config building

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 79b169964cd3aa1ea3383b3180a601939fbd3a04
Author: Usamah Jassat <us...@amazon.com>
AuthorDate: Tue Jun 21 17:35:08 2022 +0100

    [FLINK-27446] Add standalone mode validation and config building
---
 .github/workflows/ci.yml                           |  2 +
 docs/content/docs/custom-resource/reference.md     | 11 +++
 e2e-tests/data/flinkdep-cr.yaml                    |  1 +
 e2e-tests/data/multi-sessionjob.yaml               |  2 +
 e2e-tests/data/sessionjob-cr.yaml                  |  1 +
 .../operator/config/FlinkConfigBuilder.java        | 72 +++++++++++++++++-
 .../observer/deployment/ObserverFactory.java       | 15 ++--
 .../deployment/ApplicationReconciler.java          |  3 +-
 .../reconciler/deployment/ReconcilerFactory.java   | 15 ++--
 .../operator/service/AbstractFlinkService.java     | 43 ++++++++++-
 .../operator/service/FlinkServiceFactory.java      |  7 ++
 .../operator/service/NativeFlinkService.java       | 39 +++-------
 .../operator/service/StandaloneFlinkService.java   | 36 +++++++--
 .../operator/utils/EventSourceUtils.java           |  6 +-
 .../kubernetes/operator/utils/FlinkUtils.java      |  4 +
 .../operator/validation/DefaultValidator.java      | 36 ++++++++-
 .../kubernetes/operator/TestingFlinkService.java   | 12 +++
 .../operator/config/FlinkConfigBuilderTest.java    | 85 ++++++++++++++++++++++
 .../TestingFlinkDeploymentController.java          |  1 +
 .../sessionjob/SessionJobObserverTest.java         |  3 +-
 .../sessionjob/SessionJobReconcilerTest.java       |  2 +-
 .../operator/validation/DefaultValidatorTest.java  | 69 ++++++++++++++++++
 22 files changed, 400 insertions(+), 65 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 1dde3421..b34a0d28 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -97,6 +97,7 @@ jobs:
       matrix:
         version: ["v1_15","v1_14","v1_13"]
         namespace: ["default","flink"]
+        mode: ["native", "standalone"]
         test:
           - test_application_kubernetes_ha.sh
           - test_application_operations.sh
@@ -155,6 +156,7 @@ jobs:
         run: |
           sed -i "s/image: flink:.*/image: ${{ matrix.image }}/" e2e-tests/data/*.yaml
           sed -i "s/flinkVersion: .*/flinkVersion: ${{ matrix.version }}/" e2e-tests/data/*.yaml
+          sed -i "s/mode: .*/mode: ${{ matrix.mode }}/" e2e-tests/data/*.yaml
           git diff HEAD
           echo "Running e2e-tests/$test"
           bash e2e-tests/${{ matrix.test }} || exit 1
diff --git a/docs/content/docs/custom-resource/reference.md b/docs/content/docs/custom-resource/reference.md
index 100db9d4..b2399505 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -57,6 +57,7 @@ This page serves as a full reference for FlinkDeployment custom resource definit
 | jobManager | org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec | JobManager specs. |
 | taskManager | org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec | TaskManager specs. |
 | logConfiguration | java.util.Map<java.lang.String,java.lang.String> | Log configuration overrides for the Flink deployment. Format logConfigFileName ->  configContent. |
+| mode | org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode | Deployment mode of the Flink cluster, native or standalone. |
 
 ### FlinkSessionJobSpec
 **Class**: org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec
@@ -131,6 +132,16 @@ This page serves as a full reference for FlinkDeployment custom resource definit
 | RUNNING | Job is expected to be processing data. |
 | SUSPENDED | Processing is suspended with the intention of continuing later. |
 
+### KubernetesDeploymentMode
+**Class**: org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode
+
+**Description**: Enum to control Flink deployment mode on Kubernetes.
+
+| Value | Docs |
+| ----- | ---- |
+| NATIVE | Deploys Flink using Flink's native Kubernetes support. Only supported for newer versions of Flink. |
+| STANDALONE | Deploys Flink on top of Kubernetes in standalone mode. |
+
 ### Resource
 **Class**: org.apache.flink.kubernetes.operator.crd.spec.Resource
 
diff --git a/e2e-tests/data/flinkdep-cr.yaml b/e2e-tests/data/flinkdep-cr.yaml
index d48af2be..18e73b27 100644
--- a/e2e-tests/data/flinkdep-cr.yaml
+++ b/e2e-tests/data/flinkdep-cr.yaml
@@ -83,6 +83,7 @@ spec:
     entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
     parallelism: 2
     upgradeMode: last-state
+  mode: native
 
 ---
 apiVersion: v1
diff --git a/e2e-tests/data/multi-sessionjob.yaml b/e2e-tests/data/multi-sessionjob.yaml
index ed5c892f..77991953 100644
--- a/e2e-tests/data/multi-sessionjob.yaml
+++ b/e2e-tests/data/multi-sessionjob.yaml
@@ -65,6 +65,7 @@ spec:
     resource:
       memory: "1024m"
       cpu: 0.25
+  mode: native
 
 ---
 apiVersion: flink.apache.org/v1beta1
@@ -116,6 +117,7 @@ spec:
     resource:
       memory: "1024m"
       cpu: 0.25
+  mode: native
 
 ---
 apiVersion: flink.apache.org/v1beta1
diff --git a/e2e-tests/data/sessionjob-cr.yaml b/e2e-tests/data/sessionjob-cr.yaml
index 53b49f0a..1953cdec 100644
--- a/e2e-tests/data/sessionjob-cr.yaml
+++ b/e2e-tests/data/sessionjob-cr.yaml
@@ -65,6 +65,7 @@ spec:
     resource:
       memory: "1024m"
       cpu: 0.5
+  mode: native
 
 ---
 apiVersion: flink.apache.org/v1beta1
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index 8941dd51..706d1108 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.config;
 
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
@@ -32,8 +33,12 @@ import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
 import org.apache.flink.kubernetes.operator.crd.spec.Resource;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
@@ -59,6 +64,8 @@ import static org.apache.flink.configuration.DeploymentOptions.SUBMIT_FAILED_JOB
 import static org.apache.flink.configuration.DeploymentOptionsInternal.CONF_DIR;
 import static org.apache.flink.configuration.WebOptions.CANCEL_ENABLE;
 import static org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE;
+import static org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION;
+import static org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION;
 import static org.apache.flink.kubernetes.operator.utils.FlinkUtils.mergePodTemplates;
 import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
 import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NAME;
@@ -216,30 +223,89 @@ public class FlinkConfigBuilder {
                     spec.getTaskManager().getPodTemplate(),
                     effectiveConfig,
                     false);
+
+            if (spec.getTaskManager().getReplicas() != null
+                    && spec.getTaskManager().getReplicas() > 0) {
+                effectiveConfig.set(
+                        StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS,
+                        spec.getTaskManager().getReplicas());
+            }
+        }
+
+        if (spec.getJob() != null
+                && KubernetesDeploymentMode.getDeploymentMode(spec)
+                        == KubernetesDeploymentMode.STANDALONE) {
+            if (!effectiveConfig.contains(
+                    StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS)) {
+                effectiveConfig.set(
+                        StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS,
+                        FlinkUtils.getNumTaskManagers(effectiveConfig, getParallelism()));
+            }
         }
         return this;
     }
 
     protected FlinkConfigBuilder applyJobOrSessionSpec() throws URISyntaxException {
+        KubernetesDeploymentMode deploymentMode = KubernetesDeploymentMode.getDeploymentMode(spec);
+
         if (spec.getJob() != null) {
+            JobSpec jobSpec = spec.getJob();
             effectiveConfig.set(
                     DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
-            final URI uri = new URI(spec.getJob().getJarURI());
+            final URI uri = new URI(jobSpec.getJarURI());
             effectiveConfig.set(PipelineOptions.JARS, Collections.singletonList(uri.toString()));
             effectiveConfig.set(CoreOptions.DEFAULT_PARALLELISM, getParallelism());
 
-            if (spec.getJob().getAllowNonRestoredState() != null) {
+            if (jobSpec.getAllowNonRestoredState() != null) {
                 effectiveConfig.set(
                         SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
-                        spec.getJob().getAllowNonRestoredState());
+                        jobSpec.getAllowNonRestoredState());
+            }
+
+            if (jobSpec.getEntryClass() != null) {
+                effectiveConfig.set(
+                        ApplicationConfiguration.APPLICATION_MAIN_CLASS, jobSpec.getEntryClass());
             }
         } else {
             effectiveConfig.set(
                     DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName());
         }
+
+        if (deploymentMode == KubernetesDeploymentMode.STANDALONE) {
+            effectiveConfig.set(DeploymentOptions.TARGET, "remote");
+            effectiveConfig.set(
+                    StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
+                    spec.getJob() == null ? SESSION : APPLICATION);
+
+            if (spec.getJob() != null) {
+                effectiveConfig.set(
+                        PipelineOptions.CLASSPATHS,
+                        Collections.singletonList(getStandaloneJarURI(spec.getJob())));
+            }
+        }
         return this;
     }
 
+    private String getStandaloneJarURI(JobSpec jobSpec) throws URISyntaxException {
+        URI uri = new URI(jobSpec.getJarURI());
+
+        // Running an application job through standalone mode doesn't require the file URI
+        // scheme and doesn't accept the local scheme that is used for native mode, so convert
+        // here to improve compatibility at the operator layer.
+        if (uri.getScheme().equals("local")) {
+            uri =
+                    new URI(
+                            "file",
+                            uri.getAuthority() == null ? "" : uri.getAuthority(),
+                            uri.getPath(),
+                            uri.getQuery(),
+                            uri.getFragment());
+        }
+
+        return uri.toASCIIString();
+    }
+
     private int getParallelism() {
         if (spec.getTaskManager() != null && spec.getTaskManager().getReplicas() != null) {
             if (spec.getJob().getParallelism() > 0) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
index 6c2c203d..b41d34b8 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
@@ -17,9 +17,11 @@
 
 package org.apache.flink.kubernetes.operator.observer.deployment;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.Mode;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.observer.Observer;
 import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
@@ -36,7 +38,8 @@ public class ObserverFactory {
     private final FlinkConfigManager configManager;
     private final StatusRecorder<FlinkDeploymentStatus> statusRecorder;
     private final EventRecorder eventRecorder;
-    private final Map<Mode, Observer<FlinkDeployment>> observerMap;
+    private final Map<Tuple2<Mode, KubernetesDeploymentMode>, Observer<FlinkDeployment>>
+            observerMap;
 
     public ObserverFactory(
             FlinkServiceFactory flinkServiceFactory,
@@ -52,9 +55,11 @@ public class ObserverFactory {
 
     public Observer<FlinkDeployment> getOrCreate(FlinkDeployment flinkApp) {
         return observerMap.computeIfAbsent(
-                Mode.getMode(flinkApp),
-                mode -> {
-                    switch (mode) {
+                Tuple2.of(
+                        Mode.getMode(flinkApp),
+                        KubernetesDeploymentMode.getDeploymentMode(flinkApp)),
+                modes -> {
+                    switch (modes.f0) {
                         case SESSION:
                             return new SessionObserver(
                                     flinkServiceFactory.getOrCreate(flinkApp),
@@ -68,7 +73,7 @@ public class ObserverFactory {
                                     eventRecorder);
                         default:
                             throw new UnsupportedOperationException(
-                                    String.format("Unsupported running mode: %s", mode));
+                                    String.format("Unsupported running mode: %s", modes.f0));
                     }
                 });
     }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index c03ddc98..ec0389e2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -33,7 +33,6 @@ import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
-import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -194,7 +193,7 @@ public class ApplicationReconciler
     @Override
     public boolean reconcileOtherChanges(
             FlinkDeployment deployment, Context ctx, Configuration observeConfig) throws Exception {
-        if (SavepointUtils.triggerSavepointIfNeeded(flinkService, deployment, observeConfig)) {
+        if (super.reconcileOtherChanges(deployment, ctx, observeConfig)) {
             return true;
         }
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
index 419e3e41..2d11f495 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
@@ -17,9 +17,11 @@
 
 package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.Mode;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
@@ -39,7 +41,8 @@ public class ReconcilerFactory {
     private final FlinkConfigManager configManager;
     private final EventRecorder eventRecorder;
     private final StatusRecorder<FlinkDeploymentStatus> deploymentStatusRecorder;
-    private final Map<Mode, Reconciler<FlinkDeployment>> reconcilerMap;
+    private final Map<Tuple2<Mode, KubernetesDeploymentMode>, Reconciler<FlinkDeployment>>
+            reconcilerMap;
 
     public ReconcilerFactory(
             KubernetesClient kubernetesClient,
@@ -57,9 +60,11 @@ public class ReconcilerFactory {
 
     public Reconciler<FlinkDeployment> getOrCreate(FlinkDeployment flinkApp) {
         return reconcilerMap.computeIfAbsent(
-                Mode.getMode(flinkApp),
-                mode -> {
-                    switch (mode) {
+                Tuple2.of(
+                        Mode.getMode(flinkApp),
+                        KubernetesDeploymentMode.getDeploymentMode(flinkApp)),
+                modes -> {
+                    switch (modes.f0) {
                         case SESSION:
                             return new SessionReconciler(
                                     kubernetesClient,
@@ -76,7 +81,7 @@ public class ReconcilerFactory {
                                     deploymentStatusRecorder);
                         default:
                             throw new UnsupportedOperationException(
-                                    String.format("Unsupported running mode: %s", mode));
+                                    String.format("Unsupported running mode: %s", modes.f0));
                     }
                 });
     }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index afb7bf27..c4b32529 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -142,11 +142,33 @@ public abstract class AbstractFlinkService implements FlinkService {
 
     protected abstract PodList getJmPodList(String namespace, String clusterId);
 
+    protected abstract void deployApplicationCluster(JobSpec jobSpec, Configuration conf)
+            throws Exception;
+
     @Override
     public KubernetesClient getKubernetesClient() {
         return kubernetesClient;
     }
 
+    @Override
+    public void submitApplicationCluster(
+            JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception {
+        LOG.info(
+                "Deploying application cluster{}",
+                requireHaMetadata ? " requiring last-state from HA metadata" : "");
+        if (FlinkUtils.isKubernetesHAActivated(conf)) {
+            final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
+            final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
+            // Delete the job graph in the HA ConfigMaps so that the newly changed job config (e.g.
+            // parallelism) could take effect
+            FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, kubernetesClient);
+        }
+        if (requireHaMetadata) {
+            validateHaMetadataExists(conf);
+        }
+        deployApplicationCluster(jobSpec, conf);
+    }
+
     @Override
     public boolean isHaMetadataAvailable(Configuration conf) {
         return FlinkUtils.isHaMetadataAvailable(conf, kubernetesClient);
@@ -219,8 +241,11 @@ public abstract class AbstractFlinkService implements FlinkService {
         }
     }
 
-    @Override
-    public void cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration conf)
+    protected void cancelJob(
+            FlinkDeployment deployment,
+            UpgradeMode upgradeMode,
+            Configuration conf,
+            boolean deleteClusterAfterSavepoint)
             throws Exception {
         var deploymentStatus = deployment.getStatus();
         var jobIdString = deploymentStatus.getJobStatus().getJobId();
@@ -290,6 +315,9 @@ public abstract class AbstractFlinkService implements FlinkService {
                                         ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.key()),
                                 exception);
                     }
+                    if (deleteClusterAfterSavepoint) {
+                        deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, true);
+                    }
                     break;
                 case LAST_STATE:
                     deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, false);
@@ -697,7 +725,6 @@ public abstract class AbstractFlinkService implements FlinkService {
     /** Wait until the Flink cluster has completely shut down. */
     @VisibleForTesting
     void waitForClusterShutdown(String namespace, String clusterId, long shutdownTimeout) {
-
         boolean jobManagerRunning = true;
         boolean serviceRunning = true;
 
@@ -765,4 +792,14 @@ public abstract class AbstractFlinkService implements FlinkService {
         }
         return effectiveStatus;
     }
+
+    private void validateHaMetadataExists(Configuration conf) {
+        if (!isHaMetadataAvailable(conf)) {
+            throw new DeploymentFailedException(
+                    "HA metadata not available to restore from last state. "
+                            + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
+                            + "Manual restore required.",
+                    "RestoreFailed");
+        }
+    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkServiceFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkServiceFactory.java
index 761f1e68..9fce6eec 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkServiceFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkServiceFactory.java
@@ -22,6 +22,8 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -33,6 +35,8 @@ public class FlinkServiceFactory {
     private final FlinkConfigManager configManager;
     private final Map<KubernetesDeploymentMode, FlinkService> serviceMap;
 
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkServiceFactory.class);
+
     public FlinkServiceFactory(
             KubernetesClient kubernetesClient, FlinkConfigManager configManager) {
         this.kubernetesClient = kubernetesClient;
@@ -46,8 +50,10 @@ public class FlinkServiceFactory {
                 mode -> {
                     switch (mode) {
                         case NATIVE:
+                            LOG.info("Using NativeFlinkService");
                             return new NativeFlinkService(kubernetesClient, configManager);
                         case STANDALONE:
+                            LOG.info("Using StandaloneFlinkService");
                             return new StandaloneFlinkService(kubernetesClient, configManager);
                         default:
                             throw new UnsupportedOperationException(
@@ -57,6 +63,7 @@ public class FlinkServiceFactory {
     }
 
     public FlinkService getOrCreate(FlinkDeployment deployment) {
+        LOG.info("Getting service for {}", deployment.getMetadata().getName());
         return getOrCreate(getDeploymentMode(deployment));
     }
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
index 60b5ee3b..47d3fc01 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -26,13 +26,12 @@ import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.client.deployment.application.ApplicationConfiguration;
 import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
-import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
 
 import io.fabric8.kubernetes.api.model.ObjectMeta;
@@ -56,21 +55,8 @@ public class NativeFlinkService extends AbstractFlinkService {
     }
 
     @Override
-    public void submitApplicationCluster(
-            JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception {
-        LOG.info(
-                "Deploying application cluster{}",
-                requireHaMetadata ? " requiring last-state from HA metadata" : "");
-        if (FlinkUtils.isKubernetesHAActivated(conf)) {
-            final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
-            final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
-            // Delete the job graph in the HA ConfigMaps so that the newly changed job config(e.g.
-            // parallelism) could take effect
-            FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, kubernetesClient);
-        }
-        if (requireHaMetadata) {
-            validateHaMetadataExists(conf);
-        }
+    protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) throws Exception {
+        LOG.info("Deploying application cluster");
         final ClusterClientServiceLoader clusterClientServiceLoader =
                 new DefaultClusterClientServiceLoader();
         final ApplicationDeployer deployer =
@@ -85,16 +71,6 @@ public class NativeFlinkService extends AbstractFlinkService {
         LOG.info("Application cluster successfully deployed");
     }
 
-    private void validateHaMetadataExists(Configuration conf) {
-        if (!isHaMetadataAvailable(conf)) {
-            throw new DeploymentFailedException(
-                    "HA metadata not available to restore from last state. "
-                            + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
-                            + "Manual restore required.",
-                    "RestoreFailed");
-        }
-    }
-
     @Override
     public void submitSessionCluster(Configuration conf) throws Exception {
         LOG.info("Deploying session cluster");
@@ -110,6 +86,13 @@ public class NativeFlinkService extends AbstractFlinkService {
         LOG.info("Session cluster successfully deployed");
     }
 
+    @Override
+    public void cancelJob(
+            FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration configuration)
+            throws Exception {
+        cancelJob(deployment, upgradeMode, configuration, false);
+    }
+
     @Override
     public void deleteClusterDeployment(
             ObjectMeta meta, FlinkDeploymentStatus status, boolean deleteHaData) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
index 5a0b85eb..08c2ef4e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
@@ -20,11 +20,15 @@ package org.apache.flink.kubernetes.operator.service;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.client.deployment.ClusterDeploymentException;
 import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.Mode;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient;
 import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient;
@@ -58,22 +62,25 @@ public class StandaloneFlinkService extends AbstractFlinkService {
     }
 
     @Override
-    public void submitApplicationCluster(
-            JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception {
+    protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) throws Exception {
         LOG.info("Deploying application cluster");
-        // TODO some HA stuff?
-        submitClusterInternal(conf);
+        submitClusterInternal(conf, Mode.APPLICATION);
         LOG.info("Application cluster successfully deployed");
     }
 
     @Override
     public void submitSessionCluster(Configuration conf) throws Exception {
         LOG.info("Deploying session cluster");
-        // TODO some HA stuff?
-        submitClusterInternal(conf);
+        submitClusterInternal(conf, Mode.SESSION);
         LOG.info("Session cluster successfully deployed");
     }
 
+    @Override
+    public void cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration conf)
+            throws Exception {
+        cancelJob(deployment, upgradeMode, conf, true);
+    }
+
     @Override
     public void deleteClusterDeployment(
             ObjectMeta meta, FlinkDeploymentStatus status, boolean deleteHaData) {
@@ -106,13 +113,26 @@ public class StandaloneFlinkService extends AbstractFlinkService {
                 executorService);
     }
 
-    private void submitClusterInternal(Configuration conf) throws ClusterDeploymentException {
+    private void submitClusterInternal(Configuration conf, Mode mode)
+            throws ClusterDeploymentException {
         final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
 
         FlinkStandaloneKubeClient client = createNamespacedKubeClient(conf, namespace);
         try (final KubernetesStandaloneClusterDescriptor kubernetesClusterDescriptor =
                 new KubernetesStandaloneClusterDescriptor(conf, client)) {
-            kubernetesClusterDescriptor.deploySessionCluster(getClusterSpecification(conf));
+            switch (mode) {
+                case APPLICATION:
+                    kubernetesClusterDescriptor.deployApplicationCluster(
+                            getClusterSpecification(conf),
+                            ApplicationConfiguration.fromConfiguration(conf));
+                    break;
+                case SESSION:
+                    kubernetesClusterDescriptor.deploySessionCluster(getClusterSpecification(conf));
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            String.format("Unsupported running mode: %s", mode));
+            }
         }
     }
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
index 62bea6dd..db238f28 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
@@ -45,11 +45,7 @@ public class EventSourceUtils {
     public static InformerEventSource<Deployment, FlinkDeployment> getDeploymentInformerEventSource(
             EventSourceContext<FlinkDeployment> context) {
         final String labelSelector =
-                Map.of(
-                                Constants.LABEL_TYPE_KEY,
-                                Constants.LABEL_TYPE_NATIVE_TYPE,
-                                Constants.LABEL_COMPONENT_KEY,
-                                Constants.LABEL_COMPONENT_JOB_MANAGER)
+                Map.of(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_JOB_MANAGER)
                         .entrySet().stream()
                         .map(Object::toString)
                         .collect(Collectors.joining(","));
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 05cb7e03..d2f4c359 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -167,6 +167,10 @@ public class FlinkUtils {
 
     public static int getNumTaskManagers(Configuration conf) {
         int parallelism = conf.get(CoreOptions.DEFAULT_PARALLELISM);
+        return getNumTaskManagers(conf, parallelism);
+    }
+
+    public static int getNumTaskManagers(Configuration conf, int parallelism) {
         int taskSlots = conf.get(TaskManagerOptions.NUM_TASK_SLOTS);
         return (parallelism + taskSlots - 1) / taskSlots;
     }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index de3c7b68..9e92dc93 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
 import org.apache.flink.kubernetes.operator.crd.spec.Resource;
 import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
@@ -88,7 +89,11 @@ public class DefaultValidator implements FlinkResourceValidator {
                         deployment.getMetadata().getName(),
                         deployment.getMetadata().getNamespace()),
                 validateLogConfig(spec.getLogConfiguration()),
-                validateJobSpec(spec.getJob(), spec.getTaskManager(), effectiveConfig),
+                validateJobSpec(
+                        spec.getJob(),
+                        spec.getTaskManager(),
+                        effectiveConfig,
+                        KubernetesDeploymentMode.getDeploymentMode(deployment)),
                 validateJmSpec(spec.getJobManager(), effectiveConfig),
                 validateTmSpec(spec.getTaskManager()),
                 validateSpecChange(deployment, effectiveConfig),
@@ -173,7 +178,10 @@ public class DefaultValidator implements FlinkResourceValidator {
     }
 
     private Optional<String> validateJobSpec(
-            JobSpec job, @Nullable TaskManagerSpec tm, Map<String, String> confMap) {
+            JobSpec job,
+            @Nullable TaskManagerSpec tm,
+            Map<String, String> confMap,
+            KubernetesDeploymentMode mode) {
         if (job == null) {
             return Optional.empty();
         }
@@ -306,6 +314,24 @@ public class DefaultValidator implements FlinkResourceValidator {
             return Optional.of("Cannot switch from job to session cluster");
         }
 
+        KubernetesDeploymentMode oldDeploymentMode =
+                oldSpec.getMode() == null ? KubernetesDeploymentMode.NATIVE : oldSpec.getMode();
+
+        KubernetesDeploymentMode newDeploymentMode =
+                newSpec.getMode() == null ? KubernetesDeploymentMode.NATIVE : newSpec.getMode();
+
+        if (oldDeploymentMode == KubernetesDeploymentMode.NATIVE
+                && newDeploymentMode != KubernetesDeploymentMode.NATIVE) {
+            return Optional.of(
+                    "Cannot switch from native kubernetes to standalone kubernetes cluster");
+        }
+
+        if (oldDeploymentMode == KubernetesDeploymentMode.STANDALONE
+                && newDeploymentMode != KubernetesDeploymentMode.STANDALONE) {
+            return Optional.of(
+                    "Cannot switch from standalone kubernetes to native kubernetes cluster");
+        }
+
         JobSpec oldJob = oldSpec.getJob();
         JobSpec newJob = newSpec.getJob();
         if (oldJob != null && newJob != null) {
@@ -358,7 +384,11 @@ public class DefaultValidator implements FlinkResourceValidator {
         return firstPresent(
                 validateNotApplicationCluster(sessionCluster),
                 validateSessionClusterId(sessionJob, sessionCluster),
-                validateJobSpec(sessionJob.getSpec().getJob(), null, effectiveConfig));
+                validateJobSpec(
+                        sessionJob.getSpec().getJob(),
+                        null,
+                        effectiveConfig,
+                        KubernetesDeploymentMode.getDeploymentMode(sessionCluster)));
     }
 
     private Optional<String> validateJobNotEmpty(FlinkSessionJob sessionJob) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 2e99c53f..4ff0a33f 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -31,6 +31,7 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
@@ -156,6 +157,10 @@ public class TestingFlinkService extends AbstractFlinkService {
         if (requireHaMetadata) {
             validateHaMetadataExists(conf);
         }
+        deployApplicationCluster(jobSpec, conf);
+    }
+
+    protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) throws Exception {
         if (deployFailure) {
             throw new Exception("Deployment failure");
         }
@@ -358,6 +363,13 @@ public class TestingFlinkService extends AbstractFlinkService {
                 0);
     }
 
+    @Override
+    public void cancelJob(
+            FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration configuration)
+            throws Exception {
+        cancelJob(deployment, upgradeMode, configuration, false);
+    }
+
     private String cancelJob(FlinkVersion flinkVersion, JobID jobID, boolean savepoint)
             throws Exception {
         Optional<Tuple2<String, JobStatusMessage>> jobOpt =
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
index df5bf415..f33a9986 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.kubernetes.operator.config;
 
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
@@ -33,9 +35,11 @@ import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
 import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
@@ -54,6 +58,7 @@ import java.net.URISyntaxException;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.flink.kubernetes.operator.TestUtils.IMAGE;
@@ -62,6 +67,8 @@ import static org.apache.flink.kubernetes.operator.TestUtils.SAMPLE_JAR;
 import static org.apache.flink.kubernetes.operator.TestUtils.SERVICE_ACCOUNT;
 import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.DEFAULT_CHECKPOINTING_INTERVAL;
 import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 
 /** FlinkConfigBuilderTest. */
 public class FlinkConfigBuilderTest {
@@ -347,6 +354,84 @@ public class FlinkConfigBuilderTest {
                 configuration.getBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE));
     }
 
+    @Test
+    public void testApplyStandaloneApplicationSpec() throws URISyntaxException, IOException {
+        FlinkDeployment dep = ReconciliationUtils.clone(flinkDeployment);
+        final String entryClass = "entry.class";
+        final String jarUri = "local:///flink/opt/StateMachine.jar";
+        final String correctedJarUri = "file:///flink/opt/StateMachine.jar";
+        dep.getSpec().setMode(KubernetesDeploymentMode.STANDALONE);
+        dep.getSpec().getJob().setEntryClass(entryClass);
+        dep.getSpec().getJob().setJarURI(jarUri);
+        dep.getSpec().setTaskManager(new TaskManagerSpec());
+        dep.getSpec().getTaskManager().setReplicas(3);
+        dep.getSpec().getFlinkConfiguration().put(TaskManagerOptions.NUM_TASK_SLOTS.key(), "2");
+
+        Configuration configuration =
+                new FlinkConfigBuilder(dep, new Configuration())
+                        .applyFlinkConfiguration()
+                        .applyTaskManagerSpec()
+                        .applyJobOrSessionSpec()
+                        .build();
+
+        Assertions.assertEquals("remote", configuration.getString(DeploymentOptions.TARGET));
+        Assertions.assertEquals(
+                StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION,
+                configuration.get(StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE));
+        Assertions.assertEquals(6, configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM));
+        Assertions.assertEquals(
+                entryClass,
+                configuration.getString(ApplicationConfiguration.APPLICATION_MAIN_CLASS));
+        Assertions.assertEquals(
+                3,
+                configuration.get(
+                        StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS));
+        List<String> classpaths =
+                ConfigUtils.decodeListFromConfig(
+                        configuration, PipelineOptions.CLASSPATHS, String::toString);
+        assertThat(classpaths, containsInAnyOrder(correctedJarUri));
+
+        dep.getSpec().getTaskManager().setReplicas(null);
+        dep.getSpec().getJob().setParallelism(10);
+
+        configuration =
+                new FlinkConfigBuilder(dep, new Configuration())
+                        .applyFlinkConfiguration()
+                        .applyTaskManagerSpec()
+                        .applyJobOrSessionSpec()
+                        .build();
+        Assertions.assertEquals(
+                5,
+                configuration.get(
+                        StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS));
+    }
+
+    @Test
+    public void testApplyStandaloneSessionSpec() throws URISyntaxException, IOException {
+        FlinkDeployment dep = ReconciliationUtils.clone(flinkDeployment);
+        dep.getSpec().setMode(KubernetesDeploymentMode.STANDALONE);
+        dep.getSpec().setJob(null);
+        dep.getSpec().setTaskManager(new TaskManagerSpec());
+        dep.getSpec().getTaskManager().setReplicas(5);
+        dep.getSpec().getFlinkConfiguration().put(TaskManagerOptions.NUM_TASK_SLOTS.key(), "2");
+
+        Configuration configuration =
+                new FlinkConfigBuilder(dep, new Configuration())
+                        .applyFlinkConfiguration()
+                        .applyTaskManagerSpec()
+                        .applyJobOrSessionSpec()
+                        .build();
+
+        Assertions.assertEquals("remote", configuration.getString(DeploymentOptions.TARGET));
+        Assertions.assertEquals(
+                StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION,
+                configuration.get(StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE));
+        Assertions.assertEquals(
+                5,
+                configuration.get(
+                        StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS));
+    }
+
     @Test
     public void testBuildFrom() throws Exception {
         final Configuration configuration =
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index c169e1d9..088d46bf 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -70,6 +70,7 @@ public class TestingFlinkDeploymentController
             KubernetesClient kubernetesClient,
             TestingFlinkService flinkService) {
         FlinkServiceFactory flinkServiceFactory = new TestingFlinkServiceFactory(flinkService);
+
         eventRecorder = new EventRecorder(kubernetesClient, eventCollector);
         statusRecorder =
                 new StatusRecorder<>(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
index f801bfbc..5dddb7b9 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
@@ -54,7 +54,6 @@ public class SessionJobObserverTest {
     private TestingFlinkService flinkService;
     private SessionJobObserver observer;
     private SessionJobReconciler reconciler;
-    private FlinkServiceFactory flinkServiceFactory;
 
     @BeforeEach
     public void before() {
@@ -62,7 +61,7 @@ public class SessionJobObserverTest {
         var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
         var statusRecorder = new TestingStatusRecorder<FlinkSessionJobStatus>();
         flinkService = new TestingFlinkService();
-        flinkServiceFactory = flinkServiceFactory = new TestingFlinkServiceFactory(flinkService);
+        FlinkServiceFactory flinkServiceFactory = new TestingFlinkServiceFactory(flinkService);
         observer =
                 new SessionJobObserver(
                         flinkServiceFactory, configManager, statusRecorder, eventRecorder);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index 998fa34d..d6f342c9 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -72,7 +72,7 @@ public class SessionJobReconcilerTest {
     @BeforeEach
     public void before() {
         flinkService = new TestingFlinkService();
-        flinkServiceFactory = flinkServiceFactory = new TestingFlinkServiceFactory(flinkService);
+        flinkServiceFactory = new TestingFlinkServiceFactory(flinkService);
         eventRecorder =
                 new EventRecorder(null, (r, e) -> {}) {
                     @Override
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 5189ab47..23e75bc1 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
 import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentReconciliationStatus;
@@ -301,6 +302,74 @@ public class DefaultValidatorTest {
                 },
                 "Cannot switch from session to job cluster");
 
+        testError(
+                dep -> {
+                    dep.setStatus(new FlinkDeploymentStatus());
+                    dep.getStatus().setJobStatus(new JobStatus());
+
+                    dep.getStatus()
+                            .setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
+                    dep.getSpec().setMode(KubernetesDeploymentMode.STANDALONE);
+                    FlinkDeploymentSpec spec = ReconciliationUtils.clone(dep.getSpec());
+
+                    spec.setMode(KubernetesDeploymentMode.NATIVE);
+                    dep.getStatus()
+                            .getReconciliationStatus()
+                            .serializeAndSetLastReconciledSpec(spec, dep);
+                },
+                "Cannot switch from native kubernetes to standalone kubernetes cluster");
+
+        testError(
+                dep -> {
+                    dep.setStatus(new FlinkDeploymentStatus());
+                    dep.getStatus().setJobStatus(new JobStatus());
+
+                    dep.getStatus()
+                            .setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
+                    dep.getSpec().setMode(KubernetesDeploymentMode.STANDALONE);
+                    FlinkDeploymentSpec spec = ReconciliationUtils.clone(dep.getSpec());
+
+                    spec.setMode(null);
+                    dep.getStatus()
+                            .getReconciliationStatus()
+                            .serializeAndSetLastReconciledSpec(spec, dep);
+                },
+                "Cannot switch from native kubernetes to standalone kubernetes cluster");
+
+        testError(
+                dep -> {
+                    dep.setStatus(new FlinkDeploymentStatus());
+                    dep.getStatus().setJobStatus(new JobStatus());
+
+                    dep.getStatus()
+                            .setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
+                    dep.getSpec().setMode(null);
+                    FlinkDeploymentSpec spec = ReconciliationUtils.clone(dep.getSpec());
+
+                    spec.setMode(KubernetesDeploymentMode.STANDALONE);
+                    dep.getStatus()
+                            .getReconciliationStatus()
+                            .serializeAndSetLastReconciledSpec(spec, dep);
+                },
+                "Cannot switch from standalone kubernetes to native kubernetes cluster");
+
+        testError(
+                dep -> {
+                    dep.setStatus(new FlinkDeploymentStatus());
+                    dep.getStatus().setJobStatus(new JobStatus());
+
+                    dep.getStatus()
+                            .setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
+                    dep.getSpec().setMode(KubernetesDeploymentMode.NATIVE);
+                    FlinkDeploymentSpec spec = ReconciliationUtils.clone(dep.getSpec());
+
+                    spec.setMode(KubernetesDeploymentMode.STANDALONE);
+                    dep.getStatus()
+                            .getReconciliationStatus()
+                            .serializeAndSetLastReconciledSpec(spec, dep);
+                },
+                "Cannot switch from standalone kubernetes to native kubernetes cluster");
+
         // Test upgrade mode change validation
         testError(
                 dep -> {