You are viewing a plain-text version of this content. The link to the canonical version (originally attached to the word "here") was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/08/09 09:48:45 UTC

[flink-kubernetes-operator] branch release-1.1 updated: [FLINK-28845] Do not ignore initialSavepointPath if first deploy fails completely

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/release-1.1 by this push:
     new d0c9f6ba [FLINK-28845] Do not ignore initialSavepointPath if first deploy fails completely
d0c9f6ba is described below

commit d0c9f6ba714d5265ec55154213f3b2c383831cdc
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Tue Aug 9 08:51:14 2022 +0200

    [FLINK-28845] Do not ignore initialSavepointPath if first deploy fails completely
---
 .../operator/crd/status/ReconciliationStatus.java  |  6 +--
 .../deployment/AbstractDeploymentObserver.java     |  7 ++--
 .../observer/sessionjob/SessionJobObserver.java    |  1 +
 .../reconciler/ReconciliationMetadata.java         | 49 ++++++++++++++++++++++
 .../operator/reconciler/ReconciliationUtils.java   | 47 ++++++++++++++-------
 .../controller/FlinkDeploymentControllerTest.java  | 16 +++++++
 .../deployment/ApplicationObserverTest.java        | 17 +++++++-
 .../observer/deployment/SessionObserverTest.java   |  2 +-
 .../sessionjob/SessionJobObserverTest.java         | 16 +++++++
 9 files changed, 137 insertions(+), 24 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
index b867887f..380971b7 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
@@ -21,10 +21,10 @@ import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationMetadata;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
@@ -68,12 +68,12 @@ public abstract class ReconciliationStatus<SPEC extends AbstractFlinkSpec> {
     }
 
     @JsonIgnore
-    public Tuple2<SPEC, ObjectNode> deserializeLastReconciledSpecWithMeta() {
+    public Tuple2<SPEC, ReconciliationMetadata> deserializeLastReconciledSpecWithMeta() {
         return ReconciliationUtils.deserializeSpecWithMeta(lastReconciledSpec, getSpecClass());
     }
 
     @JsonIgnore
-    public Tuple2<SPEC, ObjectNode> deserializeLastStableSpecWithMeta() {
+    public Tuple2<SPEC, ReconciliationMetadata> deserializeLastStableSpecWithMeta() {
         return ReconciliationUtils.deserializeSpecWithMeta(lastStableSpec, getSpecClass());
     }
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
index e3b5ebd6..eb046505 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
@@ -77,13 +77,15 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy
         var reconciliationStatus = status.getReconciliationStatus();
 
         // Nothing has been launched so skip observing
-        if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
+        if (reconciliationStatus.isFirstDeployment()
+                || reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
             return;
         }
 
         if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
             checkIfAlreadyUpgraded(flinkApp, context);
             if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
+                ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(flinkApp);
                 return;
             }
         }
