Posted to commits@flink.apache.org by gy...@apache.org on 2022/03/01 08:19:29 UTC

[flink-kubernetes-operator] branch main updated (a5ecd44 -> 61799f3)

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git.


    from a5ecd44  [FLINK-26356] Directly use service name to make up rest endpoint
     new 38c366d  [FLINK-26336] Call cancel on deletion & clean up configmaps as well
     new 61799f3  [FLINK-26405] Add validation check of num of JM replica

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../controller/FlinkDeploymentController.java      |  18 ++--
 .../operator/reconciler/BaseReconciler.java        |  62 ++++++++++---
 .../reconciler/JobManagerDeploymentStatus.java     |  58 ++++++++++++
 .../operator/reconciler/JobReconciler.java         |  27 ++++--
 .../operator/reconciler/SessionReconciler.java     |  16 ++--
 .../kubernetes/operator/service/FlinkService.java  |  30 ++-----
 .../kubernetes/operator/utils/FlinkUtils.java      | 100 ++++++++++++++++++++-
 .../validation/DefaultDeploymentValidator.java     |  21 ++++-
 .../kubernetes/operator/TestingFlinkService.java   |   3 +-
 .../controller/FlinkDeploymentControllerTest.java  |  12 ++-
 .../validation/DeploymentValidatorTest.java        |   6 ++
 11 files changed, 283 insertions(+), 70 deletions(-)
 create mode 100644 flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobManagerDeploymentStatus.java

[flink-kubernetes-operator] 02/02: [FLINK-26405] Add validation check of num of JM replica

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 61799f33905fb93c2e90599c0439a5df03190506
Author: bgeng777 <ge...@alibaba-inc.com>
AuthorDate: Tue Mar 1 13:49:47 2022 +0800

    [FLINK-26405] Add validation check of num of JM replica
    
    Closes #29
---
 .../validation/DefaultDeploymentValidator.java      | 21 ++++++++++++++++++---
 .../validation/DeploymentValidatorTest.java         |  6 ++++++
 2 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
index e090ec4..997e109 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.validation;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
@@ -43,7 +44,7 @@ public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
         return firstPresent(
                 validateFlinkConfig(spec.getFlinkConfiguration()),
                 validateJobSpec(spec.getJob()),
-                validateJmSpec(spec.getJobManager()),
+                validateJmSpec(spec.getJobManager(), spec.getFlinkConfiguration()),
                 validateTmSpec(spec.getTaskManager()),
                 validateSpecChange(deployment));
     }
@@ -86,12 +87,26 @@ public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
         return Optional.empty();
     }
 
-    private Optional<String> validateJmSpec(JobManagerSpec jmSpec) {
+    private Optional<String> validateJmSpec(JobManagerSpec jmSpec, Map<String, String> confMap) {
         if (jmSpec == null) {
             return Optional.empty();
         }
 
-        return validateResources("JobManager", jmSpec.getResource());
+        return firstPresent(
+                validateResources("JobManager", jmSpec.getResource()),
+                validateJmReplicas("JobManager", jmSpec.getReplicas(), confMap));
+    }
+
+    private Optional<String> validateJmReplicas(
+            String component, int replicas, Map<String, String> confMap) {
+        if (!confMap.containsKey(HighAvailabilityOptions.HA_MODE.key()) && replicas != 1) {
+            return Optional.of(
+                    component
+                            + " replicas should be 1 when "
+                            + HighAvailabilityOptions.HA_MODE.key()
+                            + " is not set.");
+        }
+        return Optional.empty();
     }
 
     private Optional<String> validateTmSpec(TaskManagerSpec tmSpec) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
index 030d4e5..38204f9 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.validation;
 
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
@@ -71,6 +72,11 @@ public class DeploymentValidatorTest {
                                         Collections.singletonMap(
                                                 KubernetesConfigOptions.NAMESPACE.key(), "myns")),
                 "Forbidden Flink config key");
+        testError(
+                dep -> dep.getSpec().getJobManager().setReplicas(2),
+                "JobManager replicas should be 1 when "
+                        + HighAvailabilityOptions.HA_MODE.key()
+                        + " is not set.");
 
         // Test resource validation
         testSuccess(dep -> dep.getSpec().getTaskManager().getResource().setMemory("1G"));
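
To make the effect of the new check concrete, here is a small standalone Java sketch that mirrors the validateJmReplicas logic added above. The class name and the hard-coded "high-availability" key (the key behind HighAvailabilityOptions.HA_MODE) are illustrative, not part of the commit: a JobManager replica count other than 1 is only accepted when an HA mode is present in the Flink configuration, since standby JobManagers only make sense with an HA backend.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    public class JmReplicaValidationSketch {

        // Mirrors the new DefaultDeploymentValidator#validateJmReplicas: a replica
        // count other than 1 is only allowed when an HA mode is configured.
        static Optional<String> validateJmReplicas(int replicas, Map<String, String> flinkConf) {
            if (!flinkConf.containsKey("high-availability") && replicas != 1) {
                return Optional.of(
                        "JobManager replicas should be 1 when high-availability is not set.");
            }
            return Optional.empty();
        }

        public static void main(String[] args) {
            Map<String, String> conf = new HashMap<>();

            // Without HA: two JobManager replicas are rejected.
            System.out.println(validateJmReplicas(2, conf));

            // With (for example) Flink's native Kubernetes HA services factory configured,
            // standby JobManagers are allowed and the validation passes.
            conf.put(
                    "high-availability",
                    "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory");
            System.out.println(validateJmReplicas(2, conf));
        }
    }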

[flink-kubernetes-operator] 01/02: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 38c366de0d8fa11751051eadbddcf19cb1f31e8f
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Fri Feb 25 15:45:42 2022 +0100

    [FLINK-26336] Call cancel on deletion & clean up configmaps as well
    
    Closes #28
---
 .../controller/FlinkDeploymentController.java      |  18 ++--
 .../operator/reconciler/BaseReconciler.java        |  62 ++++++++++---
 .../reconciler/JobManagerDeploymentStatus.java     |  58 ++++++++++++
 .../operator/reconciler/JobReconciler.java         |  27 ++++--
 .../operator/reconciler/SessionReconciler.java     |  16 ++--
 .../kubernetes/operator/service/FlinkService.java  |  30 ++-----
 .../kubernetes/operator/utils/FlinkUtils.java      | 100 ++++++++++++++++++++-
 .../kubernetes/operator/TestingFlinkService.java   |   3 +-
 .../controller/FlinkDeploymentControllerTest.java  |  12 ++-
 9 files changed, 259 insertions(+), 67 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 2e79926..5dff8f4 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -27,7 +27,6 @@ import org.apache.flink.kubernetes.operator.reconciler.BaseReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
-import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
 import org.apache.flink.kubernetes.utils.Constants;
 
@@ -86,16 +85,13 @@ public class FlinkDeploymentController
 
     @Override
     public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
-        LOG.info("Cleaning up application cluster {}", flinkApp.getMetadata().getName());
-        FlinkUtils.deleteCluster(flinkApp, kubernetesClient);
-        IngressUtils.updateIngressRules(
-                flinkApp,
-                FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig()),
-                operatorNamespace,
-                kubernetesClient,
-                true);
-        getReconciler(flinkApp).removeDeployment(flinkApp);
-        return DeleteControl.defaultDelete();
+        LOG.info("Stopping cluster {}", flinkApp.getMetadata().getName());
+        return getReconciler(flinkApp)
+                .shutdownAndDelete(
+                        operatorNamespace,
+                        flinkApp,
+                        context,
+                        FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig()));
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
index f1c0c23..d6cbb29 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
@@ -20,18 +20,21 @@ package org.apache.flink.kubernetes.operator.reconciler;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 
 import io.fabric8.kubernetes.api.model.apps.Deployment;
 import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
 import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashSet;
 import java.util.Optional;
-import java.util.concurrent.TimeUnit;
 
 /** BaseReconciler with functionality that is common to job and session modes. */
 public abstract class BaseReconciler {
@@ -43,6 +46,14 @@ public abstract class BaseReconciler {
 
     private final HashSet<String> jobManagerDeployments = new HashSet<>();
 
+    protected final KubernetesClient kubernetesClient;
+    protected final FlinkService flinkService;
+
+    public BaseReconciler(KubernetesClient kubernetesClient, FlinkService flinkService) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+    }
+
     public boolean removeDeployment(FlinkDeployment flinkApp) {
         return jobManagerDeployments.remove(flinkApp.getMetadata().getUid());
     }
@@ -54,11 +65,8 @@ public abstract class BaseReconciler {
             Configuration effectiveConfig)
             throws Exception;
 
-    protected UpdateControl<FlinkDeployment> checkJobManagerDeployment(
-            FlinkDeployment flinkApp,
-            Context context,
-            Configuration effectiveConfig,
-            FlinkService flinkService) {
+    protected JobManagerDeploymentStatus checkJobManagerDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration effectiveConfig) {
         if (!jobManagerDeployments.contains(flinkApp.getMetadata().getUid())) {
             Optional<Deployment> deployment = context.getSecondaryResource(Deployment.class);
             if (deployment.isPresent()) {
@@ -78,26 +86,54 @@ public abstract class BaseReconciler {
                         if (flinkApp.getStatus().getJobStatus() != null) {
                             // pre-existing deployments on operator restart - proceed with
                             // reconciliation
-                            return null;
+                            return JobManagerDeploymentStatus.READY;
                         }
                     }
                     LOG.info(
                             "JobManager deployment {} in namespace {} port not ready",
                             flinkApp.getMetadata().getName(),
                             flinkApp.getMetadata().getNamespace());
-                    return UpdateControl.updateStatus(flinkApp)
-                            .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);
+                    return JobManagerDeploymentStatus.DEPLOYED_NOT_READY;
                 }
                 LOG.info(
                         "JobManager deployment {} in namespace {} not yet ready, status {}",
                         flinkApp.getMetadata().getName(),
                         flinkApp.getMetadata().getNamespace(),
                         status);
-                // TODO: how frequently do we want here
-                return UpdateControl.updateStatus(flinkApp)
-                        .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+
+                return JobManagerDeploymentStatus.DEPLOYING;
             }
+            return JobManagerDeploymentStatus.MISSING;
         }
-        return null;
+        return JobManagerDeploymentStatus.READY;
     }
+
+    /**
+     * Shuts down the job and deletes all kubernetes resources, including the k8s HA resources. It
+     * first attempts a graceful shutdown (cancel) and only deletes the data directly if that is
+     * unsuccessful, in order to avoid leaking HA metadata to durable storage.
+     *
+     * <p>This feature is currently limited to cleaning up native kubernetes HA resources; other HA
+     * providers such as ZooKeeper need to be cleaned up manually after deletion.
+     */
+    public DeleteControl shutdownAndDelete(
+            String operatorNamespace,
+            FlinkDeployment flinkApp,
+            Context context,
+            Configuration effectiveConfig) {
+
+        if (checkJobManagerDeployment(flinkApp, context, effectiveConfig)
+                == JobManagerDeploymentStatus.READY) {
+            shutdown(flinkApp, effectiveConfig);
+        } else {
+            FlinkUtils.deleteCluster(flinkApp, kubernetesClient, true);
+        }
+        removeDeployment(flinkApp);
+        IngressUtils.updateIngressRules(
+                flinkApp, effectiveConfig, operatorNamespace, kubernetesClient, true);
+
+        return DeleteControl.defaultDelete();
+    }
+
+    protected abstract void shutdown(FlinkDeployment flinkApp, Configuration effectiveConfig);
 }
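
The shutdownAndDelete flow above is a graceful-first, delete-as-fallback pattern. A minimal sketch of that control flow follows, with simplified names that are not the operator's actual API:

    /** Possible JobManager states, mirroring the new JobManagerDeploymentStatus enum. */
    enum JmStatus { READY, DEPLOYED_NOT_READY, DEPLOYING, MISSING }

    class ShutdownSketch {

        // Prefer a graceful cancel while the JobManager is still reachable, so Flink can tear
        // down the cluster and its HA metadata itself; otherwise, or if the cancel fails,
        // delete the Kubernetes resources together with the HA ConfigMaps directly.
        void shutdownAndDelete(JmStatus status) {
            if (status == JmStatus.READY) {
                try {
                    cancelJob(); // graceful path, avoids leaking HA metadata to durable storage
                    return;
                } catch (Exception e) {
                    System.err.println("Could not shut down cluster gracefully, deleting: " + e);
                }
            }
            deleteCluster(true); // delete the JM deployment and the kubernetes HA ConfigMaps
        }

        void cancelJob() throws Exception { /* e.g. cancel via the Flink REST API */ }

        void deleteCluster(boolean deleteHaConfigMaps) { /* delete the k8s resources */ }
    }
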
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobManagerDeploymentStatus.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobManagerDeploymentStatus.java
new file mode 100644
index 0000000..5e634a9
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobManagerDeploymentStatus.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS;
+import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS;
+
+/** Status of the Flink JobManager Kubernetes deployment. */
+public enum JobManagerDeploymentStatus {
+
+    /** JobManager is running and ready to receive REST API calls. */
+    READY,
+
+    /** JobManager is running but not ready yet to receive REST API calls. */
+    DEPLOYED_NOT_READY,
+
+    /** JobManager process is starting up. */
+    DEPLOYING,
+
+    /** JobManager deployment not found, probably not yet started or deleted by the user. */
+    MISSING;
+
+    public UpdateControl<FlinkDeployment> toUpdateControl(FlinkDeployment flinkDeployment) {
+        switch (this) {
+            case DEPLOYING:
+                return UpdateControl.updateStatus(flinkDeployment)
+                        .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+            case DEPLOYED_NOT_READY:
+                return UpdateControl.updateStatus(flinkDeployment)
+                        .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);
+            case READY:
+            case MISSING:
+            default:
+                return null;
+        }
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index 7a072e0..ee121e9 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -27,6 +27,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 
@@ -47,13 +48,10 @@ public class JobReconciler extends BaseReconciler {
 
     private static final Logger LOG = LoggerFactory.getLogger(JobReconciler.class);
 
-    private final KubernetesClient kubernetesClient;
-    private final FlinkService flinkService;
     private final JobStatusObserver observer;
 
     public JobReconciler(KubernetesClient kubernetesClient, FlinkService flinkService) {
-        this.kubernetesClient = kubernetesClient;
-        this.flinkService = flinkService;
+        super(kubernetesClient, flinkService);
         this.observer = new JobStatusObserver(flinkService);
     }
 
@@ -78,7 +76,8 @@ public class JobReconciler extends BaseReconciler {
 
         // wait until the deployment is ready
         UpdateControl<FlinkDeployment> uc =
-                checkJobManagerDeployment(flinkApp, context, effectiveConfig, flinkService);
+                checkJobManagerDeployment(flinkApp, context, effectiveConfig)
+                        .toUpdateControl(flinkApp);
         if (uc != null) {
             return uc;
         }
@@ -179,4 +178,22 @@ public class JobReconciler extends BaseReconciler {
         savepointOpt.ifPresent(jobStatus::setSavepointLocation);
         return savepointOpt;
     }
+
+    @Override
+    protected void shutdown(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+        if (flinkApp.getStatus().getJobStatus() != null
+                && flinkApp.getStatus().getJobStatus().getJobId() != null) {
+            try {
+                flinkService.cancelJob(
+                        JobID.fromHexString(flinkApp.getStatus().getJobStatus().getJobId()),
+                        UpgradeMode.STATELESS,
+                        effectiveConfig);
+                return;
+            } catch (Exception e) {
+                LOG.error("Could not shut down cluster gracefully, deleting...", e);
+            }
+        }
+
+        FlinkUtils.deleteCluster(flinkApp, kubernetesClient, true);
+    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
index 0c9ade3..e72de56 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
@@ -39,12 +39,8 @@ public class SessionReconciler extends BaseReconciler {
 
     private static final Logger LOG = LoggerFactory.getLogger(SessionReconciler.class);
 
-    private final KubernetesClient kubernetesClient;
-    private final FlinkService flinkService;
-
     public SessionReconciler(KubernetesClient kubernetesClient, FlinkService flinkService) {
-        this.kubernetesClient = kubernetesClient;
-        this.flinkService = flinkService;
+        super(kubernetesClient, flinkService);
     }
 
     @Override
@@ -65,7 +61,8 @@ public class SessionReconciler extends BaseReconciler {
         }
 
         UpdateControl<FlinkDeployment> uc =
-                checkJobManagerDeployment(flinkApp, context, effectiveConfig, flinkService);
+                checkJobManagerDeployment(flinkApp, context, effectiveConfig)
+                        .toUpdateControl(flinkApp);
         if (uc != null) {
             return uc;
         }
@@ -81,7 +78,12 @@ public class SessionReconciler extends BaseReconciler {
 
     private void upgradeSessionCluster(FlinkDeployment flinkApp, Configuration effectiveConfig)
             throws Exception {
-        flinkService.stopSessionCluster(flinkApp, effectiveConfig);
+        flinkService.stopSessionCluster(flinkApp, effectiveConfig, false);
         flinkService.submitSessionCluster(flinkApp, effectiveConfig);
     }
+
+    @Override
+    protected void shutdown(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+        flinkService.stopSessionCluster(flinkApp, effectiveConfig, true);
+    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index 7e680b6..dd3d8fb 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -31,7 +31,6 @@ import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
-import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
 import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
@@ -54,7 +53,6 @@ import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.Optional;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 /** Service for submitting and interacting with Flink clusters and jobs. */
@@ -161,35 +159,19 @@ public class FlinkService {
                     }
                     final String namespace = conf.getString(KubernetesConfigOptions.NAMESPACE);
                     final String clusterId = clusterClient.getClusterId();
-                    FlinkUtils.deleteCluster(namespace, clusterId, kubernetesClient);
+                    FlinkUtils.deleteCluster(namespace, clusterId, kubernetesClient, false);
                     break;
                 default:
                     throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
             }
         }
-        waitForClusterShutdown(conf);
+        FlinkUtils.waitForClusterShutdown(kubernetesClient, conf);
         return savepointOpt;
     }
 
-    public void stopSessionCluster(FlinkDeployment deployment, Configuration conf)
-            throws Exception {
-        FlinkUtils.deleteCluster(deployment, kubernetesClient);
-        waitForClusterShutdown(conf);
-    }
-
-    /** We need this due to the buggy flink kube cluster client behaviour for now. */
-    private void waitForClusterShutdown(Configuration conf) throws Exception {
-        Fabric8FlinkKubeClient flinkKubeClient =
-                new Fabric8FlinkKubeClient(
-                        conf, kubernetesClient, Executors.newSingleThreadExecutor());
-        for (int i = 0; i < 60; i++) {
-            if (!flinkKubeClient
-                    .getRestEndpoint(conf.get(KubernetesConfigOptions.CLUSTER_ID))
-                    .isPresent()) {
-                break;
-            }
-            LOG.info("Waiting for cluster shutdown... ({})", i);
-            Thread.sleep(1000);
-        }
+    public void stopSessionCluster(
+            FlinkDeployment deployment, Configuration conf, boolean deleteHaData) {
+        FlinkUtils.deleteCluster(deployment, kubernetesClient, deleteHaData);
+        FlinkUtils.waitForClusterShutdown(kubernetesClient, conf);
     }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 8a0ef5c..868b946 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -19,20 +19,27 @@ package org.apache.flink.kubernetes.operator.utils;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
 import org.apache.flink.kubernetes.operator.config.DefaultConfig;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.api.model.Service;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
 
+import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+
 /** Flink Utility methods used by the operator. */
 public class FlinkUtils {
 
@@ -108,21 +115,106 @@ public class FlinkUtils {
         }
     }
 
-    public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) {
+    public static void deleteCluster(
+            FlinkDeployment flinkApp,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         deleteCluster(
                 flinkApp.getMetadata().getNamespace(),
                 flinkApp.getMetadata().getName(),
-                kubernetesClient);
+                kubernetesClient,
+                deleteHaConfigmaps);
     }
 
+    /**
+     * Delete Flink kubernetes cluster by deleting the kubernetes resources directly. Optionally
+     * allows deleting the native kubernetes HA resources as well.
+     *
+     * @param namespace Namespace where the Flink cluster is deployed
+     * @param clusterId ClusterId of the Flink cluster
+     * @param kubernetesClient Kubernetes client
+     * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata should be removed as well
+     */
     public static void deleteCluster(
-            String namespace, String clusterId, KubernetesClient kubernetesClient) {
+            String namespace,
+            String clusterId,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         kubernetesClient
                 .apps()
                 .deployments()
                 .inNamespace(namespace)
-                .withName(clusterId)
+                .withName(KubernetesUtils.getDeploymentName(clusterId))
                 .cascading(true)
                 .delete();
+
+        if (deleteHaConfigmaps) {
+            // We need to wait for cluster shutdown otherwise HA configmaps might be recreated
+            waitForClusterShutdown(kubernetesClient, namespace, clusterId);
+            kubernetesClient
+                    .configMaps()
+                    .inNamespace(namespace)
+                    .withLabels(
+                            KubernetesUtils.getConfigMapLabels(
+                                    clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
+                    .delete();
+        }
+    }
+
+    /** Wait until the Flink cluster has completely shut down. */
+    public static void waitForClusterShutdown(
+            KubernetesClient kubernetesClient, String namespace, String clusterId) {
+
+        boolean jobManagerRunning = true;
+        boolean serviceRunning = true;
+
+        for (int i = 0; i < 60; i++) {
+            if (jobManagerRunning) {
+                PodList jmPodList =
+                        kubernetesClient
+                                .pods()
+                                .inNamespace(namespace)
+                                .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId))
+                                .list();
+
+                if (jmPodList.getItems().isEmpty()) {
+                    jobManagerRunning = false;
+                }
+            }
+
+            if (serviceRunning) {
+                Service service =
+                        kubernetesClient
+                                .services()
+                                .inNamespace(namespace)
+                                .withName(
+                                        ExternalServiceDecorator.getExternalServiceName(clusterId))
+                                .fromServer()
+                                .get();
+                if (service == null) {
+                    serviceRunning = false;
+                }
+            }
+
+            if (!jobManagerRunning && !serviceRunning) {
+                break;
+            }
+            LOG.info("Waiting for cluster shutdown... ({})", i);
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        LOG.info("Cluster shutdown completed.");
+    }
+
+    /** Wait until the Flink cluster has completely shut down. */
+    public static void waitForClusterShutdown(
+            KubernetesClient kubernetesClient, Configuration conf) {
+        FlinkUtils.waitForClusterShutdown(
+                kubernetesClient,
+                conf.getString(KubernetesConfigOptions.NAMESPACE),
+                conf.getString(KubernetesConfigOptions.CLUSTER_ID));
     }
 }
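
For reference, a hedged usage sketch of the extended FlinkUtils.deleteCluster helper; the namespace, cluster-id and client construction are illustrative:

    import io.fabric8.kubernetes.client.DefaultKubernetesClient;
    import io.fabric8.kubernetes.client.KubernetesClient;

    import org.apache.flink.kubernetes.operator.utils.FlinkUtils;

    public class DeleteClusterSketch {
        public static void main(String[] args) throws Exception {
            try (KubernetesClient client = new DefaultKubernetesClient()) {
                // Deletes the JobManager deployment, waits until the JM pods and the rest
                // service are gone, then removes the HA ConfigMaps labelled for the cluster.
                FlinkUtils.deleteCluster("flink-ns", "my-flink-cluster", client, true);
            }
        }
    }
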
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 626c0ae..610176b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -97,7 +97,8 @@ public class TestingFlinkService extends FlinkService {
     }
 
     @Override
-    public void stopSessionCluster(FlinkDeployment deployment, Configuration conf) {
+    public void stopSessionCluster(
+            FlinkDeployment deployment, Configuration conf, boolean deleteHa) {
         sessions.remove(deployment.getMetadata().getName());
     }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index ab7c884..ee7a386 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.reconciler.BaseReconciler;
+import org.apache.flink.kubernetes.operator.reconciler.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.JobReconcilerTest;
 import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
@@ -62,8 +63,10 @@ public class FlinkDeploymentControllerTest {
         updateControl = testController.reconcile(appCluster, context);
         assertTrue(updateControl.isUpdateStatus());
         assertEquals(
-                BaseReconciler.PORT_READY_DELAY_SECONDS * 1000,
-                (long) updateControl.getScheduleDelay().get());
+                JobManagerDeploymentStatus.DEPLOYED_NOT_READY
+                        .toUpdateControl(appCluster)
+                        .getScheduleDelay(),
+                updateControl.getScheduleDelay());
 
         updateControl = testController.reconcile(appCluster, context);
         assertTrue(updateControl.isUpdateStatus());
@@ -153,6 +156,11 @@ public class FlinkDeploymentControllerTest {
         jobs = flinkService.listJobs();
         assertEquals(1, jobs.size());
         assertEquals("savepoint_1", jobs.get(0).f0);
+
+        testController.reconcile(appCluster, context);
+        testController.cleanup(appCluster, context);
+        jobs = flinkService.listJobs();
+        assertEquals(0, jobs.size());
     }
 
     @Test