You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/06/04 17:22:51 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-27889] Fix cleanup when lastReconciledSpec is null

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new cd5107d  [FLINK-27889] Fix cleanup when lastReconciledSpec is null
cd5107d is described below

commit cd5107d7ace9d2896efde91ac0ca3c7a9226f3bc
Author: Hector Miuler Malpica Gallegos <mi...@gmail.com>
AuthorDate: Fri Jun 3 21:06:11 2022 -0500

    [FLINK-27889] Fix cleanup when lastReconciledSpec is null
---
 .../operator/reconciler/deployment/ApplicationReconciler.java      | 7 ++++++-
 .../operator/controller/FlinkDeploymentControllerTest.java         | 7 +++++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 18551fa..5c8030b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -322,6 +322,11 @@ public class ApplicationReconciler extends AbstractDeploymentReconciler {
     @Override
     @SneakyThrows
     protected void shutdown(FlinkDeployment flinkApp) {
-        flinkService.cancelJob(flinkApp, UpgradeMode.STATELESS);
+        var status = flinkApp.getStatus();
+        if (status.getReconciliationStatus().getLastReconciledSpec() == null) {
+            flinkService.deleteClusterDeployment(flinkApp.getMetadata(), status, true);
+        } else {
+            flinkService.cancelJob(flinkApp, UpgradeMode.STATELESS);
+        }
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index cab534a..8a76aa1 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -680,6 +680,13 @@ public class FlinkDeploymentControllerTest {
                 appCluster.getStatus().getReconciliationStatus().getLastStableSpec());
     }
 
+    @Test
+    public void cleanUpNewDeployment() {
+        FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster();
+        var deleteControl = testController.cleanup(flinkDeployment, context);
+        assertNotNull(deleteControl);
+    }
+
     private static Stream<Arguments> applicationTestParams() {
         List<Arguments> args = new ArrayList<>();
         for (FlinkVersion version : FlinkVersion.values()) {