You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/09/26 08:32:59 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-29159] Harden initial deployment logic

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new d2161ac9 [FLINK-29159] Harden initial deployment logic
d2161ac9 is described below

commit d2161ac99e02af322b38b961b6df2660de812c59
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Sat Sep 24 17:39:46 2022 +0200

    [FLINK-29159] Harden initial deployment logic
---
 .../operator/crd/status/CommonStatus.java           |  2 +-
 .../operator/crd/status/ReconciliationStatus.java   |  2 +-
 .../deployment/AbstractDeploymentObserver.java      |  2 +-
 .../observer/sessionjob/SessionJobObserver.java     |  2 +-
 .../operator/reconciler/ReconciliationMetadata.java | 11 ++++++++++-
 .../operator/reconciler/ReconciliationUtils.java    |  8 ++++----
 .../deployment/AbstractFlinkResourceReconciler.java |  2 +-
 .../deployment/ApplicationReconciler.java           | 15 +++------------
 .../operator/validation/DefaultValidator.java       |  4 ++--
 .../deployment/ApplicationObserverTest.java         | 11 +++++++++--
 .../observer/sessionjob/SessionJobObserverTest.java |  4 ++--
 .../deployment/ApplicationReconcilerTest.java       | 21 +++++++++++++++++++++
 12 files changed, 56 insertions(+), 28 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/CommonStatus.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/CommonStatus.java
