Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/20 15:59:09 UTC

[flink-kubernetes-operator] 03/05: [FLINK-27444] Add KubernetesStandaloneClusterDescriptor and FlinkStandaloneKubeClient

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 261fed2076efe385ede148152c946eb7c5f1f48d
Author: Usamah Jassat <us...@amazon.com>
AuthorDate: Mon Jun 13 15:00:12 2022 +0100

    [FLINK-27444] Add KubernetesStandaloneClusterDescriptor and FlinkStandaloneKubeClient
---
 flink-kubernetes-standalone/pom.xml                |   1 -
 .../Fabric8FlinkStandaloneKubeClient.java          |  71 ++++++
 .../kubeclient/FlinkStandaloneKubeClient.java      |  27 ++
 .../CmdStandaloneJobManagerDecorator.java          |   6 +
 .../decorators/UserLibMountDecorator.java          |  13 +
 .../StandaloneKubernetesJobManagerFactory.java     | 124 ++++++++++
 .../StandaloneKubernetesTaskManagerFactory.java    |  90 +++++++
 .../StandaloneKubernetesJobManagerParameters.java  |   8 +
 .../KubernetesStandaloneClusterDescriptor.java     | 257 +++++++++++++++++++
 .../src/main/resources/META-INF/NOTICE             |   5 +
 .../Fabric8FlinkStandaloneKubeClientTest.java      | 104 ++++++++
 .../CmdStandaloneJobManagerDecoratorTest.java      |  26 +-
 .../decorators/UserLibMountDecoratorTest.java      |  50 +++-
 .../StandaloneKubernetesJobManagerFactoryTest.java | 273 +++++++++++++++++++++
 ...StandaloneKubernetesTaskManagerFactoryTest.java | 150 +++++++++++
 .../KubernetesStandaloneClusterDescriptorTest.java | 182 ++++++++++++++
 16 files changed, 1383 insertions(+), 4 deletions(-)

diff --git a/flink-kubernetes-standalone/pom.xml b/flink-kubernetes-standalone/pom.xml
index f2a1c2f6..6125efe3 100644
--- a/flink-kubernetes-standalone/pom.xml
+++ b/flink-kubernetes-standalone/pom.xml
@@ -29,7 +29,6 @@ under the License.
     </parent>
 
 
-
     <artifactId>flink-kubernetes-standalone</artifactId>
     <name>Flink Kubernetes Standalone</name>
     <packaging>jar</packaging>
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClient.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClient.java
new file mode 100644
index 00000000..45454dd7
--- /dev/null
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClient.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The Implementation of {@link FlinkStandaloneKubeClient}. */
+public class Fabric8FlinkStandaloneKubeClient extends Fabric8FlinkKubeClient
+        implements FlinkStandaloneKubeClient {
+
+    private final NamespacedKubernetesClient internalClient;
+
+    public Fabric8FlinkStandaloneKubeClient(
+            Configuration flinkConfig,
+            NamespacedKubernetesClient client,
+            ExecutorService executorService) {
+        super(flinkConfig, client, executorService);
+        internalClient = checkNotNull(client);
+    }
+
+    @Override
+    public void createTaskManagerDeployment(Deployment tmDeployment) {
+        this.internalClient.apps().deployments().create(tmDeployment);
+    }
+
+    @Override
+    public void stopAndCleanupCluster(String clusterId) {
+        this.internalClient
+                .apps()
+                .deployments()
+                .withName(StandaloneKubernetesUtils.getJobManagerDeploymentName(clusterId))
+                .cascading(true)
+                .delete();
+
+        this.internalClient
+                .apps()
+                .deployments()
+                .withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId))
+                .cascading(true)
+                .delete();
+    }
+
+    public static NamespacedKubernetesClient createNamespacedKubeClient(String namespace) {
+        return new DefaultKubernetesClient().inNamespace(namespace);
+    }
+}
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/FlinkStandaloneKubeClient.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/FlinkStandaloneKubeClient.java
new file mode 100644
index 00000000..9a7f8207
--- /dev/null
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/FlinkStandaloneKubeClient.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+
+/** Extension of the FlinkKubeClient that is used for Flink standalone deployments. */
+public interface FlinkStandaloneKubeClient extends FlinkKubeClient {
+    void createTaskManagerDeployment(Deployment tmDeployment);
+}
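
Not part of the patch: a minimal sketch of how the new FlinkStandaloneKubeClient interface and its Fabric8 implementation above are meant to be wired together. The namespace, cluster id, and direct executor below are placeholders for illustration only.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
    import org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient;
    import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient;
    import org.apache.flink.util.concurrent.Executors;

    public class StandaloneKubeClientSketch {
        public static void main(String[] args) {
            Configuration flinkConfig = new Configuration();
            // Placeholder cluster id; the Fabric8FlinkKubeClient base class expects one in the config.
            flinkConfig.set(KubernetesConfigOptions.CLUSTER_ID, "my-cluster");

            FlinkStandaloneKubeClient kubeClient =
                    new Fabric8FlinkStandaloneKubeClient(
                            flinkConfig,
                            Fabric8FlinkStandaloneKubeClient.createNamespacedKubeClient("default"),
                            Executors.newDirectExecutorService());

            // createTaskManagerDeployment(...) takes a Deployment built by the
            // StandaloneKubernetesTaskManagerFactory introduced later in this commit.

            // Removes both the JobManager and TaskManager Deployments for the cluster id.
            kubeClient.stopAndCleanupCluster("my-cluster");
        }
    }
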
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
index 430abe2e..9bfcc866 100644
--- a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
@@ -79,6 +79,12 @@ public class CmdStandaloneJobManagerDecorator extends AbstractKubernetesStepDeco
             args.add(mainClass);
         }
 
+        Boolean allowNonRestoredState = kubernetesJobManagerParameters.getAllowNonRestoredState();
+        if (allowNonRestoredState != null) {
+            args.add("--allowNonRestoredState");
+            args.add(allowNonRestoredState.toString());
+        }
+
         return args;
     }
 }
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecorator.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecorator.java
index a66026b2..207637ac 100644
--- a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecorator.java
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecorator.java
@@ -27,6 +27,9 @@ import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodBuilder;
 import io.fabric8.kubernetes.api.model.Volume;
 import io.fabric8.kubernetes.api.model.VolumeBuilder;
+import io.fabric8.kubernetes.api.model.VolumeMount;
+
+import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -51,6 +54,10 @@ public class UserLibMountDecorator extends AbstractKubernetesStepDecorator {
             return flinkPod;
         }
 
+        if (mainContainerHasUserLibPath(flinkPod)) {
+            return flinkPod;
+        }
+
         final Volume userLibVolume =
                 new VolumeBuilder()
                         .withName(USER_LIB_VOLUME)
@@ -78,4 +85,10 @@ public class UserLibMountDecorator extends AbstractKubernetesStepDecorator {
                 .withMainContainer(mountedMainContainer)
                 .build();
     }
+
+    private boolean mainContainerHasUserLibPath(FlinkPod flinkPod) {
+        List<VolumeMount> volumeMounts = flinkPod.getMainContainer().getVolumeMounts();
+        return volumeMounts.stream()
+                .anyMatch(volumeMount -> volumeMount.getMountPath().startsWith(USER_LIB_PATH));
+    }
 }
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactory.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactory.java
new file mode 100644
index 00000000..f789db93
--- /dev/null
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactory.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.factory;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification;
+import org.apache.flink.kubernetes.kubeclient.decorators.EnvSecretsDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.InitJobManagerDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.KubernetesStepDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.MountSecretsDecorator;
+import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesOwnerReference;
+import org.apache.flink.kubernetes.operator.kubeclient.decorators.CmdStandaloneJobManagerDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.decorators.UserLibMountDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for constructing all the Kubernetes resources needed by the JobManager when
+ * deploying in standalone mode. This can include the Deployment, the ConfigMap(s), and the
+ * Service(s).
+ */
+public class StandaloneKubernetesJobManagerFactory {
+    public static KubernetesJobManagerSpecification buildKubernetesJobManagerSpecification(
+            FlinkPod podTemplate,
+            StandaloneKubernetesJobManagerParameters kubernetesJobManagerParameters)
+            throws IOException {
+        FlinkPod flinkPod = Preconditions.checkNotNull(podTemplate).copy();
+        List<HasMetadata> accompanyingResources = new ArrayList<>();
+
+        final KubernetesStepDecorator[] stepDecorators =
+                new KubernetesStepDecorator[] {
+                    new InitJobManagerDecorator(kubernetesJobManagerParameters),
+                    new EnvSecretsDecorator(kubernetesJobManagerParameters),
+                    new MountSecretsDecorator(kubernetesJobManagerParameters),
+                    new CmdStandaloneJobManagerDecorator(kubernetesJobManagerParameters),
+                    new InternalServiceDecorator(kubernetesJobManagerParameters),
+                    new ExternalServiceDecorator(kubernetesJobManagerParameters),
+                    new HadoopConfMountDecorator(kubernetesJobManagerParameters),
+                    new KerberosMountDecorator(kubernetesJobManagerParameters),
+                    new FlinkConfMountDecorator(kubernetesJobManagerParameters),
+                    new UserLibMountDecorator(kubernetesJobManagerParameters),
+                };
+
+        for (KubernetesStepDecorator stepDecorator : stepDecorators) {
+            flinkPod = stepDecorator.decorateFlinkPod(flinkPod);
+            accompanyingResources.addAll(stepDecorator.buildAccompanyingKubernetesResources());
+        }
+
+        final Deployment deployment =
+                createJobManagerDeployment(flinkPod, kubernetesJobManagerParameters);
+
+        return new KubernetesJobManagerSpecification(deployment, accompanyingResources);
+    }
+
+    private static Deployment createJobManagerDeployment(
+            FlinkPod flinkPod, KubernetesJobManagerParameters kubernetesJobManagerParameters) {
+        final Container resolvedMainContainer = flinkPod.getMainContainer();
+
+        final Pod resolvedPod =
+                new PodBuilder(flinkPod.getPodWithoutMainContainer())
+                        .editOrNewSpec()
+                        .addToContainers(resolvedMainContainer)
+                        .endSpec()
+                        .build();
+        return new DeploymentBuilder()
+                .withApiVersion(Constants.APPS_API_VERSION)
+                .editOrNewMetadata()
+                .withName(
+                        StandaloneKubernetesUtils.getJobManagerDeploymentName(
+                                kubernetesJobManagerParameters.getClusterId()))
+                .withAnnotations(kubernetesJobManagerParameters.getAnnotations())
+                .withLabels(kubernetesJobManagerParameters.getLabels())
+                .withOwnerReferences(
+                        kubernetesJobManagerParameters.getOwnerReference().stream()
+                                .map(e -> KubernetesOwnerReference.fromMap(e).getInternalResource())
+                                .collect(Collectors.toList()))
+                .endMetadata()
+                .editOrNewSpec()
+                .withReplicas(kubernetesJobManagerParameters.getReplicas())
+                .editOrNewTemplate()
+                .withMetadata(resolvedPod.getMetadata())
+                .withSpec(resolvedPod.getSpec())
+                .endTemplate()
+                .editOrNewSelector()
+                .addToMatchLabels(kubernetesJobManagerParameters.getSelectors())
+                .endSelector()
+                .endSpec()
+                .build();
+    }
+}
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactory.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactory.java
new file mode 100644
index 00000000..8bcd9990
--- /dev/null
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.factory;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.decorators.EnvSecretsDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.KubernetesStepDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.MountSecretsDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.decorators.CmdStandaloneTaskManagerDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.decorators.InitStandaloneTaskManagerDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
+
+/** Utility class for constructing the TaskManager Deployment when deploying in standalone mode. */
+public class StandaloneKubernetesTaskManagerFactory {
+
+    public static Deployment buildKubernetesTaskManagerDeployment(
+            FlinkPod podTemplate,
+            StandaloneKubernetesTaskManagerParameters kubernetesTaskManagerParameters) {
+        FlinkPod flinkPod = Preconditions.checkNotNull(podTemplate).copy();
+
+        final KubernetesStepDecorator[] stepDecorators =
+                new KubernetesStepDecorator[] {
+                    new InitStandaloneTaskManagerDecorator(kubernetesTaskManagerParameters),
+                    new EnvSecretsDecorator(kubernetesTaskManagerParameters),
+                    new MountSecretsDecorator(kubernetesTaskManagerParameters),
+                    new CmdStandaloneTaskManagerDecorator(kubernetesTaskManagerParameters),
+                    new HadoopConfMountDecorator(kubernetesTaskManagerParameters),
+                    new KerberosMountDecorator(kubernetesTaskManagerParameters),
+                    new FlinkConfMountDecorator(kubernetesTaskManagerParameters)
+                };
+
+        for (KubernetesStepDecorator stepDecorator : stepDecorators) {
+            flinkPod = stepDecorator.decorateFlinkPod(flinkPod);
+        }
+
+        final Pod resolvedPod =
+                new PodBuilder(flinkPod.getPodWithoutMainContainer())
+                        .editOrNewSpec()
+                        .addToContainers(flinkPod.getMainContainer())
+                        .endSpec()
+                        .build();
+
+        return new DeploymentBuilder()
+                .withApiVersion(Constants.APPS_API_VERSION)
+                .editOrNewMetadata()
+                .withName(
+                        StandaloneKubernetesUtils.getTaskManagerDeploymentName(
+                                kubernetesTaskManagerParameters.getClusterId()))
+                .withAnnotations(kubernetesTaskManagerParameters.getAnnotations())
+                .withLabels(kubernetesTaskManagerParameters.getLabels())
+                .endMetadata()
+                .editOrNewSpec()
+                .withReplicas(kubernetesTaskManagerParameters.getReplicas())
+                .editOrNewTemplate()
+                .withMetadata(resolvedPod.getMetadata())
+                .withSpec(resolvedPod.getSpec())
+                .endTemplate()
+                .editOrNewSelector()
+                .addToMatchLabels(kubernetesTaskManagerParameters.getSelectors())
+                .endSelector()
+                .endSpec()
+                .build();
+    }
+}
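
Not part of the patch: a compact sketch of driving both new factories, closely mirroring the tests added later in this commit. The configuration values and the empty pod template are placeholders.

    import org.apache.flink.client.deployment.ClusterSpecification;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
    import org.apache.flink.kubernetes.kubeclient.FlinkPod;
    import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification;
    import org.apache.flink.kubernetes.operator.kubeclient.factory.StandaloneKubernetesJobManagerFactory;
    import org.apache.flink.kubernetes.operator.kubeclient.factory.StandaloneKubernetesTaskManagerFactory;
    import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
    import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;

    import io.fabric8.kubernetes.api.model.apps.Deployment;

    public class StandaloneFactoriesSketch {
        public static void main(String[] args) throws Exception {
            Configuration flinkConfig = new Configuration();
            flinkConfig.set(KubernetesConfigOptions.CLUSTER_ID, "my-cluster"); // placeholder
            // As in the tests, point the conf dir somewhere empty so no local files are picked up.
            flinkConfig.set(KubernetesConfigOptions.FLINK_CONF_DIR, "/missing/dir");

            ClusterSpecification spec =
                    new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
            FlinkPod podTemplate = new FlinkPod.Builder().build();

            // JobManager: a Deployment plus accompanying ConfigMaps/Services.
            KubernetesJobManagerSpecification jmSpec =
                    StandaloneKubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
                            podTemplate,
                            new StandaloneKubernetesJobManagerParameters(flinkConfig, spec));

            // TaskManager: a plain Deployment only.
            Deployment tmDeployment =
                    StandaloneKubernetesTaskManagerFactory.buildKubernetesTaskManagerDeployment(
                            podTemplate,
                            new StandaloneKubernetesTaskManagerParameters(flinkConfig, spec));

            System.out.println(jmSpec.getAccompanyingResources().size() + " accompanying JM resources");
            System.out.println("TM deployment: " + tmDeployment.getMetadata().getName());
        }
    }
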
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
index bdc28d48..fbfcbdeb 100644
--- a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
@@ -24,6 +24,7 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
 import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
 import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -80,4 +81,11 @@ public class StandaloneKubernetesJobManagerParameters extends KubernetesJobManag
         }
         return flinkConfig.getString(ApplicationConfiguration.APPLICATION_MAIN_CLASS);
     }
+
+    public Boolean getAllowNonRestoredState() {
+        if (flinkConfig.contains(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)) {
+            return flinkConfig.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE);
+        }
+        return null;
+    }
 }
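
Taken together with the CmdStandaloneJobManagerDecorator change earlier in this commit, the new getter means that setting execution.savepoint.ignore-unclaimed-state on the Flink configuration is forwarded to the standalone job entrypoint as the --allowNonRestoredState argument. A minimal sketch (values are placeholders):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;

    public class AllowNonRestoredStateSketch {
        public static void main(String[] args) {
            Configuration flinkConfig = new Configuration();
            // With this set, getAllowNonRestoredState() returns Boolean.FALSE instead of null,
            // and the decorator appends "--allowNonRestoredState" and "false" to the JM args.
            flinkConfig.set(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, false);
        }
    }
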
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java
new file mode 100644
index 00000000..cfed561a
--- /dev/null
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.standalone;
+
+import org.apache.flink.client.deployment.ClusterDeploymentException;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterRetrieveException;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.Endpoint;
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification;
+import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient;
+import org.apache.flink.kubernetes.operator.kubeclient.factory.StandaloneKubernetesJobManagerFactory;
+import org.apache.flink.kubernetes.operator.kubeclient.factory.StandaloneKubernetesTaskManagerFactory;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.rpc.AddressResolution;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Standalone Kubernetes specific {@link ClusterDescriptor} implementation. */
+public class KubernetesStandaloneClusterDescriptor extends KubernetesClusterDescriptor {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(KubernetesStandaloneClusterDescriptor.class);
+
+    private static final String CLUSTER_DESCRIPTION = "Standalone Kubernetes cluster";
+
+    private final Configuration flinkConfig;
+
+    private final FlinkStandaloneKubeClient client;
+
+    private final String clusterId;
+
+    public KubernetesStandaloneClusterDescriptor(
+            Configuration flinkConfig, FlinkStandaloneKubeClient client) {
+        super(flinkConfig, client);
+        this.flinkConfig = checkNotNull(flinkConfig);
+        this.client = checkNotNull(client);
+        this.clusterId =
+                checkNotNull(
+                        flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID),
+                        "ClusterId must be specified!");
+    }
+
+    @Override
+    public String getClusterDescription() {
+        return CLUSTER_DESCRIPTION;
+    }
+
+    @Override
+    public ClusterClientProvider<String> deploySessionCluster(
+            ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
+        flinkConfig.set(
+                StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
+                StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION);
+
+        final ClusterClientProvider clusterClientProvider =
+                deployClusterInternal(clusterSpecification);
+
+        try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) {
+            LOG.info(
+                    "Created flink session cluster {} successfully, JobManager Web Interface: {}",
+                    clusterId,
+                    clusterClient.getWebInterfaceURL());
+        }
+        return clusterClientProvider;
+    }
+
+    @Override
+    public ClusterClientProvider<String> deployApplicationCluster(
+            ClusterSpecification clusterSpecification,
+            ApplicationConfiguration applicationConfiguration)
+            throws ClusterDeploymentException {
+        flinkConfig.set(
+                StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
+                StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION);
+        applicationConfiguration.applyToConfiguration(flinkConfig);
+        final ClusterClientProvider clusterClientProvider =
+                deployClusterInternal(clusterSpecification);
+
+        try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) {
+            LOG.info(
+                    "Created flink application cluster {} successfully, JobManager Web Interface: {}",
+                    clusterId,
+                    clusterClient.getWebInterfaceURL());
+        }
+        return clusterClientProvider;
+    }
+
+    private ClusterClientProvider<String> deployClusterInternal(
+            ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
+
+        // Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values.
+        KubernetesUtils.checkAndUpdatePortConfigOption(
+                flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT);
+        KubernetesUtils.checkAndUpdatePortConfigOption(
+                flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT);
+        KubernetesUtils.checkAndUpdatePortConfigOption(
+                flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT);
+
+        if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) {
+            flinkConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId);
+            KubernetesUtils.checkAndUpdatePortConfigOption(
+                    flinkConfig,
+                    HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE,
+                    flinkConfig.get(JobManagerOptions.PORT));
+        }
+
+        // Deploy JM + resources
+        try {
+            KubernetesJobManagerSpecification jmSpec = getJobManagerSpec(clusterSpecification);
+            Deployment tmDeployment = getTaskManagerDeployment(clusterSpecification);
+
+            client.createJobManagerComponent(jmSpec);
+            client.createTaskManagerDeployment(tmDeployment);
+
+            return createClusterClientProvider(clusterId);
+        } catch (Exception e) {
+            try {
+                LOG.warn(
+                        "Failed to create the Kubernetes cluster \"{}\", try to clean up the residual resources.",
+                        clusterId);
+                client.stopAndCleanupCluster(clusterId);
+            } catch (Exception e1) {
+                LOG.info(
+                        "Failed to stop and clean up the Kubernetes cluster \"{}\".",
+                        clusterId,
+                        e1);
+            }
+            throw new ClusterDeploymentException(
+                    "Could not create Kubernetes cluster \"" + clusterId + "\".", e);
+        }
+    }
+
+    private KubernetesJobManagerSpecification getJobManagerSpec(
+            ClusterSpecification clusterSpecification) throws IOException {
+        final StandaloneKubernetesJobManagerParameters kubernetesJobManagerParameters =
+                new StandaloneKubernetesJobManagerParameters(flinkConfig, clusterSpecification);
+
+        final FlinkPod podTemplate =
+                kubernetesJobManagerParameters
+                        .getPodTemplateFilePath()
+                        .map(
+                                file ->
+                                        KubernetesUtils.loadPodFromTemplateFile(
+                                                client, file, Constants.MAIN_CONTAINER_NAME))
+                        .orElse(new FlinkPod.Builder().build());
+
+        return StandaloneKubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
+                podTemplate, kubernetesJobManagerParameters);
+    }
+
+    private Deployment getTaskManagerDeployment(ClusterSpecification clusterSpecification) {
+        final StandaloneKubernetesTaskManagerParameters kubernetesTaskManagerParameters =
+                new StandaloneKubernetesTaskManagerParameters(flinkConfig, clusterSpecification);
+
+        final FlinkPod podTemplate =
+                kubernetesTaskManagerParameters
+                        .getPodTemplateFilePath()
+                        .map(
+                                file ->
+                                        KubernetesUtils.loadPodFromTemplateFile(
+                                                client, file, Constants.MAIN_CONTAINER_NAME))
+                        .orElse(new FlinkPod.Builder().build());
+
+        return StandaloneKubernetesTaskManagerFactory.buildKubernetesTaskManagerDeployment(
+                podTemplate, kubernetesTaskManagerParameters);
+    }
+
+    private ClusterClientProvider<String> createClusterClientProvider(String clusterId) {
+        return () -> {
+            final Configuration configuration = new Configuration(flinkConfig);
+
+            final Optional<Endpoint> restEndpoint = client.getRestEndpoint(clusterId);
+
+            if (restEndpoint.isPresent()) {
+                configuration.setString(RestOptions.ADDRESS, restEndpoint.get().getAddress());
+                configuration.setInteger(RestOptions.PORT, restEndpoint.get().getPort());
+            } else {
+                throw new RuntimeException(
+                        new ClusterRetrieveException(
+                                "Could not get the rest endpoint of " + clusterId));
+            }
+
+            try {
+                // The Flink client always goes through the Kubernetes service to reach the
+                // JobManager, so the web monitor address is pre-configured. Using
+                // StandaloneClientHAServices to create the RestClusterClient is therefore reasonable.
+                return new RestClusterClient<>(
+                        configuration,
+                        clusterId,
+                        (effectiveConfiguration, fatalErrorHandler) ->
+                                new StandaloneClientHAServices(
+                                        getWebMonitorAddress(effectiveConfiguration)));
+            } catch (Exception e) {
+                throw new RuntimeException(
+                        new ClusterRetrieveException("Could not create the RestClusterClient.", e));
+            }
+        };
+    }
+
+    private String getWebMonitorAddress(Configuration configuration) throws Exception {
+        AddressResolution resolution = AddressResolution.TRY_ADDRESS_RESOLUTION;
+        final KubernetesConfigOptions.ServiceExposedType serviceType =
+                configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE);
+        if (serviceType.isClusterIP()) {
+            resolution = AddressResolution.NO_ADDRESS_RESOLUTION;
+            LOG.warn(
+                    "Please note that Flink client operations(e.g. cancel, list, stop,"
+                            + " savepoint, etc.) won't work from outside the Kubernetes cluster"
+                            + " since '{}' has been set to {}.",
+                    KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE.key(),
+                    serviceType);
+        }
+        return HighAvailabilityServicesUtils.getWebMonitorAddress(configuration, resolution);
+    }
+}
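
Not part of the patch: a rough sketch of how the new descriptor might be driven end to end. The cluster id, namespace, and use of a direct executor are placeholders; error handling and the application-mode variant are omitted.

    import org.apache.flink.client.deployment.ClusterSpecification;
    import org.apache.flink.client.program.ClusterClientProvider;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
    import org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient;
    import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient;
    import org.apache.flink.kubernetes.operator.standalone.KubernetesStandaloneClusterDescriptor;
    import org.apache.flink.util.concurrent.Executors;

    public class StandaloneDescriptorSketch {
        public static void main(String[] args) throws Exception {
            Configuration flinkConfig = new Configuration();
            flinkConfig.set(KubernetesConfigOptions.CLUSTER_ID, "my-cluster"); // placeholder
            flinkConfig.set(KubernetesConfigOptions.NAMESPACE, "default");     // placeholder

            FlinkStandaloneKubeClient kubeClient =
                    new Fabric8FlinkStandaloneKubeClient(
                            flinkConfig,
                            Fabric8FlinkStandaloneKubeClient.createNamespacedKubeClient("default"),
                            Executors.newDirectExecutorService());

            KubernetesStandaloneClusterDescriptor descriptor =
                    new KubernetesStandaloneClusterDescriptor(flinkConfig, kubeClient);

            ClusterSpecification spec =
                    new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();

            // Creates the JobManager resources and the TaskManager Deployment, then returns a
            // provider that resolves a RestClusterClient against the cluster's rest Service.
            ClusterClientProvider<String> provider = descriptor.deploySessionCluster(spec);
        }
    }
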
diff --git a/flink-kubernetes-standalone/src/main/resources/META-INF/NOTICE b/flink-kubernetes-standalone/src/main/resources/META-INF/NOTICE
new file mode 100644
index 00000000..993ca343
--- /dev/null
+++ b/flink-kubernetes-standalone/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,5 @@
+flink-kubernetes-operator-standalone
+Copyright 2014-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
\ No newline at end of file
diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java
new file mode 100644
index 00000000..9c52eb32
--- /dev/null
+++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient;
+
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification;
+import org.apache.flink.kubernetes.operator.kubeclient.factory.StandaloneKubernetesJobManagerFactory;
+import org.apache.flink.kubernetes.operator.kubeclient.factory.StandaloneKubernetesTaskManagerFactory;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
+import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils;
+import org.apache.flink.util.concurrent.Executors;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Unit tests for {@link Fabric8FlinkStandaloneKubeClient}. */
+@EnableKubernetesMockClient(crud = true)
+public class Fabric8FlinkStandaloneKubeClientTest {
+    private static final String NAMESPACE = "test";
+
+    KubernetesMockServer mockServer;
+    protected NamespacedKubernetesClient kubernetesClient;
+    private FlinkStandaloneKubeClient flinkKubeClient;
+    private StandaloneKubernetesTaskManagerParameters taskManagerParameters;
+    private Deployment tmDeployment;
+    private ClusterSpecification clusterSpecification;
+    private Configuration flinkConfig = new Configuration();
+
+    @BeforeEach
+    public final void setup() {
+        flinkConfig = TestUtils.createTestFlinkConfig();
+        kubernetesClient = mockServer.createClient();
+
+        flinkKubeClient =
+                new Fabric8FlinkStandaloneKubeClient(
+                        flinkConfig, kubernetesClient, Executors.newDirectExecutorService());
+        clusterSpecification = TestUtils.createClusterSpecification();
+
+        taskManagerParameters =
+                new StandaloneKubernetesTaskManagerParameters(flinkConfig, clusterSpecification);
+
+        tmDeployment =
+                StandaloneKubernetesTaskManagerFactory.buildKubernetesTaskManagerDeployment(
+                        new FlinkPod.Builder().build(), taskManagerParameters);
+    }
+
+    @Test
+    public void testCreateTaskManagerDeployment() {
+        flinkKubeClient.createTaskManagerDeployment(tmDeployment);
+
+        final List<Deployment> resultedDeployments =
+                kubernetesClient.apps().deployments().inNamespace(NAMESPACE).list().getItems();
+        assertEquals(1, resultedDeployments.size());
+    }
+
+    @Test
+    public void testStopAndCleanupCluster() throws Exception {
+        ClusterSpecification clusterSpecification = TestUtils.createClusterSpecification();
+        StandaloneKubernetesJobManagerParameters jmParameters =
+                new StandaloneKubernetesJobManagerParameters(flinkConfig, clusterSpecification);
+        KubernetesJobManagerSpecification jmSpec =
+                StandaloneKubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
+                        new FlinkPod.Builder().build(), jmParameters);
+
+        flinkKubeClient.createJobManagerComponent(jmSpec);
+        flinkKubeClient.createTaskManagerDeployment(tmDeployment);
+
+        List<Deployment> resultedDeployments =
+                kubernetesClient.apps().deployments().inNamespace(NAMESPACE).list().getItems();
+        assertEquals(2, resultedDeployments.size());
+
+        flinkKubeClient.stopAndCleanupCluster(taskManagerParameters.getClusterId());
+
+        resultedDeployments =
+                kubernetesClient.apps().deployments().inNamespace(NAMESPACE).list().getItems();
+        assertEquals(0, resultedDeployments.size());
+    }
+}
diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java
index 49355574..8bdb0f33 100644
--- a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java
+++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java
@@ -18,11 +18,13 @@
 package org.apache.flink.kubernetes.operator.kubeclient.decorators;
 
 import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
 import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -67,7 +69,29 @@ public class CmdStandaloneJobManagerDecoratorTest {
     }
 
     @Test
-    public void testApplicationCommandAdded() {
+    public void testApplicationCommandAndArgsAdded() {
+        final String testMainClass = "org.main.class";
+        configuration.set(
+                StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
+                StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION);
+        configuration.set(ApplicationConfiguration.APPLICATION_MAIN_CLASS, testMainClass);
+        configuration.set(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, false);
+
+        FlinkPod decoratedPod = decorator.decorateFlinkPod(new FlinkPod.Builder().build());
+        assertThat(
+                decoratedPod.getMainContainer().getCommand(), containsInAnyOrder(MOCK_ENTRYPATH));
+        assertThat(
+                decoratedPod.getMainContainer().getArgs(),
+                containsInAnyOrder(
+                        CmdStandaloneJobManagerDecorator.APPLICATION_MODE_ARG,
+                        "--allowNonRestoredState",
+                        "false",
+                        "--job-classname",
+                        testMainClass));
+    }
+
+    @Test
+    public void testApplicationOptionalArgsNotAdded() {
         configuration.set(
                 StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
                 StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION);
diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecoratorTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecoratorTest.java
index 07c8af20..1a867521 100644
--- a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecoratorTest.java
+++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecoratorTest.java
@@ -23,6 +23,10 @@ import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
 import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
 
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.PodSpecBuilder;
+import io.fabric8.kubernetes.api.model.VolumeBuilder;
 import io.fabric8.kubernetes.api.model.VolumeMount;
 import org.junit.jupiter.api.Test;
 
@@ -32,7 +36,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 public class UserLibMountDecoratorTest {
 
     @Test
-    public void testVolumeAdded() {
+    public void testVolumeAddedApplicationMode() {
         StandaloneKubernetesJobManagerParameters jmParameters =
                 createJmParamsWithClusterMode(
                         StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION);
@@ -54,7 +58,7 @@ public class UserLibMountDecoratorTest {
     }
 
     @Test
-    public void testVolumeNotAdded() {
+    public void testVolumeNotAddedSessionMode() {
         StandaloneKubernetesJobManagerParameters jmParameters =
                 createJmParamsWithClusterMode(
                         StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION);
@@ -69,6 +73,48 @@ public class UserLibMountDecoratorTest {
         assertEquals(0, decoratedPod.getPodWithoutMainContainer().getSpec().getVolumes().size());
     }
 
+    @Test
+    public void testVolumeNotAddedExistingVolumeMount() {
+        StandaloneKubernetesJobManagerParameters jmParameters =
+                createJmParamsWithClusterMode(
+                        StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION);
+        UserLibMountDecorator decorator = new UserLibMountDecorator(jmParameters);
+
+        final String volName = "flink-artifact";
+        final String userLibPath = "/opt/flink/usrlib";
+
+        FlinkPod baseFlinkPod =
+                new FlinkPod.Builder()
+                        .withMainContainer(
+                                new ContainerBuilder()
+                                        .addNewVolumeMount()
+                                        .withName(volName)
+                                        .withMountPath(userLibPath)
+                                        .endVolumeMount()
+                                        .build())
+                        .withPod(
+                                new PodBuilder()
+                                        .withSpec(
+                                                new PodSpecBuilder()
+                                                        .addNewVolumeLike(
+                                                                new VolumeBuilder()
+                                                                        .withName(volName)
+                                                                        .withNewEmptyDir()
+                                                                        .endEmptyDir()
+                                                                        .build())
+                                                        .endVolume()
+                                                        .build())
+                                        .build())
+                        .build();
+
+        assertEquals(1, baseFlinkPod.getMainContainer().getVolumeMounts().size());
+        assertEquals(1, baseFlinkPod.getPodWithoutMainContainer().getSpec().getVolumes().size());
+
+        FlinkPod decoratedPod = decorator.decorateFlinkPod(baseFlinkPod);
+        assertEquals(1, decoratedPod.getMainContainer().getVolumeMounts().size());
+        assertEquals(1, decoratedPod.getPodWithoutMainContainer().getSpec().getVolumes().size());
+    }
+
     private StandaloneKubernetesJobManagerParameters createJmParamsWithClusterMode(
             StandaloneKubernetesConfigOptionsInternal.ClusterMode clusterMode) {
         return new StandaloneKubernetesJobManagerParameters(
diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactoryTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactoryTest.java
new file mode 100644
index 00000000..832ce97a
--- /dev/null
+++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactoryTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.factory;
+
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification;
+import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator;
+import org.apache.flink.kubernetes.kubeclient.services.HeadlessClusterIPService;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.ParametersTestBase;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
+import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import org.apache.flink.kubernetes.utils.Constants;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerPort;
+import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.PodSpec;
+import io.fabric8.kubernetes.api.model.Quantity;
+import io.fabric8.kubernetes.api.model.ResourceRequirements;
+import io.fabric8.kubernetes.api.model.Service;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE;
+import static org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils.CLUSTER_ID;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/** Unit tests for {@link StandaloneKubernetesJobManagerFactory}. */
+public class StandaloneKubernetesJobManagerFactoryTest extends ParametersTestBase {
+
+    KubernetesJobManagerSpecification jmSpec;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        setupFlinkConfig();
+        flinkConfig.set(KubernetesConfigOptions.FLINK_CONF_DIR, "/missing/dir");
+        FlinkPod podTemplate = createPodTemplate();
+        StandaloneKubernetesJobManagerParameters tmParameters =
+                new StandaloneKubernetesJobManagerParameters(
+                        flinkConfig, TestUtils.createClusterSpecification());
+        jmSpec =
+                StandaloneKubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
+                        podTemplate, tmParameters);
+    }
+
+    @Test
+    public void testDeploymentMetadata() {
+        ObjectMeta deploymentMetadata = jmSpec.getDeployment().getMetadata();
+        assertEquals(
+                StandaloneKubernetesUtils.getJobManagerDeploymentName(CLUSTER_ID),
+                deploymentMetadata.getName());
+
+        final Map<String, String> expectedLabels =
+                new HashMap<>(StandaloneKubernetesUtils.getJobManagerSelectors(CLUSTER_ID));
+        expectedLabels.putAll(userLabels);
+        assertEquals(expectedLabels, deploymentMetadata.getLabels());
+
+        assertEquals(userAnnotations, deploymentMetadata.getAnnotations());
+    }
+
+    @Test
+    public void testDeploymentSpec() {
+        DeploymentSpec deploymentSpec = jmSpec.getDeployment().getSpec();
+
+        assertEquals(1, deploymentSpec.getReplicas());
+        assertEquals(
+                StandaloneKubernetesUtils.getJobManagerSelectors(CLUSTER_ID),
+                deploymentSpec.getSelector().getMatchLabels());
+    }
+
+    @Test
+    public void testTemplateMetadata() {
+        final ObjectMeta podMetadata = jmSpec.getDeployment().getSpec().getTemplate().getMetadata();
+
+        final Map<String, String> expectedLabels =
+                new HashMap<>(StandaloneKubernetesUtils.getJobManagerSelectors(CLUSTER_ID));
+        expectedLabels.putAll(userLabels);
+        expectedLabels.putAll(templateLabels);
+        assertEquals(expectedLabels, podMetadata.getLabels());
+
+        final Map<String, String> expectedAnnotations = new HashMap<>(userAnnotations);
+        expectedAnnotations.putAll(templateAnnotations);
+        assertEquals(expectedAnnotations, podMetadata.getAnnotations());
+    }
+
+    @Test
+    public void testTemplateSpec() {
+        final PodSpec podSpec = jmSpec.getDeployment().getSpec().getTemplate().getSpec();
+
+        assertEquals(1, podSpec.getContainers().size());
+        assertEquals(TestUtils.SERVICE_ACCOUNT, podSpec.getServiceAccountName());
+        // Config and secret volumes
+        assertEquals(3, podSpec.getVolumes().size());
+
+        final Container mainContainer = podSpec.getContainers().get(0);
+        assertEquals(Constants.MAIN_CONTAINER_NAME, mainContainer.getName());
+        assertEquals(TestUtils.IMAGE, mainContainer.getImage());
+        assertEquals(TestUtils.IMAGE_POLICY, mainContainer.getImagePullPolicy());
+
+        final Map<String, String> envs = new HashMap<>();
+        mainContainer.getEnv().forEach(env -> envs.put(env.getName(), env.getValue()));
+
+        Map<String, String> expectedEnvs = new HashMap<>(templateEnvs);
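+        // The pod IP env var is injected by Flink via the downward API (valueFrom), so its
+        // literal value in the container spec is expected to be null.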
+        expectedEnvs.put(Constants.ENV_FLINK_POD_IP_ADDRESS, null);
+        assertEquals(expectedEnvs, envs);
+
+        final List<ContainerPort> expectedContainerPorts =
+                Arrays.asList(
+                        new ContainerPortBuilder()
+                                .withName(Constants.JOB_MANAGER_RPC_PORT_NAME)
+                                .withContainerPort(6123)
+                                .build(),
+                        new ContainerPortBuilder()
+                                .withName(Constants.BLOB_SERVER_PORT_NAME)
+                                .withContainerPort(Constants.BLOB_SERVER_PORT)
+                                .build(),
+                        new ContainerPortBuilder()
+                                .withName(Constants.REST_PORT_NAME)
+                                .withContainerPort(Constants.REST_PORT)
+                                .build(),
+                        new ContainerPortBuilder()
+                                .withName(TEMPLATE_PORT_NAME)
+                                .withContainerPort(TEMPLATE_PORT)
+                                .build());
+
+        assertThat(mainContainer.getPorts(), containsInAnyOrder(expectedContainerPorts.toArray()));
+
+        final ResourceRequirements resourceRequirements = mainContainer.getResources();
+
+        final Map<String, Quantity> requests = resourceRequirements.getRequests();
+        assertEquals(Double.toString(TestUtils.JOB_MANAGER_CPU), requests.get("cpu").getAmount());
+        assertEquals(
+                String.valueOf(TestUtils.JOB_MANAGER_MEMORY_MB),
+                requests.get("memory").getAmount());
+
+        final Map<String, Quantity> limits = resourceRequirements.getLimits();
+        assertEquals(Double.toString(TestUtils.JOB_MANAGER_CPU), limits.get("cpu").getAmount());
+        assertEquals(
+                String.valueOf(TestUtils.JOB_MANAGER_MEMORY_MB), limits.get("memory").getAmount());
+
+        assertEquals(3, mainContainer.getVolumeMounts().size());
+    }
+
+    @Test
+    public void testAdditionalResourcesSize() throws IOException {
+        final List<HasMetadata> resultAdditionalResources = this.jmSpec.getAccompanyingResources();
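+        // Expected accompanying resources: the internal and rest services plus the
+        // flink-conf ConfigMap.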
+        assertEquals(3, resultAdditionalResources.size());
+
+        final List<HasMetadata> resultServices =
+                resultAdditionalResources.stream()
+                        .filter(x -> x instanceof Service)
+                        .collect(Collectors.toList());
+        assertEquals(2, resultServices.size());
+
+        final List<HasMetadata> resultConfigMaps =
+                resultAdditionalResources.stream()
+                        .filter(x -> x instanceof ConfigMap)
+                        .collect(Collectors.toList());
+        assertEquals(1, resultConfigMaps.size());
+    }
+
+    @Test
+    public void testServices() throws IOException {
+        final List<Service> resultServices =
+                this.jmSpec.getAccompanyingResources().stream()
+                        .filter(x -> x instanceof Service)
+                        .map(x -> (Service) x)
+                        .collect(Collectors.toList());
+
+        assertEquals(2, resultServices.size());
+
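+        // The JobManager spec should ship two services: a headless internal service for
+        // JobManager RPC/blob traffic and an externally exposed rest service.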
+        final List<Service> internalServiceCandidates =
+                resultServices.stream()
+                        .filter(
+                                x ->
+                                        x.getMetadata()
+                                                .getName()
+                                                .equals(
+                                                        InternalServiceDecorator
+                                                                .getInternalServiceName(
+                                                                        CLUSTER_ID)))
+                        .collect(Collectors.toList());
+        assertEquals(1, internalServiceCandidates.size());
+
+        final List<Service> restServiceCandidates =
+                resultServices.stream()
+                        .filter(
+                                x ->
+                                        x.getMetadata()
+                                                .getName()
+                                                .equals(
+                                                        ExternalServiceDecorator
+                                                                .getExternalServiceName(
+                                                                        CLUSTER_ID)))
+                        .collect(Collectors.toList());
+        assertEquals(1, restServiceCandidates.size());
+
+        final Service resultInternalService = internalServiceCandidates.get(0);
+        assertEquals(2, resultInternalService.getMetadata().getLabels().size());
+
+        assertNull(resultInternalService.getSpec().getType());
+        assertEquals(
+                HeadlessClusterIPService.HEADLESS_CLUSTER_IP,
+                resultInternalService.getSpec().getClusterIP());
+        assertEquals(2, resultInternalService.getSpec().getPorts().size());
+        assertEquals(3, resultInternalService.getSpec().getSelector().size());
+
+        final Service resultRestService = restServiceCandidates.get(0);
+        assertEquals(2, resultRestService.getMetadata().getLabels().size());
+
+        assertEquals(
+                REST_SERVICE_EXPOSED_TYPE.defaultValue().toString(),
+                resultRestService.getSpec().getType());
+        assertEquals(1, resultRestService.getSpec().getPorts().size());
+        assertEquals(3, resultRestService.getSpec().getSelector().size());
+    }
+
+    @Test
+    public void testFlinkConfConfigMap() throws IOException {
+        final ConfigMap resultConfigMap =
+                (ConfigMap)
+                        jmSpec.getAccompanyingResources().stream()
+                                .filter(
+                                        x ->
+                                                x instanceof ConfigMap
+                                                        && x.getMetadata()
+                                                                .getName()
+                                                                .equals(
+                                                                        FlinkConfMountDecorator
+                                                                                .getFlinkConfConfigMapName(
+                                                                                        CLUSTER_ID)))
+                                .collect(Collectors.toList())
+                                .get(0);
+
+        assertEquals(2, resultConfigMap.getMetadata().getLabels().size());
+
+        final Map<String, String> resultData = resultConfigMap.getData();
+        assertEquals(1, resultData.size());
+    }
+}
diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactoryTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactoryTest.java
new file mode 100644
index 00000000..8c8d4e62
--- /dev/null
+++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactoryTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.factory;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.ParametersTestBase;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
+import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import org.apache.flink.kubernetes.utils.Constants;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerPort;
+import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.PodSpec;
+import io.fabric8.kubernetes.api.model.Quantity;
+import io.fabric8.kubernetes.api.model.ResourceRequirements;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Unit tests for {@link StandaloneKubernetesTaskManagerFactory}. */
+public class StandaloneKubernetesTaskManagerFactoryTest extends ParametersTestBase {
+
+    private Deployment deployment;
+
+    @BeforeEach
+    public void setup() {
+        setupFlinkConfig();
+        FlinkPod podTemplate = createPodTemplate();
+        StandaloneKubernetesTaskManagerParameters tmParameters =
+                new StandaloneKubernetesTaskManagerParameters(
+                        flinkConfig, TestUtils.createClusterSpecification());
+        deployment =
+                StandaloneKubernetesTaskManagerFactory.buildKubernetesTaskManagerDeployment(
+                        podTemplate, tmParameters);
+    }
+
+    @Test
+    public void testDeploymentMetadata() {
+        assertEquals(
+                StandaloneKubernetesUtils.getTaskManagerDeploymentName(TestUtils.CLUSTER_ID),
+                deployment.getMetadata().getName());
+
+        final Map<String, String> expectedLabels =
+                new HashMap<>(
+                        StandaloneKubernetesUtils.getTaskManagerSelectors(TestUtils.CLUSTER_ID));
+        expectedLabels.putAll(userLabels);
+        assertEquals(expectedLabels, deployment.getMetadata().getLabels());
+
+        assertEquals(userAnnotations, deployment.getMetadata().getAnnotations());
+    }
+
+    @Test
+    public void testDeploymentSpec() {
+        assertEquals(1, deployment.getSpec().getReplicas());
+        assertEquals(
+                StandaloneKubernetesUtils.getTaskManagerSelectors(TestUtils.CLUSTER_ID),
+                deployment.getSpec().getSelector().getMatchLabels());
+    }
+
+    @Test
+    public void testTemplateMetadata() {
+        final ObjectMeta podMetadata = deployment.getSpec().getTemplate().getMetadata();
+
+        final Map<String, String> expectedLabels =
+                new HashMap<>(
+                        StandaloneKubernetesUtils.getTaskManagerSelectors(TestUtils.CLUSTER_ID));
+        expectedLabels.putAll(userLabels);
+        expectedLabels.putAll(templateLabels);
+        assertEquals(expectedLabels, podMetadata.getLabels());
+
+        final Map<String, String> expectedAnnotations = new HashMap<>(userAnnotations);
+        expectedAnnotations.putAll(templateAnnotations);
+        assertEquals(expectedAnnotations, podMetadata.getAnnotations());
+    }
+
+    @Test
+    public void testTemplateSpec() {
+        final PodSpec podSpec = deployment.getSpec().getTemplate().getSpec();
+
+        assertEquals(1, podSpec.getContainers().size());
+        assertEquals(TestUtils.SERVICE_ACCOUNT, podSpec.getServiceAccountName());
+        // Config and secret volumes
+        assertEquals(2, podSpec.getVolumes().size());
+
+        final Container mainContainer = podSpec.getContainers().get(0);
+        assertEquals(Constants.MAIN_CONTAINER_NAME, mainContainer.getName());
+        assertEquals(TestUtils.IMAGE, mainContainer.getImage());
+        assertEquals(TestUtils.IMAGE_POLICY, mainContainer.getImagePullPolicy());
+
+        final Map<String, String> envs = new HashMap<>();
+        mainContainer.getEnv().forEach(env -> envs.put(env.getName(), env.getValue()));
+
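+        // Unlike the JobManager pod, no pod IP env var is added here, so only the
+        // template-provided environment variables are expected.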
+        assertEquals(templateEnvs, envs);
+
+        final List<ContainerPort> expectedContainerPorts =
+                Arrays.asList(
+                        new ContainerPortBuilder()
+                                .withName(Constants.TASK_MANAGER_RPC_PORT_NAME)
+                                .withContainerPort(Constants.TASK_MANAGER_RPC_PORT)
+                                .build(),
+                        new ContainerPortBuilder()
+                                .withName(TEMPLATE_PORT_NAME)
+                                .withContainerPort(TEMPLATE_PORT)
+                                .build());
+
+        assertThat(mainContainer.getPorts(), containsInAnyOrder(expectedContainerPorts.toArray()));
+
+        final ResourceRequirements resourceRequirements = mainContainer.getResources();
+
+        final Map<String, Quantity> requests = resourceRequirements.getRequests();
+        assertEquals(Double.toString(TestUtils.TASK_MANAGER_CPU), requests.get("cpu").getAmount());
+        assertEquals(
+                String.valueOf(TestUtils.TASK_MANAGER_MEMORY_MB),
+                requests.get("memory").getAmount());
+
+        final Map<String, Quantity> limits = resourceRequirements.getLimits();
+        assertEquals(Double.toString(TestUtils.TASK_MANAGER_CPU), limits.get("cpu").getAmount());
+        assertEquals(
+                String.valueOf(TestUtils.TASK_MANAGER_MEMORY_MB), limits.get("memory").getAmount());
+
+        assertEquals(2, mainContainer.getVolumeMounts().size());
+    }
+}
diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java
new file mode 100644
index 00000000..8aae921e
--- /dev/null
+++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.standalone;
+
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient;
+import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient;
+import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.util.concurrent.Executors;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Unit tests for {@link KubernetesStandaloneClusterDescriptor}. */
+@EnableKubernetesMockClient(crud = true)
+public class KubernetesStandaloneClusterDescriptorTest {
+
+    private KubernetesStandaloneClusterDescriptor clusterDescriptor;
+    KubernetesMockServer mockServer;
+    private NamespacedKubernetesClient kubernetesClient;
+    private FlinkStandaloneKubeClient flinkKubeClient;
+    private Configuration flinkConfig = new Configuration();
+
+    @BeforeEach
+    public void setup() {
+        flinkConfig = TestUtils.createTestFlinkConfig();
+        kubernetesClient = mockServer.createClient().inNamespace(TestUtils.TEST_NAMESPACE);
+        flinkKubeClient =
+                new Fabric8FlinkStandaloneKubeClient(
+                        flinkConfig, kubernetesClient, Executors.newDirectExecutorService());
+
+        clusterDescriptor = new KubernetesStandaloneClusterDescriptor(flinkConfig, flinkKubeClient);
+    }
+
+    @Test
+    public void testDeploySessionCluster() throws Exception {
+        ClusterSpecification clusterSpecification = TestUtils.createClusterSpecification();
+
+        flinkConfig.setString(BlobServerOptions.PORT, String.valueOf(0));
+        flinkConfig.setString(TaskManagerOptions.RPC_PORT, String.valueOf(0));
+        flinkConfig.setString(RestOptions.BIND_PORT, String.valueOf(0));
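+        // The descriptor is expected to replace the random (0) ports above with the fixed
+        // standalone defaults, which the assertions below verify against Constants.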
+
+        ClusterClientProvider clusterClientProvider =
+                clusterDescriptor.deploySessionCluster(clusterSpecification);
+
+        List<Deployment> deployments =
+                kubernetesClient
+                        .apps()
+                        .deployments()
+                        .inNamespace(TestUtils.TEST_NAMESPACE)
+                        .list()
+                        .getItems();
+        String expectedJMDeploymentName = TestUtils.CLUSTER_ID;
+        String expectedTMDeploymentName = TestUtils.CLUSTER_ID + "-taskmanager";
+
+        assertEquals(2, deployments.size());
+        assertThat(
+                deployments.stream()
+                        .map(d -> d.getMetadata().getName())
+                        .collect(Collectors.toList()),
+                containsInAnyOrder(expectedJMDeploymentName, expectedTMDeploymentName));
+        assertEquals(
+                String.valueOf(Constants.BLOB_SERVER_PORT),
+                flinkConfig.get(BlobServerOptions.PORT));
+        assertEquals(
+                String.valueOf(Constants.TASK_MANAGER_RPC_PORT),
+                flinkConfig.get(TaskManagerOptions.RPC_PORT));
+        assertEquals(String.valueOf(Constants.REST_PORT), flinkConfig.get(RestOptions.BIND_PORT));
+
+        Deployment jmDeployment =
+                deployments.stream()
+                        .filter(d -> d.getMetadata().getName().equals(expectedJMDeploymentName))
+                        .findFirst()
+                        .orElse(null);
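+        // Session mode should start the JobManager container with the "jobmanager" argument.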
+        assertTrue(
+                jmDeployment.getSpec().getTemplate().getSpec().getContainers().stream()
+                        .anyMatch(c -> c.getArgs().contains("jobmanager")));
+
+        ClusterClient clusterClient = clusterClientProvider.getClusterClient();
+
+        String expectedWebUrl =
+                String.format(
+                        "http://%s:%d",
+                        ExternalServiceDecorator.getNamespacedExternalServiceName(
+                                TestUtils.CLUSTER_ID, TestUtils.TEST_NAMESPACE),
+                        Constants.REST_PORT);
+        assertEquals(expectedWebUrl, clusterClient.getWebInterfaceURL());
+    }
+
+    @Test
+    public void testDeployApplicationCluster() throws Exception {
+        ClusterSpecification clusterSpecification = TestUtils.createClusterSpecification();
+
+        flinkConfig.setString(BlobServerOptions.PORT, String.valueOf(0));
+        flinkConfig.setString(TaskManagerOptions.RPC_PORT, String.valueOf(0));
+        flinkConfig.setString(RestOptions.BIND_PORT, String.valueOf(0));
+
+        ClusterClientProvider clusterClientProvider =
+                clusterDescriptor.deployApplicationCluster(
+                        clusterSpecification,
+                        ApplicationConfiguration.fromConfiguration(flinkConfig));
+
+        List<Deployment> deployments =
+                kubernetesClient
+                        .apps()
+                        .deployments()
+                        .inNamespace(TestUtils.TEST_NAMESPACE)
+                        .list()
+                        .getItems();
+        String expectedJMDeploymentName = TestUtils.CLUSTER_ID;
+        String expectedTMDeploymentName = TestUtils.CLUSTER_ID + "-taskmanager";
+
+        assertEquals(2, deployments.size());
+        assertThat(
+                deployments.stream()
+                        .map(d -> d.getMetadata().getName())
+                        .collect(Collectors.toList()),
+                containsInAnyOrder(expectedJMDeploymentName, expectedTMDeploymentName));
+        assertEquals(
+                String.valueOf(Constants.BLOB_SERVER_PORT),
+                flinkConfig.get(BlobServerOptions.PORT));
+        assertEquals(
+                String.valueOf(Constants.TASK_MANAGER_RPC_PORT),
+                flinkConfig.get(TaskManagerOptions.RPC_PORT));
+        assertEquals(String.valueOf(Constants.REST_PORT), flinkConfig.get(RestOptions.BIND_PORT));
+
+        Deployment jmDeployment =
+                deployments.stream()
+                        .filter(d -> d.getMetadata().getName().equals(expectedJMDeploymentName))
+                        .findFirst()
+                        .orElse(null);
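+        // Application mode should start the JobManager container with the "standalone-job"
+        // argument rather than the session "jobmanager" one.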
+        assertTrue(
+                jmDeployment.getSpec().getTemplate().getSpec().getContainers().stream()
+                        .anyMatch(c -> c.getArgs().contains("standalone-job")));
+
+        ClusterClient clusterClient = clusterClientProvider.getClusterClient();
+
+        String expectedWebUrl =
+                String.format(
+                        "http://%s:%d",
+                        ExternalServiceDecorator.getNamespacedExternalServiceName(
+                                TestUtils.CLUSTER_ID, TestUtils.TEST_NAMESPACE),
+                        Constants.REST_PORT);
+        assertEquals(expectedWebUrl, clusterClient.getWebInterfaceURL());
+    }
+}