You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/08/09 09:48:45 UTC
[flink-kubernetes-operator] branch release-1.1 updated: [FLINK-28845] Do not ignore initialSavepointPath if first deploy fails completely
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/release-1.1 by this push:
new d0c9f6ba [FLINK-28845] Do not ignore initialSavepointPath if first deploy fails completely
d0c9f6ba is described below
commit d0c9f6ba714d5265ec55154213f3b2c383831cdc
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Tue Aug 9 08:51:14 2022 +0200
[FLINK-28845] Do not ignore initialSavepointPath if first deploy fails completely
---
.../operator/crd/status/ReconciliationStatus.java | 6 +--
.../deployment/AbstractDeploymentObserver.java | 7 ++--
.../observer/sessionjob/SessionJobObserver.java | 1 +
.../reconciler/ReconciliationMetadata.java | 49 ++++++++++++++++++++++
.../operator/reconciler/ReconciliationUtils.java | 47 ++++++++++++++-------
.../controller/FlinkDeploymentControllerTest.java | 16 +++++++
.../deployment/ApplicationObserverTest.java | 17 +++++++-
.../observer/deployment/SessionObserverTest.java | 2 +-
.../sessionjob/SessionJobObserverTest.java | 16 +++++++
9 files changed, 137 insertions(+), 24 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
index b867887f..380971b7 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
@@ -21,10 +21,10 @@ import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationMetadata;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -68,12 +68,12 @@ public abstract class ReconciliationStatus<SPEC extends AbstractFlinkSpec> {
}
@JsonIgnore
- public Tuple2<SPEC, ObjectNode> deserializeLastReconciledSpecWithMeta() {
+ public Tuple2<SPEC, ReconciliationMetadata> deserializeLastReconciledSpecWithMeta() {
return ReconciliationUtils.deserializeSpecWithMeta(lastReconciledSpec, getSpecClass());
}
@JsonIgnore
- public Tuple2<SPEC, ObjectNode> deserializeLastStableSpecWithMeta() {
+ public Tuple2<SPEC, ReconciliationMetadata> deserializeLastStableSpecWithMeta() {
return ReconciliationUtils.deserializeSpecWithMeta(lastStableSpec, getSpecClass());
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
index e3b5ebd6..eb046505 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
@@ -77,13 +77,15 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy
var reconciliationStatus = status.getReconciliationStatus();
// Nothing has been launched so skip observing
- if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
+ if (reconciliationStatus.isFirstDeployment()
+ || reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
return;
}
if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
checkIfAlreadyUpgraded(flinkApp, context);
if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
+ ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(flinkApp);
return;
}
}
@@ -263,9 +265,6 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy
*/
private void checkIfAlreadyUpgraded(FlinkDeployment flinkDep, Context context) {
var status = flinkDep.getStatus();
- if (status.getReconciliationStatus().isFirstDeployment()) {
- return;
- }
Optional<Deployment> depOpt = context.getSecondaryResource(Deployment.class);
depOpt.ifPresent(
deployment -> {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
index 5ba397c4..e3c57c4f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
@@ -118,6 +118,7 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
checkIfAlreadyUpgraded(flinkSessionJob, deployedConfig);
if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
+ ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(flinkSessionJob);
return;
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationMetadata.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationMetadata.java
new file mode 100644
index 00000000..a20f56f5
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationMetadata.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/** Extra metadata to be attached to the reconciled spec. */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class ReconciliationMetadata {
+
+ private String apiVersion;
+
+ private ObjectMeta metadata;
+
+ private boolean firstDeployment;
+
+ public static ReconciliationMetadata from(AbstractFlinkResource<?, ?> resource) {
+ ObjectMeta metadata = new ObjectMeta();
+ metadata.setGeneration(resource.getMetadata().getGeneration());
+
+ var firstDeploy = resource.getStatus().getReconciliationStatus().isFirstDeployment();
+
+ return new ReconciliationMetadata(resource.getApiVersion(), metadata, firstDeploy);
+ }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 79ee2dad..12dc633b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -268,8 +268,9 @@ public class ReconciliationUtils {
* @return Tuple2 of spec and meta.
* @param <T> Spec type.
*/
- public static <T extends AbstractFlinkSpec> Tuple2<T, ObjectNode> deserializeSpecWithMeta(
- @Nullable String specWithMetaString, Class<T> specClass) {
+ public static <T extends AbstractFlinkSpec>
+ Tuple2<T, ReconciliationMetadata> deserializeSpecWithMeta(
+ @Nullable String specWithMetaString, Class<T> specClass) {
if (specWithMetaString == null) {
return null;
}
@@ -284,7 +285,8 @@ public class ReconciliationUtils {
return Tuple2.of(objectMapper.treeToValue(wrapper, specClass), null);
} else {
return Tuple2.of(
- objectMapper.treeToValue(wrapper.get("spec"), specClass), internalMeta);
+ objectMapper.treeToValue(wrapper.get("spec"), specClass),
+ objectMapper.convertValue(internalMeta, ReconciliationMetadata.class));
}
} catch (JsonProcessingException e) {
throw new RuntimeException("Could not deserialize spec, this indicates a bug...", e);
@@ -300,29 +302,25 @@ public class ReconciliationUtils {
*/
public static String writeSpecWithMeta(
AbstractFlinkSpec spec, AbstractFlinkResource<?, ?> relatedResource) {
-
- ObjectNode internalMeta = objectMapper.createObjectNode();
-
- internalMeta.put("apiVersion", relatedResource.getApiVersion());
- ObjectNode metadata = internalMeta.putObject("metadata");
- metadata.put("generation", relatedResource.getMetadata().getGeneration());
-
- return writeSpecWithMeta(spec, internalMeta);
+ return writeSpecWithMeta(spec, ReconciliationMetadata.from(relatedResource));
}
/**
* Serializes the spec and custom meta information into a JSON string.
*
* @param spec Flink resource spec.
- * @param meta Custom meta object.
+ * @param metadata Reconciliation meta object.
* @return Serialized json.
*/
- public static String writeSpecWithMeta(AbstractFlinkSpec spec, ObjectNode meta) {
+ public static String writeSpecWithMeta(
+ AbstractFlinkSpec spec, ReconciliationMetadata metadata) {
ObjectNode wrapper = objectMapper.createObjectNode();
wrapper.set("spec", objectMapper.valueToTree(Preconditions.checkNotNull(spec)));
- wrapper.set(INTERNAL_METADATA_JSON_KEY, meta);
+ wrapper.set(
+ INTERNAL_METADATA_JSON_KEY,
+ objectMapper.valueToTree(Preconditions.checkNotNull(metadata)));
try {
return objectMapper.writeValueAsString(wrapper);
@@ -435,7 +433,26 @@ public class ReconciliationUtils {
return -1L;
}
- return lastSpecWithMeta.f1.get("metadata").get("generation").asLong(-1L);
+ return lastSpecWithMeta.f1.getMetadata().getGeneration();
+ }
+
+ /**
+ * Clear last reconciled spec if that corresponds to the first deployment. This is important in
+ * cases where the first deployment fails.
+ *
+ * @param resource Flink resource.
+ */
+ public static void clearLastReconciledSpecIfFirstDeploy(AbstractFlinkResource<?, ?> resource) {
+ var reconStatus = resource.getStatus().getReconciliationStatus();
+ var lastSpecWithMeta = reconStatus.deserializeLastReconciledSpecWithMeta();
+
+ if (lastSpecWithMeta.f1 == null) {
+ return;
+ }
+
+ if (lastSpecWithMeta.f1.isFirstDeployment()) {
+ reconStatus.setLastReconciledSpec(null);
+ }
}
/**
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 866ecbd5..d35efb7d 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -70,6 +70,7 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.params.provider.Arguments.arguments;
/** {@link FlinkDeploymentController} tests. */
@@ -942,4 +943,19 @@ public class FlinkDeploymentControllerTest {
}
return args.stream();
}
+
+ @Test
+ public void testInitialSavepointOnError() throws Exception {
+ FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster();
+ flinkDeployment.getSpec().getJob().setInitialSavepointPath("msp");
+ flinkService.setDeployFailure(true);
+ try {
+ testController.reconcile(flinkDeployment, context);
+ fail();
+ } catch (Exception expected) {
+ }
+ flinkService.setDeployFailure(false);
+ testController.reconcile(flinkDeployment, context);
+ assertEquals("msp", flinkService.listJobs().get(0).f0);
+ }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index f3f93de6..b5d849bb 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -490,8 +490,23 @@ public class ApplicationObserverTest {
status.getJobManagerDeploymentStatus());
var specWithMeta = status.getReconciliationStatus().deserializeLastReconciledSpecWithMeta();
- assertEquals(321L, specWithMeta.f1.get("metadata").get("generation").asLong());
+ assertEquals(321L, specWithMeta.f1.getMetadata().getGeneration());
assertEquals(JobState.RUNNING, specWithMeta.f0.getJob().getState());
assertEquals(5, specWithMeta.f0.getJob().getParallelism());
}
+
+ @Test
+ public void validateLastReconciledClearedOnInitialFailure() {
+ FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+ deployment.getMetadata().setGeneration(123L);
+
+ ReconciliationUtils.updateStatusBeforeDeploymentAttempt(
+ deployment,
+ new FlinkConfigManager(new Configuration())
+ .getDeployConfig(deployment.getMetadata(), deployment.getSpec()));
+
+ assertFalse(deployment.getStatus().getReconciliationStatus().isFirstDeployment());
+ observer.observe(deployment, TestUtils.createEmptyContext());
+ assertTrue(deployment.getStatus().getReconciliationStatus().isFirstDeployment());
+ }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
index a9e06049..2a048ef3 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
@@ -148,7 +148,7 @@ public class SessionObserverTest {
status.getJobManagerDeploymentStatus());
var specWithMeta = status.getReconciliationStatus().deserializeLastReconciledSpecWithMeta();
- assertEquals(321L, specWithMeta.f1.get("metadata").get("generation").asLong());
+ assertEquals(321L, specWithMeta.f1.getMetadata().getGeneration());
assertEquals("1", specWithMeta.f0.getFlinkConfiguration().get("k"));
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
index e7270bd7..f7423c84 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
@@ -43,6 +44,9 @@ import org.junit.jupiter.api.Test;
import java.util.Map;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
/** Tests for {@link SessionJobObserver}. */
@EnableKubernetesMockClient(crud = true)
public class SessionJobObserverTest {
@@ -341,4 +345,16 @@ public class SessionJobObserverTest {
Assertions.assertTrue(
exception.getMessage().contains("doesn't match upgrade target generation"));
}
+
+ @Test
+ public void validateLastReconciledClearedOnInitialFailure() {
+ var sessionJob = TestUtils.buildSessionJob();
+ sessionJob.getMetadata().setGeneration(123L);
+
+ ReconciliationUtils.updateStatusBeforeDeploymentAttempt(sessionJob, new Configuration());
+
+ assertFalse(sessionJob.getStatus().getReconciliationStatus().isFirstDeployment());
+ observer.observe(sessionJob, TestUtils.createContextWithReadyFlinkDeployment());
+ assertTrue(sessionJob.getStatus().getReconciliationStatus().isFirstDeployment());
+ }
}