index 44c9ebdc..b91efcf3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/CommonStatus.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/CommonStatus.java
@@ -54,7 +54,7 @@ public abstract class CommonStatus<SPEC extends AbstractFlinkSpec> {
     public ResourceLifecycleState getLifecycleState() {
         var reconciliationStatus = getReconciliationStatus();
 
-        if (reconciliationStatus.isFirstDeployment()) {
+        if (reconciliationStatus.isBeforeFirstDeployment()) {
             return StringUtils.isEmpty(error)
                     ? ResourceLifecycleState.CREATED
                     : ResourceLifecycleState.FAILED;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
index fb16c50d..0e18d998 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
@@ -98,7 +98,7 @@ public abstract class ReconciliationStatus<SPEC extends AbstractFlinkSpec> {
     }
 
     @JsonIgnore
-    public boolean isFirstDeployment() {
+    public boolean isBeforeFirstDeployment() {
         return lastReconciledSpec == null;
     }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
index 3a41a1c3..e828a8ab 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
@@ -76,7 +76,7 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy
         var reconciliationStatus = status.getReconciliationStatus();
 
         // Nothing has been launched so skip observing
-        if (reconciliationStatus.isFirstDeployment()
+        if (reconciliationStatus.isBeforeFirstDeployment()
                 || reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
             return;
         }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
index 6558dd19..760bd327 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
@@ -98,7 +98,7 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
 
     @Override
     public void observe(FlinkSessionJob flinkSessionJob, Context<?> context) {
-        if (flinkSessionJob.getStatus().getReconciliationStatus().isFirstDeployment()) {
+        if (flinkSessionJob.getStatus().getReconciliationStatus().isBeforeFirstDeployment()) {
             return;
         }
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationMetadata.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationMetadata.java
index a20f56f5..dce62ac8 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationMetadata.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationMetadata.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.reconciler;
 
 import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
 import io.fabric8.kubernetes.api.model.ObjectMeta;
@@ -42,8 +43,16 @@ public class ReconciliationMetadata {
         ObjectMeta metadata = new ObjectMeta();
         metadata.setGeneration(resource.getMetadata().getGeneration());
 
-        var firstDeploy = resource.getStatus().getReconciliationStatus().isFirstDeployment();
+        var firstDeploy =
+                resource.getStatus().getReconciliationStatus().isBeforeFirstDeployment()
+                        || isFirstDeployment(resource);
 
         return new ReconciliationMetadata(resource.getApiVersion(), metadata, firstDeploy);
     }
+
+    private static boolean isFirstDeployment(AbstractFlinkResource<?, ?> resource) {
+        var reconStatus = resource.getStatus().getReconciliationStatus();
+        return reconStatus.getState() == ReconciliationState.DEPLOYED
+                && reconStatus.deserializeLastReconciledSpecWithMeta().f1.firstDeployment;
+    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 389e59d6..cec8d1c2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -107,6 +107,9 @@ public class ReconciliationUtils {
 
         // Clear errors
         status.setError("");
+        reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis());
+        reconciliationStatus.setState(
+                upgrading ? ReconciliationState.UPGRADING : ReconciliationState.DEPLOYED);
 
         if (spec.getJob() != null) {
             // For jobs we have to adjust the reconciled spec
@@ -135,10 +138,6 @@ public class ReconciliationUtils {
         } else {
             reconciliationStatus.serializeAndSetLastReconciledSpec(spec, target);
         }
-
-        reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis());
-        reconciliationStatus.setState(
-                upgrading ? ReconciliationState.UPGRADING : ReconciliationState.DEPLOYED);
     }
 
     public static <SPEC extends AbstractFlinkSpec> void updateLastReconciledSavepointTriggerNonce(
@@ -451,6 +450,7 @@ public class ReconciliationUtils {
 
         if (lastSpecWithMeta.f1.isFirstDeployment()) {
             reconStatus.setLastReconciledSpec(null);
+            reconStatus.setState(ReconciliationState.UPGRADING);
         }
     }
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 436f96ec..859d32a6 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -99,7 +99,7 @@ public abstract class AbstractFlinkResourceReconciler<
 
         // If this is the first deployment for the resource we simply submit the job and return.
         // No further logic is required at this point.
-        if (reconciliationStatus.isFirstDeployment()) {
+        if (reconciliationStatus.isBeforeFirstDeployment()) {
             LOG.info("Deploying for the first time");
 
             // Before we try to submit the job we record the current spec in the status so we can
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 43520df3..a4384edd 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -27,7 +27,6 @@ import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -135,16 +134,8 @@ public class ApplicationReconciler
                 deployment.getMetadata(), deployment.getStatus(), false);
         flinkService.waitForClusterShutdown(deployConfig);
         if (!flinkService.isHaMetadataAvailable(deployConfig)) {
-            LOG.info(
-                    "Job never entered stable state. Clearing previous spec to reset for initial deploy");
-            // TODO: lastSpecWithMeta.f1.isFirstDeployment() is false
-            // ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(deployment);
-            deployment.getStatus().getReconciliationStatus().setLastReconciledSpec(null);
-            // UPGRADING triggers immediate reconciliation
-            deployment
-                    .getStatus()
-                    .getReconciliationStatus()
-                    .setState(ReconciliationState.UPGRADING);
+            LOG.info("Job never entered stable state. Resetting status for initial deploy");
+            ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(deployment);
             return Optional.empty();
         } else {
             // proceed with upgrade if deployment succeeded between check and delete
@@ -250,7 +241,7 @@ public class ApplicationReconciler
     @SneakyThrows
     protected DeleteControl cleanupInternal(FlinkDeployment deployment, Context<?> context) {
         var status = deployment.getStatus();
-        if (status.getReconciliationStatus().isFirstDeployment()) {
+        if (status.getReconciliationStatus().isBeforeFirstDeployment()) {
             flinkService.deleteClusterDeployment(deployment.getMetadata(), status, true);
         } else {
             flinkService.cancelJob(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index 5486c7d6..03604b59 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -285,7 +285,7 @@ public class DefaultValidator implements FlinkResourceValidator {
             FlinkDeployment deployment, Map<String, String> effectiveConfig) {
         FlinkDeploymentSpec newSpec = deployment.getSpec();
 
-        if (deployment.getStatus().getReconciliationStatus().isFirstDeployment()) {
+        if (deployment.getStatus().getReconciliationStatus().isBeforeFirstDeployment()) {
             if (newSpec.getJob() != null && !newSpec.getJob().getState().equals(JobState.RUNNING)) {
                 return Optional.of("Job must start in running state");
             }
@@ -417,7 +417,7 @@ public class DefaultValidator implements FlinkResourceValidator {
     private Optional<String> validateSpecChange(FlinkSessionJob sessionJob) {
         FlinkSessionJobSpec newSpec = sessionJob.getSpec();
 
-        if (sessionJob.getStatus().getReconciliationStatus().isFirstDeployment()) {
+        if (sessionJob.getStatus().getReconciliationStatus().isBeforeFirstDeployment()) {
             // New job
             if (newSpec.getJob() != null && !newSpec.getJob().getState().equals(JobState.RUNNING)) {
                 return Optional.of("Job must start in running state");
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index cbe59f85..c4575aa9 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -477,6 +477,10 @@ public class ApplicationObserverTest {
         var reconStatus = status.getReconciliationStatus();
 
         // New deployment
+        ReconciliationUtils.updateStatusBeforeDeploymentAttempt(
+                deployment,
+                new FlinkConfigManager(new Configuration())
+                        .getDeployConfig(deployment.getMetadata(), deployment.getSpec()));
         ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
 
         // Test regular upgrades
@@ -528,9 +532,12 @@ public class ApplicationObserverTest {
                 deployment,
                 new FlinkConfigManager(new Configuration())
                         .getDeployConfig(deployment.getMetadata(), deployment.getSpec()));
+        var reconStatus = deployment.getStatus().getReconciliationStatus();
+
+        assertTrue(reconStatus.deserializeLastReconciledSpecWithMeta().f1.isFirstDeployment());
+        assertFalse(reconStatus.isBeforeFirstDeployment());
 
-        assertFalse(deployment.getStatus().getReconciliationStatus().isFirstDeployment());
         observer.observe(deployment, TestUtils.createEmptyContext());
-        assertTrue(deployment.getStatus().getReconciliationStatus().isFirstDeployment());
+        assertTrue(reconStatus.isBeforeFirstDeployment());
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
index 05ad76f3..092fe899 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
@@ -352,8 +352,8 @@ public class SessionJobObserverTest {
 
         ReconciliationUtils.updateStatusBeforeDeploymentAttempt(sessionJob, new Configuration());
 
-        assertFalse(sessionJob.getStatus().getReconciliationStatus().isFirstDeployment());
+        assertFalse(sessionJob.getStatus().getReconciliationStatus().isBeforeFirstDeployment());
         observer.observe(sessionJob, TestUtils.createContextWithReadyFlinkDeployment());
-        assertTrue(sessionJob.getStatus().getReconciliationStatus().isFirstDeployment());
+        assertTrue(sessionJob.getStatus().getReconciliationStatus().isBeforeFirstDeployment());
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index b102cdb1..fce09a14 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -102,16 +102,37 @@ public class ApplicationReconcilerTest {
         reconciler.reconcile(deployment, context);
         var runningJobs = flinkService.listJobs();
         verifyAndSetRunningJobsToStatus(deployment, runningJobs);
+        assertTrue(
+                deployment
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .deserializeLastReconciledSpecWithMeta()
+                        .f1
+                        .isFirstDeployment());
 
         // Test stateless upgrade
         FlinkDeployment statelessUpgrade = ReconciliationUtils.clone(deployment);
         statelessUpgrade.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
         statelessUpgrade.getSpec().getFlinkConfiguration().put("new", "conf");
         reconciler.reconcile(statelessUpgrade, context);
+        assertFalse(
+                statelessUpgrade
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .deserializeLastReconciledSpecWithMeta()
+                        .f1
+                        .isFirstDeployment());
 
         assertEquals(0, flinkService.getRunningCount());
 
         reconciler.reconcile(statelessUpgrade, context);
+        assertFalse(
+                statelessUpgrade
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .deserializeLastReconciledSpecWithMeta()
+                        .f1
+                        .isFirstDeployment());
 
         runningJobs = flinkService.listJobs();
         assertEquals(1, flinkService.getRunningCount());