@@ -263,9 +265,6 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy
      */
     private void checkIfAlreadyUpgraded(FlinkDeployment flinkDep, Context context) {
         var status = flinkDep.getStatus();
-        if (status.getReconciliationStatus().isFirstDeployment()) {
-            return;
-        }
         Optional<Deployment> depOpt = context.getSecondaryResource(Deployment.class);
         depOpt.ifPresent(
                 deployment -> {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
index 5ba397c4..e3c57c4f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
@@ -118,6 +118,7 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
         if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
             checkIfAlreadyUpgraded(flinkSessionJob, deployedConfig);
             if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
+                ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(flinkSessionJob);
                 return;
             }
         }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationMetadata.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationMetadata.java
new file mode 100644
index 00000000..a20f56f5
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationMetadata.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/** Extra metadata to be attached to the reconciled spec. */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class ReconciliationMetadata {
+
+    private String apiVersion;
+
+    private ObjectMeta metadata;
+
+    private boolean firstDeployment;
+
+    public static ReconciliationMetadata from(AbstractFlinkResource<?, ?> resource) {
+        ObjectMeta metadata = new ObjectMeta();
+        metadata.setGeneration(resource.getMetadata().getGeneration());
+
+        var firstDeploy = resource.getStatus().getReconciliationStatus().isFirstDeployment();
+
+        return new ReconciliationMetadata(resource.getApiVersion(), metadata, firstDeploy);
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 79ee2dad..12dc633b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -268,8 +268,9 @@ public class ReconciliationUtils {
      * @return Tuple2 of spec and meta.
      * @param <T> Spec type.
      */
-    public static <T extends AbstractFlinkSpec> Tuple2<T, ObjectNode> deserializeSpecWithMeta(
-            @Nullable String specWithMetaString, Class<T> specClass) {
+    public static <T extends AbstractFlinkSpec>
+            Tuple2<T, ReconciliationMetadata> deserializeSpecWithMeta(
+                    @Nullable String specWithMetaString, Class<T> specClass) {
         if (specWithMetaString == null) {
             return null;
         }
@@ -284,7 +285,8 @@ public class ReconciliationUtils {
                 return Tuple2.of(objectMapper.treeToValue(wrapper, specClass), null);
             } else {
                 return Tuple2.of(
-                        objectMapper.treeToValue(wrapper.get("spec"), specClass), internalMeta);
+                        objectMapper.treeToValue(wrapper.get("spec"), specClass),
+                        objectMapper.convertValue(internalMeta, ReconciliationMetadata.class));
             }
         } catch (JsonProcessingException e) {
             throw new RuntimeException("Could not deserialize spec, this indicates a bug...", e);
@@ -300,29 +302,25 @@ public class ReconciliationUtils {
      */
     public static String writeSpecWithMeta(
             AbstractFlinkSpec spec, AbstractFlinkResource<?, ?> relatedResource) {
-
-        ObjectNode internalMeta = objectMapper.createObjectNode();
-
-        internalMeta.put("apiVersion", relatedResource.getApiVersion());
-        ObjectNode metadata = internalMeta.putObject("metadata");
-        metadata.put("generation", relatedResource.getMetadata().getGeneration());
-
-        return writeSpecWithMeta(spec, internalMeta);
+        return writeSpecWithMeta(spec, ReconciliationMetadata.from(relatedResource));
     }
 
     /**
      * Serializes the spec and custom meta information into a JSON string.
      *
      * @param spec Flink resource spec.
-     * @param meta Custom meta object.
+     * @param metadata Reconciliation meta object.
      * @return Serialized json.
      */
-    public static String writeSpecWithMeta(AbstractFlinkSpec spec, ObjectNode meta) {
+    public static String writeSpecWithMeta(
+            AbstractFlinkSpec spec, ReconciliationMetadata metadata) {
 
         ObjectNode wrapper = objectMapper.createObjectNode();
 
         wrapper.set("spec", objectMapper.valueToTree(Preconditions.checkNotNull(spec)));
-        wrapper.set(INTERNAL_METADATA_JSON_KEY, meta);
+        wrapper.set(
+                INTERNAL_METADATA_JSON_KEY,
+                objectMapper.valueToTree(Preconditions.checkNotNull(metadata)));
 
         try {
             return objectMapper.writeValueAsString(wrapper);
@@ -435,7 +433,26 @@ public class ReconciliationUtils {
             return -1L;
         }
 
-        return lastSpecWithMeta.f1.get("metadata").get("generation").asLong(-1L);
+        return lastSpecWithMeta.f1.getMetadata().getGeneration();
+    }
+
+    /**
+     * Clear last reconciled spec if that corresponds to the first deployment. This is important in
+     * cases where the first deployment fails.
+     *
+     * @param resource Flink resource.
+     */
+    public static void clearLastReconciledSpecIfFirstDeploy(AbstractFlinkResource<?, ?> resource) {
+        var reconStatus = resource.getStatus().getReconciliationStatus();
+        var lastSpecWithMeta = reconStatus.deserializeLastReconciledSpecWithMeta();
+
+        if (lastSpecWithMeta.f1 == null) {
+            return;
+        }
+
+        if (lastSpecWithMeta.f1.isFirstDeployment()) {
+            reconStatus.setLastReconciledSpec(null);
+        }
     }
 
     /**
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 866ecbd5..d35efb7d 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -70,6 +70,7 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 /** {@link FlinkDeploymentController} tests. */
@@ -942,4 +943,19 @@ public class FlinkDeploymentControllerTest {
         }
         return args.stream();
     }
+
+    @Test
+    public void testInitialSavepointOnError() throws Exception {
+        FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster();
+        flinkDeployment.getSpec().getJob().setInitialSavepointPath("msp");
+        flinkService.setDeployFailure(true);
+        try {
+            testController.reconcile(flinkDeployment, context);
+            fail();
+        } catch (Exception expected) {
+        }
+        flinkService.setDeployFailure(false);
+        testController.reconcile(flinkDeployment, context);
+        assertEquals("msp", flinkService.listJobs().get(0).f0);
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index f3f93de6..b5d849bb 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -490,8 +490,23 @@ public class ApplicationObserverTest {
                 status.getJobManagerDeploymentStatus());
 
         var specWithMeta = status.getReconciliationStatus().deserializeLastReconciledSpecWithMeta();
-        assertEquals(321L, specWithMeta.f1.get("metadata").get("generation").asLong());
+        assertEquals(321L, specWithMeta.f1.getMetadata().getGeneration());
         assertEquals(JobState.RUNNING, specWithMeta.f0.getJob().getState());
         assertEquals(5, specWithMeta.f0.getJob().getParallelism());
     }
+
+    @Test
+    public void validateLastReconciledClearedOnInitialFailure() {
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        deployment.getMetadata().setGeneration(123L);
+
+        ReconciliationUtils.updateStatusBeforeDeploymentAttempt(
+                deployment,
+                new FlinkConfigManager(new Configuration())
+                        .getDeployConfig(deployment.getMetadata(), deployment.getSpec()));
+
+        assertFalse(deployment.getStatus().getReconciliationStatus().isFirstDeployment());
+        observer.observe(deployment, TestUtils.createEmptyContext());
+        assertTrue(deployment.getStatus().getReconciliationStatus().isFirstDeployment());
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
index a9e06049..2a048ef3 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
@@ -148,7 +148,7 @@ public class SessionObserverTest {
                 status.getJobManagerDeploymentStatus());
 
         var specWithMeta = status.getReconciliationStatus().deserializeLastReconciledSpecWithMeta();
-        assertEquals(321L, specWithMeta.f1.get("metadata").get("generation").asLong());
+        assertEquals(321L, specWithMeta.f1.getMetadata().getGeneration());
         assertEquals("1", specWithMeta.f0.getFlinkConfiguration().get("k"));
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
index e7270bd7..f7423c84 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
@@ -43,6 +44,9 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Map;
 
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 /** Tests for {@link SessionJobObserver}. */
 @EnableKubernetesMockClient(crud = true)
 public class SessionJobObserverTest {
@@ -341,4 +345,16 @@ public class SessionJobObserverTest {
         Assertions.assertTrue(
                 exception.getMessage().contains("doesn't match upgrade target generation"));
     }
+
+    @Test
+    public void validateLastReconciledClearedOnInitialFailure() {
+        var sessionJob = TestUtils.buildSessionJob();
+        sessionJob.getMetadata().setGeneration(123L);
+
+        ReconciliationUtils.updateStatusBeforeDeploymentAttempt(sessionJob, new Configuration());
+
+        assertFalse(sessionJob.getStatus().getReconciliationStatus().isFirstDeployment());
+        observer.observe(sessionJob, TestUtils.createContextWithReadyFlinkDeployment());
+        assertTrue(sessionJob.getStatus().getReconciliationStatus().isFirstDeployment());
+    }
 }