Posted to commits@flink.apache.org by GitBox <gi...@apache.org> on 2022/02/28 08:35:50 UTC

[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

gyfora opened a new pull request #28:
URL: https://github.com/apache/flink-kubernetes-operator/pull/28


   Separating the original commit from https://github.com/apache/flink-kubernetes-operator/pull/26 so that it does not interfere with the refactor discussion.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #28:
URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r816388033



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##########
@@ -108,21 +111,38 @@ private static void mergeInto(JsonNode toNode, JsonNode fromNode) {
         }
     }
 
-    public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) {
+    public static void deleteCluster(
+            FlinkDeployment flinkApp,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         deleteCluster(
                 flinkApp.getMetadata().getNamespace(),
                 flinkApp.getMetadata().getName(),
-                kubernetesClient);
+                kubernetesClient,
+                deleteHaConfigmaps);
     }
 
     public static void deleteCluster(
-            String namespace, String clusterId, KubernetesClient kubernetesClient) {
+            String namespace,
+            String clusterId,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         kubernetesClient
                 .apps()
                 .deployments()
                 .inNamespace(namespace)
                 .withName(clusterId)
                 .cascading(true)
                 .delete();
+
+        if (deleteHaConfigmaps) {

Review comment:
       The most appropriate way to clean up the HA data is `HighAvailabilityServices#closeAndCleanupAllData()`. It should work for both ZooKeeper and K8s HA. But I agree we could document the limitation in a comment here and make the improvement later.







[GitHub] [flink-kubernetes-operator] tweise commented on pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
tweise commented on pull request #28:
URL: https://github.com/apache/flink-kubernetes-operator/pull/28#issuecomment-1054880231


   @gyfora I'm going to run through some of the scenarios manually to verify. Hopefully I can add the missing tests in the next few days.





[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #28:
URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r816483886



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobDeploymentStatus.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS;
+import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS;
+
+/** Status of the Flink job deployment. */
+public enum JobDeploymentStatus {

Review comment:
       Or better yet, JobManager, as you initially named it :)







[GitHub] [flink-kubernetes-operator] asfgit closed pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #28:
URL: https://github.com/apache/flink-kubernetes-operator/pull/28


   





[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #28:
URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r816481799



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobDeploymentStatus.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS;
+import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS;
+
+/** Status of the Flink job deployment. */
+public enum JobDeploymentStatus {

Review comment:
       I could rename this to cluster deployment status; then it wouldn't be confusing in the case of session mode.







[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #28:
URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r816376893



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobDeploymentStatus.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS;
+import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS;
+
+/** Status of the Flink job deployment. */
+public enum JobDeploymentStatus {

Review comment:
       nit: if this applies to session mode, then the `Job` part would be misleading. But this can be dealt with later as we will need to iterate on the states more. It's already great to have a starting point!
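
To make the naming concern concrete: the imports in the hunk (`UpdateControl`, `TimeUnit`, and the two delay constants) suggest the enum translates a deployment status into a reschedule delay, which applies to any JobManager deployment and not only to a job. Assuming that, a minimal sketch with a mode-neutral name could look like the following. The name `ClusterDeploymentStatusSketch`, the method `rescheduleDelay`, and all delay values are hypothetical placeholders, not taken from the PR.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch; the name and the delay values are placeholders, not the PR's code. */
enum ClusterDeploymentStatusSketch {
    READY(60),
    DEPLOYED_NOT_READY(5),
    DEPLOYING(5),
    MISSING(60);

    // Placeholder delay in seconds before the next reconcile attempt.
    private final long rescheduleSeconds;

    ClusterDeploymentStatusSketch(long rescheduleSeconds) {
        this.rescheduleSeconds = rescheduleSeconds;
    }

    /** Delay before the next reconcile attempt, converted to the requested unit. */
    long rescheduleDelay(TimeUnit unit) {
        return unit.convert(rescheduleSeconds, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        for (ClusterDeploymentStatusSketch status : values()) {
            System.out.println(status + " -> " + status.rescheduleDelay(TimeUnit.SECONDS) + "s");
        }
    }
}
```

Nothing in the sketch refers to a job, so the same statuses would read naturally for a session cluster.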







[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #28:
URL: https://github.com/apache/flink-kubernetes-operator/pull/28#issuecomment-1054608306


   I have addressed the comments, @wangyang0918 @tweise. Let me know if there is anything else.
   
   I had to improve the waitForClusterShutdown logic to be able to create a safe point where we can delete the configmaps without the jobmanager recreating them.





[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #28:
URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r816455956



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##########
@@ -108,21 +116,109 @@ private static void mergeInto(JsonNode toNode, JsonNode fromNode) {
         }
     }
 
-    public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) {
+    public static void deleteCluster(
+            FlinkDeployment flinkApp,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         deleteCluster(
                 flinkApp.getMetadata().getNamespace(),
                 flinkApp.getMetadata().getName(),
-                kubernetesClient);
+                kubernetesClient,
+                deleteHaConfigmaps);
     }
 
+    /**
+     * Delete Flink kubernetes cluster by deleting the kubernetes resources directly. Optionally
+     * allows deleting the native kubernetes HA resources as well.
+     *
+     * @param namespace Namespace where the Flink cluster is deployed
+     * @param clusterId ClusterId of the Flink cluster
+     * @param kubernetesClient Kubernetes client
+     * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata should be removed as well
+     */
     public static void deleteCluster(
-            String namespace, String clusterId, KubernetesClient kubernetesClient) {
+            String namespace,
+            String clusterId,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         kubernetesClient
                 .apps()
                 .deployments()
                 .inNamespace(namespace)
-                .withName(clusterId)
+                .withName(KubernetesUtils.getDeploymentName(clusterId))
                 .cascading(true)
                 .delete();
+
+        if (deleteHaConfigmaps) {
+            // We need to wait for cluster shutdown otherwise confimaps might be recreated
+            waitForClusterShutdown(kubernetesClient, namespace, clusterId);
+            kubernetesClient
+                    .configMaps()
+                    .inNamespace(namespace)
+                    .withLabels(
+                            KubernetesUtils.getConfigMapLabels(
+                                    clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
+                    .delete();
+        }
+    }
+
+    /** We need this due to the buggy flink kube cluster client behaviour for now. */
+    public static void waitForClusterShutdown(
+            KubernetesClient kubernetesClient, String namespace, String clusterId) {
+
+        boolean jobManagerRunning = true;
+        boolean serviceRunning = true;
+
+        for (int i = 0; i < 60; i++) {
+            if (jobManagerRunning) {
+                PodList jmPodList =
+                        kubernetesClient
+                                .pods()
+                                .inNamespace(namespace)
+                                .withLabel(
+                                        Constants.LABEL_TYPE_KEY, Constants.LABEL_TYPE_NATIVE_TYPE)
+                                .withLabel(
+                                        Constants.LABEL_COMPONENT_KEY,
+                                        Constants.LABEL_COMPONENT_JOB_MANAGER)
+                                .withLabel(Constants.LABEL_APP_KEY, clusterId)
+                                .list();
+
+                if (jmPodList.getItems().isEmpty()) {
+                    jobManagerRunning = false;
+                }
+            }
+
+            if (serviceRunning) {
+                Service service =
+                        kubernetesClient
+                                .services()
+                                .inNamespace(namespace)
+                                .withName(
+                                        ExternalServiceDecorator.getExternalServiceName(clusterId))
+                                .fromServer()
+                                .get();
+                if (service == null) {
+                    serviceRunning = false;
+                }
+            }
+
+            if (!jobManagerRunning && !serviceRunning) {
+                break;
+            }
+            LOG.info("Waiting for cluster shutdown... ({})", i);
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }

Review comment:
       It would be good to log that shutdown was completed, otherwise the last thing we see is:
   ```
   2022-02-28 21:12:45,381 o.a.f.k.o.u.FlinkUtils         [INFO ] [default.basic-example] Waiting for cluster shutdown... (5)
   2022-02-28 21:12:46,395 o.a.f.k.o.u.FlinkUtils         [INFO ] [default.basic-example] Waiting for cluster shutdown... (6)
   ```
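
One possible shape for this, as a sketch under assumptions and not the operator's actual code: the loop returns whether shutdown was observed and logs either completion or a timeout. `ShutdownWaitSketch`, `awaitShutdown`, and the two `BooleanSupplier` probes are hypothetical stand-ins for the pod and service lookups in the diff, and `System.out` stands in for `LOG`.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch; the probes replace the Kubernetes client lookups from the diff. */
final class ShutdownWaitSketch {

    /**
     * Polls until both probes report that their resource is gone, or attempts run out.
     * Returns true if shutdown was observed and false on timeout.
     */
    static boolean awaitShutdown(
            BooleanSupplier jobManagerGone,
            BooleanSupplier serviceGone,
            int maxAttempts,
            long sleepMillis) {
        boolean jobManagerRunning = true;
        boolean serviceRunning = true;

        for (int i = 0; i < maxAttempts; i++) {
            // Only re-check a resource that was still present on the previous pass.
            if (jobManagerRunning && jobManagerGone.getAsBoolean()) {
                jobManagerRunning = false;
            }
            if (serviceRunning && serviceGone.getAsBoolean()) {
                serviceRunning = false;
            }
            if (!jobManagerRunning && !serviceRunning) {
                System.out.println("Cluster shutdown completed.");
                return true;
            }
            System.out.println("Waiting for cluster shutdown... (" + i + ")");
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag before propagating.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        System.out.println("Timed out waiting for cluster shutdown.");
        return false;
    }

    public static void main(String[] args) {
        int[] polls = {0};
        // The JobManager probe reports "gone" on the third poll; the service is gone at once.
        boolean done = awaitShutdown(() -> ++polls[0] >= 3, () -> true, 10, 1L);
        System.out.println("done = " + done);
    }
}
```

Returning a boolean also lets the caller decide what to do when the wait times out, which the current loop silently ignores.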







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #28:
URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815968841



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##########
@@ -108,21 +111,38 @@ private static void mergeInto(JsonNode toNode, JsonNode fromNode) {
         }
     }
 
-    public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) {
+    public static void deleteCluster(
+            FlinkDeployment flinkApp,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         deleteCluster(
                 flinkApp.getMetadata().getNamespace(),
                 flinkApp.getMetadata().getName(),
-                kubernetesClient);
+                kubernetesClient,
+                deleteHaConfigmaps);
     }
 
     public static void deleteCluster(
-            String namespace, String clusterId, KubernetesClient kubernetesClient) {
+            String namespace,
+            String clusterId,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         kubernetesClient
                 .apps()
                 .deployments()
                 .inNamespace(namespace)
                 .withName(clusterId)
                 .cascading(true)
                 .delete();
+
+        if (deleteHaConfigmaps) {

Review comment:
       Makes sense to me. I'm OK with merging this in its current shape with some more comments for clarification.
   
   But I have a sense that doing a proper cleanup for other HA providers may be hard work. It is somewhat outside the operator's responsibility or ability; maybe we should extend Flink itself to support `deleteAndCleanUpHA`? Do you have any input on this? cc @wangyang0918







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #28:
URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815961573



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -75,29 +83,45 @@ public boolean removeDeployment(FlinkDeployment flinkApp) {
                                 flinkApp.getMetadata().getName(),
                                 flinkApp.getMetadata().getNamespace());
                         jobManagerDeployments.add(flinkApp.getMetadata().getUid());
-                        if (flinkApp.getStatus().getJobStatus() != null) {
-                            // pre-existing deployments on operator restart - proceed with
-                            // reconciliation
-                            return null;
-                        }
+                        return JobDeploymentStatus.READY;
                     }
                     LOG.info(
                             "JobManager deployment {} in namespace {} port not ready",
                             flinkApp.getMetadata().getName(),
                             flinkApp.getMetadata().getNamespace());
-                    return UpdateControl.updateStatus(flinkApp)
-                            .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);
+                    return JobDeploymentStatus.DEPLOYED_NOT_READY;
                 }
                 LOG.info(
                         "JobManager deployment {} in namespace {} not yet ready, status {}",
                         flinkApp.getMetadata().getName(),
                         flinkApp.getMetadata().getNamespace(),
                         status);
-                // TODO: how frequently do we want here
-                return UpdateControl.updateStatus(flinkApp)
-                        .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+
+                return JobDeploymentStatus.DEPLOYING;
             }
+            return JobDeploymentStatus.MISSING;
+        }
+        return JobDeploymentStatus.READY;
+    }
+
+    public DeleteControl shutdownAndDelete(

Review comment:
       Sure, I will add a comment.







[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #28:
URL: https://github.com/apache/flink-kubernetes-operator/pull/28#issuecomment-1054395615


   I discovered that we need to wait for cluster shutdown before deleting the configmaps; otherwise they might be recreated. Adding this now.
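
The ordering constraint can be sketched as follows. This is an illustration under assumptions, not the PR's code: `DeleteOrderSketch`, the `ClusterOps` interface, and `trace` are hypothetical names that stand in for the Kubernetes client calls and record the order in which they happen.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the deletion ordering; ClusterOps stands in for the client calls. */
final class DeleteOrderSketch {

    /** Stand-in for the three cluster operations involved. */
    interface ClusterOps {
        void deleteDeployment();

        void waitForShutdown();

        void deleteHaConfigMaps();
    }

    static void deleteCluster(ClusterOps ops, boolean deleteHaConfigmaps) {
        ops.deleteDeployment();
        if (deleteHaConfigmaps) {
            // Wait first: a JobManager that is still running might recreate the configmaps.
            ops.waitForShutdown();
            ops.deleteHaConfigMaps();
        }
    }

    /** Runs deleteCluster against a recording stub and returns the observed call order. */
    static List<String> trace(boolean deleteHaConfigmaps) {
        List<String> calls = new ArrayList<>();
        deleteCluster(
                new ClusterOps() {
                    @Override
                    public void deleteDeployment() {
                        calls.add("deleteDeployment");
                    }

                    @Override
                    public void waitForShutdown() {
                        calls.add("waitForShutdown");
                    }

                    @Override
                    public void deleteHaConfigMaps() {
                        calls.add("deleteHaConfigMaps");
                    }
                },
                deleteHaConfigmaps);
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(trace(true));
        System.out.println(trace(false));
    }
}
```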





[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #28:
URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r816390801



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##########
@@ -108,21 +116,109 @@ private static void mergeInto(JsonNode toNode, JsonNode fromNode) {
         }
     }
 
-    public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) {
+    public static void deleteCluster(
+            FlinkDeployment flinkApp,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         deleteCluster(
                 flinkApp.getMetadata().getNamespace(),
                 flinkApp.getMetadata().getName(),
-                kubernetesClient);
+                kubernetesClient,
+                deleteHaConfigmaps);
     }
 
+    /**
+     * Delete Flink kubernetes cluster by deleting the kubernetes resources directly. Optionally
+     * allows deleting the native kubernetes HA resources as well.
+     *
+     * @param namespace Namespace where the Flink cluster is deployed
+     * @param clusterId ClusterId of the Flink cluster
+     * @param kubernetesClient Kubernetes client
+     * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata should be removed as well
+     */
     public static void deleteCluster(
-            String namespace, String clusterId, KubernetesClient kubernetesClient) {
+            String namespace,
+            String clusterId,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         kubernetesClient
                 .apps()
                 .deployments()
                 .inNamespace(namespace)
-                .withName(clusterId)
+                .withName(KubernetesUtils.getDeploymentName(clusterId))
                 .cascading(true)
                 .delete();
+
+        if (deleteHaConfigmaps) {
+            // We need to wait for cluster shutdown otherwise confimaps might be recreated

Review comment:
       ```suggestion
               // We need to wait for cluster shutdown otherwise configmaps might be recreated
   ```
   
   Typo

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##########
@@ -108,21 +116,109 @@ private static void mergeInto(JsonNode toNode, JsonNode fromNode) {
         }
     }
 
-    public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) {
+    public static void deleteCluster(
+            FlinkDeployment flinkApp,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         deleteCluster(
                 flinkApp.getMetadata().getNamespace(),
                 flinkApp.getMetadata().getName(),
-                kubernetesClient);
+                kubernetesClient,
+                deleteHaConfigmaps);
     }
 
+    /**
+     * Delete Flink kubernetes cluster by deleting the kubernetes resources directly. Optionally
+     * allows deleting the native kubernetes HA resources as well.
+     *
+     * @param namespace Namespace where the Flink cluster is deployed
+     * @param clusterId ClusterId of the Flink cluster
+     * @param kubernetesClient Kubernetes client
+     * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata should be removed as well
+     */
     public static void deleteCluster(
-            String namespace, String clusterId, KubernetesClient kubernetesClient) {
+            String namespace,
+            String clusterId,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         kubernetesClient
                 .apps()
                 .deployments()
                 .inNamespace(namespace)
-                .withName(clusterId)
+                .withName(KubernetesUtils.getDeploymentName(clusterId))
                 .cascading(true)
                 .delete();
+
+        if (deleteHaConfigmaps) {
+            // We need to wait for cluster shutdown otherwise confimaps might be recreated
+            waitForClusterShutdown(kubernetesClient, namespace, clusterId);
+            kubernetesClient
+                    .configMaps()
+                    .inNamespace(namespace)
+                    .withLabels(
+                            KubernetesUtils.getConfigMapLabels(
+                                    clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
+                    .delete();
+        }
+    }
+
+    /** We need this due to the buggy flink kube cluster client behaviour for now. */

Review comment:
       I am confused by this comment. @gyfora Do you know what the bug in the Flink kube cluster client is?

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##########
@@ -108,21 +116,109 @@ private static void mergeInto(JsonNode toNode, JsonNode fromNode) {
         }
     }
 
-    public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) {
+    public static void deleteCluster(
+            FlinkDeployment flinkApp,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         deleteCluster(
                 flinkApp.getMetadata().getNamespace(),
                 flinkApp.getMetadata().getName(),
-                kubernetesClient);
+                kubernetesClient,
+                deleteHaConfigmaps);
     }
 
+    /**
+     * Delete Flink kubernetes cluster by deleting the kubernetes resources directly. Optionally
+     * allows deleting the native kubernetes HA resources as well.
+     *
+     * @param namespace Namespace where the Flink cluster is deployed
+     * @param clusterId ClusterId of the Flink cluster
+     * @param kubernetesClient Kubernetes client
+     * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata should be removed as well
+     */
     public static void deleteCluster(
-            String namespace, String clusterId, KubernetesClient kubernetesClient) {
+            String namespace,
+            String clusterId,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         kubernetesClient
                 .apps()
                 .deployments()
                 .inNamespace(namespace)
-                .withName(clusterId)
+                .withName(KubernetesUtils.getDeploymentName(clusterId))
                 .cascading(true)
                 .delete();
+
+        if (deleteHaConfigmaps) {
+            // We need to wait for cluster shutdown otherwise confimaps might be recreated
+            waitForClusterShutdown(kubernetesClient, namespace, clusterId);
+            kubernetesClient
+                    .configMaps()
+                    .inNamespace(namespace)
+                    .withLabels(
+                            KubernetesUtils.getConfigMapLabels(
+                                    clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
+                    .delete();
+        }
+    }
+
+    /** We need this due to the buggy flink kube cluster client behaviour for now. */
+    public static void waitForClusterShutdown(
+            KubernetesClient kubernetesClient, String namespace, String clusterId) {
+
+        boolean jobManagerRunning = true;
+        boolean serviceRunning = true;
+
+        for (int i = 0; i < 60; i++) {
+            if (jobManagerRunning) {

Review comment:
       This minor refactor also fixes [FLINK-26344](https://issues.apache.org/jira/browse/FLINK-26344).

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##########
@@ -108,21 +116,109 @@ private static void mergeInto(JsonNode toNode, JsonNode fromNode) {
         }
     }
 
-    public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) {
+    public static void deleteCluster(
+            FlinkDeployment flinkApp,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         deleteCluster(
                 flinkApp.getMetadata().getNamespace(),
                 flinkApp.getMetadata().getName(),
-                kubernetesClient);
+                kubernetesClient,
+                deleteHaConfigmaps);
     }
 
+    /**
+     * Delete Flink kubernetes cluster by deleting the kubernetes resources directly. Optionally
+     * allows deleting the native kubernetes HA resources as well.
+     *
+     * @param namespace Namespace where the Flink cluster is deployed
+     * @param clusterId ClusterId of the Flink cluster
+     * @param kubernetesClient Kubernetes client
+     * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata should be removed as well
+     */
     public static void deleteCluster(
-            String namespace, String clusterId, KubernetesClient kubernetesClient) {
+            String namespace,
+            String clusterId,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         kubernetesClient
                 .apps()
                 .deployments()
                 .inNamespace(namespace)
-                .withName(clusterId)
+                .withName(KubernetesUtils.getDeploymentName(clusterId))
                 .cascading(true)
                 .delete();
+
+        if (deleteHaConfigmaps) {
+            // We need to wait for cluster shutdown otherwise configmaps might be recreated
+            waitForClusterShutdown(kubernetesClient, namespace, clusterId);
+            kubernetesClient
+                    .configMaps()
+                    .inNamespace(namespace)
+                    .withLabels(
+                            KubernetesUtils.getConfigMapLabels(
+                                    clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
+                    .delete();
+        }
+    }
+
+    /** We need this due to the buggy flink kube cluster client behaviour for now. */
+    public static void waitForClusterShutdown(
+            KubernetesClient kubernetesClient, String namespace, String clusterId) {
+
+        boolean jobManagerRunning = true;
+        boolean serviceRunning = true;
+
+        for (int i = 0; i < 60; i++) {
+            if (jobManagerRunning) {
+                PodList jmPodList =
+                        kubernetesClient
+                                .pods()
+                                .inNamespace(namespace)
+                                .withLabel(
+                                        Constants.LABEL_TYPE_KEY, Constants.LABEL_TYPE_NATIVE_TYPE)
+                                .withLabel(
+                                        Constants.LABEL_COMPONENT_KEY,
+                                        Constants.LABEL_COMPONENT_JOB_MANAGER)
+                                .withLabel(Constants.LABEL_APP_KEY, clusterId)

Review comment:
       This could be simplified by using `KubernetesUtils.getJobManagerSelectors(clusterId)`.
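
A minimal sketch of what this simplification amounts to. `KubernetesUtils.getJobManagerSelectors(clusterId)` is the helper named above; the `jobManagerSelectors` method below is a hypothetical stand-in that builds the same three selectors as the chained `withLabel` calls in the diff, so the example runs without Flink on the classpath. The literal label keys and values are assumptions for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class JobManagerSelectorSketch {

    // Hypothetical stand-in for KubernetesUtils.getJobManagerSelectors(clusterId).
    // The label strings are assumed, not taken from the PR.
    static Map<String, String> jobManagerSelectors(String clusterId) {
        Map<String, String> labels = new LinkedHashMap<>();
        labels.put("type", "flink-native-kubernetes"); // Constants.LABEL_TYPE_KEY
        labels.put("component", "jobmanager"); // Constants.LABEL_COMPONENT_KEY
        labels.put("app", clusterId); // Constants.LABEL_APP_KEY
        return labels;
    }
}
```

With the real helper, the three chained `.withLabel(...)` calls would presumably collapse into a single `.withLabels(KubernetesUtils.getJobManagerSelectors(clusterId))`, in the same way the ConfigMap deletion in this diff already uses `.withLabels(...)`.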







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #28:
URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815963282



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobDeploymentStatus.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS;
+import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS;
+
+/** Status of the Flink job deployment. */
+public enum JobDeploymentStatus {
+
+    /** JobManager is running and ready to receive REST API calls. */
+    READY,
+
+    /** JobManager is running but not ready yet to receive REST API calls. */
+    DEPLOYED_NOT_READY,
+
+    /** JobManager process is starting up. */
+    DEPLOYING,
+
+    /** JobManager deployment not found, probably not started or killed by user. */
+    MISSING;
+
+    public UpdateControl<FlinkDeployment> toUpdateControl(FlinkDeployment flinkDeployment) {
+        switch (this) {
+            case DEPLOYING:
+                return UpdateControl.updateStatus(flinkDeployment)
+                        .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+            case DEPLOYED_NOT_READY:
+                return UpdateControl.updateStatus(flinkDeployment)
+                        .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);

Review comment:
       OK, I have created the [ticket](https://issues.apache.org/jira/browse/FLINK-26399) for this 










[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #28:
URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815887310



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobDeploymentStatus.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS;
+import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS;
+
+/** Status of the Flink job deployment. */
+public enum JobDeploymentStatus {
+
+    /** JobManager is running and ready to receive REST API calls. */
+    READY,
+
+    /** JobManager is running but not ready yet to receive REST API calls. */
+    DEPLOYED_NOT_READY,
+
+    /** JobManager process is starting up. */
+    DEPLOYING,
+
+    /** JobManager deployment not found, probably not started or killed by user. */
+    MISSING;
+
+    public UpdateControl<FlinkDeployment> toUpdateControl(FlinkDeployment flinkDeployment) {
+        switch (this) {
+            case DEPLOYING:
+                return UpdateControl.updateStatus(flinkDeployment)
+                        .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+            case DEPLOYED_NOT_READY:
+                return UpdateControl.updateStatus(flinkDeployment)
+                        .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);

Review comment:
       Sounds good, I was trying to preserve the current behaviour :)







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #28:
URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815886981



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##########
@@ -108,21 +111,38 @@ private static void mergeInto(JsonNode toNode, JsonNode fromNode) {
         }
     }
 
-    public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) {
+    public static void deleteCluster(
+            FlinkDeployment flinkApp,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         deleteCluster(
                 flinkApp.getMetadata().getNamespace(),
                 flinkApp.getMetadata().getName(),
-                kubernetesClient);
+                kubernetesClient,
+                deleteHaConfigmaps);
     }
 
     public static void deleteCluster(
-            String namespace, String clusterId, KubernetesClient kubernetesClient) {
+            String namespace,
+            String clusterId,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         kubernetesClient
                 .apps()
                 .deployments()
                 .inNamespace(namespace)
                 .withName(clusterId)
                 .cascading(true)
                 .delete();
+
+        if (deleteHaConfigmaps) {

Review comment:
       You are absolutely correct, this only works for Kubernetes HA (using ConfigMaps).
   
   It's a good idea to add comments to the code and highlight this limitation in the README.
   
   In the future we could support other HA providers, but that is out of scope for this ticket.
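
One way the limitation could be made explicit in code is a guard that only enables ConfigMap cleanup when the deployment is actually configured for Kubernetes HA. This is a sketch, not what the PR does: the class and method names are hypothetical, and the `high-availability` key and factory class name are assumptions based on Flink's documented Kubernetes HA setup at the time (other Flink versions may accept different values).

```java
import java.util.Map;

class HaCleanupGuardSketch {

    // Assumed Flink configuration key and value for Kubernetes HA.
    static final String HA_MODE_KEY = "high-availability";
    static final String K8S_HA_FACTORY =
            "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory";

    // Returns true only when the config selects ConfigMap-based Kubernetes HA,
    // i.e. the only case in which deleting HA ConfigMaps is meaningful.
    static boolean usesKubernetesHa(Map<String, String> flinkConf) {
        String mode = flinkConf.get(HA_MODE_KEY);
        return mode != null && K8S_HA_FACTORY.equals(mode.trim());
    }
}
```

A caller could then pass `deleteHaConfigmaps && usesKubernetesHa(conf)` to `deleteCluster`, leaving ZooKeeper-based deployments untouched.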







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #28:
URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r815701617



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobDeploymentStatus.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.PORT_READY_DELAY_SECONDS;
+import static org.apache.flink.kubernetes.operator.reconciler.BaseReconciler.REFRESH_SECONDS;
+
+/** Status of the Flink job deployment. */
+public enum JobDeploymentStatus {
+
+    /** JobManager is running and ready to receive REST API calls. */
+    READY,
+
+    /** JobManager is running but not ready yet to receive REST API calls. */
+    DEPLOYED_NOT_READY,
+
+    /** JobManager process is starting up. */
+    DEPLOYING,
+
+    /** JobManager deployment not found, probably not started or killed by user. */
+    MISSING;
+
+    public UpdateControl<FlinkDeployment> toUpdateControl(FlinkDeployment flinkDeployment) {
+        switch (this) {
+            case DEPLOYING:
+                return UpdateControl.updateStatus(flinkDeployment)
+                        .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+            case DEPLOYED_NOT_READY:
+                return UpdateControl.updateStatus(flinkDeployment)
+                        .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);

Review comment:
       Maybe make this configurable; I will create a ticket for this.
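
A rough sketch of what "configurable" could look like here, assuming the delays are read from a string map with fallbacks. The option keys and the default values (60 and 10 seconds) are placeholders invented for illustration; the actual values of `REFRESH_SECONDS` and `PORT_READY_DELAY_SECONDS` are not shown in this diff.

```java
import java.time.Duration;
import java.util.Map;

class RescheduleDelaySketch {

    enum Status { READY, DEPLOYED_NOT_READY, DEPLOYING, MISSING }

    // Hypothetical option keys, not part of the operator's configuration.
    static final String REFRESH_KEY = "operator.reconciler.refresh.seconds";
    static final String PORT_READY_KEY = "operator.reconciler.port-ready-delay.seconds";

    // Maps a status to its reschedule delay, preferring configured values.
    static Duration delayFor(Status status, Map<String, String> conf) {
        switch (status) {
            case DEPLOYING:
                return seconds(conf, REFRESH_KEY, 60); // placeholder default
            case DEPLOYED_NOT_READY:
                return seconds(conf, PORT_READY_KEY, 10); // placeholder default
            default:
                return Duration.ZERO; // no status-specific delay in this sketch
        }
    }

    private static Duration seconds(Map<String, String> conf, String key, long fallback) {
        String raw = conf.get(key);
        return Duration.ofSeconds(raw == null ? fallback : Long.parseLong(raw.trim()));
    }
}
```

`toUpdateControl` could then call `rescheduleAfter(delay.getSeconds(), TimeUnit.SECONDS)` with the looked-up value instead of the static constants.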

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##########
@@ -108,21 +111,38 @@ private static void mergeInto(JsonNode toNode, JsonNode fromNode) {
         }
     }
 
-    public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) {
+    public static void deleteCluster(
+            FlinkDeployment flinkApp,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         deleteCluster(
                 flinkApp.getMetadata().getNamespace(),
                 flinkApp.getMetadata().getName(),
-                kubernetesClient);
+                kubernetesClient,
+                deleteHaConfigmaps);
     }
 
     public static void deleteCluster(
-            String namespace, String clusterId, KubernetesClient kubernetesClient) {
+            String namespace,
+            String clusterId,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         kubernetesClient
                 .apps()
                 .deployments()
                 .inNamespace(namespace)
                 .withName(clusterId)
                 .cascading(true)
                 .delete();
+
+        if (deleteHaConfigmaps) {

Review comment:
       This seems to handle only HA based on ConfigMaps. Will this also work for HA based on ZooKeeper?

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
##########
@@ -75,29 +83,45 @@ public boolean removeDeployment(FlinkDeployment flinkApp) {
                                 flinkApp.getMetadata().getName(),
                                 flinkApp.getMetadata().getNamespace());
                         jobManagerDeployments.add(flinkApp.getMetadata().getUid());
-                        if (flinkApp.getStatus().getJobStatus() != null) {
-                            // pre-existing deployments on operator restart - proceed with
-                            // reconciliation
-                            return null;
-                        }
+                        return JobDeploymentStatus.READY;
                     }
                     LOG.info(
                             "JobManager deployment {} in namespace {} port not ready",
                             flinkApp.getMetadata().getName(),
                             flinkApp.getMetadata().getNamespace());
-                    return UpdateControl.updateStatus(flinkApp)
-                            .rescheduleAfter(PORT_READY_DELAY_SECONDS, TimeUnit.SECONDS);
+                    return JobDeploymentStatus.DEPLOYED_NOT_READY;
                 }
                 LOG.info(
                         "JobManager deployment {} in namespace {} not yet ready, status {}",
                         flinkApp.getMetadata().getName(),
                         flinkApp.getMetadata().getNamespace(),
                         status);
-                // TODO: how frequently do we want here
-                return UpdateControl.updateStatus(flinkApp)
-                        .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+
+                return JobDeploymentStatus.DEPLOYING;
             }
+            return JobDeploymentStatus.MISSING;
+        }
+        return JobDeploymentStatus.READY;
+    }
+
+    public DeleteControl shutdownAndDelete(

Review comment:
       Can we document here what is to be deleted, e.g. the HA data and the cluster resources?







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #28: [FLINK-26336] Call cancel on deletion & clean up configmaps as well

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #28:
URL: https://github.com/apache/flink-kubernetes-operator/pull/28#discussion_r816481099



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
##########
@@ -108,21 +116,109 @@ private static void mergeInto(JsonNode toNode, JsonNode fromNode) {
         }
     }
 
-    public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) {
+    public static void deleteCluster(
+            FlinkDeployment flinkApp,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         deleteCluster(
                 flinkApp.getMetadata().getNamespace(),
                 flinkApp.getMetadata().getName(),
-                kubernetesClient);
+                kubernetesClient,
+                deleteHaConfigmaps);
     }
 
+    /**
+     * Delete Flink kubernetes cluster by deleting the kubernetes resources directly. Optionally
+     * allows deleting the native kubernetes HA resources as well.
+     *
+     * @param namespace Namespace where the Flink cluster is deployed
+     * @param clusterId ClusterId of the Flink cluster
+     * @param kubernetesClient Kubernetes client
+     * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata should be removed as well
+     */
     public static void deleteCluster(
-            String namespace, String clusterId, KubernetesClient kubernetesClient) {
+            String namespace,
+            String clusterId,
+            KubernetesClient kubernetesClient,
+            boolean deleteHaConfigmaps) {
         kubernetesClient
                 .apps()
                 .deployments()
                 .inNamespace(namespace)
-                .withName(clusterId)
+                .withName(KubernetesUtils.getDeploymentName(clusterId))
                 .cascading(true)
                 .delete();
+
+        if (deleteHaConfigmaps) {
+            // We need to wait for cluster shutdown otherwise configmaps might be recreated
+            waitForClusterShutdown(kubernetesClient, namespace, clusterId);
+            kubernetesClient
+                    .configMaps()
+                    .inNamespace(namespace)
+                    .withLabels(
+                            KubernetesUtils.getConfigMapLabels(
+                                    clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
+                    .delete();
+        }
+    }
+
+    /** We need this due to the buggy flink kube cluster client behaviour for now. */

Review comment:
       What I originally meant is that the deployment client won't let you submit a new Flink job as long as a service is still around, even if it is marked for deletion. Maybe this is not even a bug, but in any case the comment is irrelevant with the changed behaviour, so I will remove it.
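
For context, the bounded wait that `waitForClusterShutdown` performs (up to 60 checks in the diff above) can be sketched generically as follows. The helper name, the `BooleanSupplier` probe and the sleep interval are assumptions for illustration; in the PR the probe corresponds to listing the JobManager pods and the service through the Kubernetes client.

```java
import java.util.function.BooleanSupplier;

class ShutdownWaitSketch {

    // Polls the probe until it reports the resource as gone, or gives up after
    // maxAttempts checks. Returns true if the resource disappeared in time.
    static boolean waitUntilGone(BooleanSupplier stillRunning, int maxAttempts, long sleepMillis) {
        for (int i = 0; i < maxAttempts; i++) {
            if (!stillRunning.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }
}
```

Returning a boolean rather than silently falling through lets the caller decide whether it is safe to delete the HA ConfigMaps or whether to retry on the next reconciliation.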



