You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/09/26 08:32:59 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-29159] Harden initial deployment logic
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new d2161ac9 [FLINK-29159] Harden initial deployment logic
d2161ac9 is described below
commit d2161ac99e02af322b38b961b6df2660de812c59
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Sat Sep 24 17:39:46 2022 +0200
[FLINK-29159] Harden initial deployment logic
---
.../operator/crd/status/CommonStatus.java | 2 +-
.../operator/crd/status/ReconciliationStatus.java | 2 +-
.../deployment/AbstractDeploymentObserver.java | 2 +-
.../observer/sessionjob/SessionJobObserver.java | 2 +-
.../operator/reconciler/ReconciliationMetadata.java | 11 ++++++++++-
.../operator/reconciler/ReconciliationUtils.java | 8 ++++----
.../deployment/AbstractFlinkResourceReconciler.java | 2 +-
.../deployment/ApplicationReconciler.java | 15 +++------------
.../operator/validation/DefaultValidator.java | 4 ++--
.../deployment/ApplicationObserverTest.java | 11 +++++++++--
.../observer/sessionjob/SessionJobObserverTest.java | 4 ++--
.../deployment/ApplicationReconcilerTest.java | 21 +++++++++++++++++++++
12 files changed, 56 insertions(+), 28 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/CommonStatus.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/CommonStatus.java
index 44c9ebdc..b91efcf3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/CommonStatus.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/CommonStatus.java
@@ -54,7 +54,7 @@ public abstract class CommonStatus<SPEC extends AbstractFlinkSpec> {
public ResourceLifecycleState getLifecycleState() {
var reconciliationStatus = getReconciliationStatus();
- if (reconciliationStatus.isFirstDeployment()) {
+ if (reconciliationStatus.isBeforeFirstDeployment()) {
return StringUtils.isEmpty(error)
? ResourceLifecycleState.CREATED
: ResourceLifecycleState.FAILED;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
index fb16c50d..0e18d998 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
@@ -98,7 +98,7 @@ public abstract class ReconciliationStatus<SPEC extends AbstractFlinkSpec> {
}
@JsonIgnore
- public boolean isFirstDeployment() {
+ public boolean isBeforeFirstDeployment() {
return lastReconciledSpec == null;
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
index 3a41a1c3..e828a8ab 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
@@ -76,7 +76,7 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy
var reconciliationStatus = status.getReconciliationStatus();
// Nothing has been launched so skip observing
- if (reconciliationStatus.isFirstDeployment()
+ if (reconciliationStatus.isBeforeFirstDeployment()
|| reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
return;
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
index 6558dd19..760bd327 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
@@ -98,7 +98,7 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
@Override
public void observe(FlinkSessionJob flinkSessionJob, Context<?> context) {
- if (flinkSessionJob.getStatus().getReconciliationStatus().isFirstDeployment()) {
+ if (flinkSessionJob.getStatus().getReconciliationStatus().isBeforeFirstDeployment()) {
return;
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationMetadata.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationMetadata.java
index a20f56f5..dce62ac8 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationMetadata.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationMetadata.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.reconciler;
import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.fabric8.kubernetes.api.model.ObjectMeta;
@@ -42,8 +43,16 @@ public class ReconciliationMetadata {
ObjectMeta metadata = new ObjectMeta();
metadata.setGeneration(resource.getMetadata().getGeneration());
- var firstDeploy = resource.getStatus().getReconciliationStatus().isFirstDeployment();
+ var firstDeploy =
+ resource.getStatus().getReconciliationStatus().isBeforeFirstDeployment()
+ || isFirstDeployment(resource);
return new ReconciliationMetadata(resource.getApiVersion(), metadata, firstDeploy);
}
+
+ private static boolean isFirstDeployment(AbstractFlinkResource<?, ?> resource) {
+ var reconStatus = resource.getStatus().getReconciliationStatus();
+ return reconStatus.getState() == ReconciliationState.DEPLOYED
+ && reconStatus.deserializeLastReconciledSpecWithMeta().f1.firstDeployment;
+ }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 389e59d6..cec8d1c2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -107,6 +107,9 @@ public class ReconciliationUtils {
// Clear errors
status.setError("");
+ reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis());
+ reconciliationStatus.setState(
+ upgrading ? ReconciliationState.UPGRADING : ReconciliationState.DEPLOYED);
if (spec.getJob() != null) {
// For jobs we have to adjust the reconciled spec
@@ -135,10 +138,6 @@ public class ReconciliationUtils {
} else {
reconciliationStatus.serializeAndSetLastReconciledSpec(spec, target);
}
-
- reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis());
- reconciliationStatus.setState(
- upgrading ? ReconciliationState.UPGRADING : ReconciliationState.DEPLOYED);
}
public static <SPEC extends AbstractFlinkSpec> void updateLastReconciledSavepointTriggerNonce(
@@ -451,6 +450,7 @@ public class ReconciliationUtils {
if (lastSpecWithMeta.f1.isFirstDeployment()) {
reconStatus.setLastReconciledSpec(null);
+ reconStatus.setState(ReconciliationState.UPGRADING);
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 436f96ec..859d32a6 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -99,7 +99,7 @@ public abstract class AbstractFlinkResourceReconciler<
// If this is the first deployment for the resource we simply submit the job and return.
// No further logic is required at this point.
- if (reconciliationStatus.isFirstDeployment()) {
+ if (reconciliationStatus.isBeforeFirstDeployment()) {
LOG.info("Deploying for the first time");
// Before we try to submit the job we record the current spec in the status so we can
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 43520df3..a4384edd 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -27,7 +27,6 @@ import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -135,16 +134,8 @@ public class ApplicationReconciler
deployment.getMetadata(), deployment.getStatus(), false);
flinkService.waitForClusterShutdown(deployConfig);
if (!flinkService.isHaMetadataAvailable(deployConfig)) {
- LOG.info(
- "Job never entered stable state. Clearing previous spec to reset for initial deploy");
- // TODO: lastSpecWithMeta.f1.isFirstDeployment() is false
- // ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(deployment);
- deployment.getStatus().getReconciliationStatus().setLastReconciledSpec(null);
- // UPGRADING triggers immediate reconciliation
- deployment
- .getStatus()
- .getReconciliationStatus()
- .setState(ReconciliationState.UPGRADING);
+ LOG.info("Job never entered stable state. Resetting status for initial deploy");
+ ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(deployment);
return Optional.empty();
} else {
// proceed with upgrade if deployment succeeded between check and delete
@@ -250,7 +241,7 @@ public class ApplicationReconciler
@SneakyThrows
protected DeleteControl cleanupInternal(FlinkDeployment deployment, Context<?> context) {
var status = deployment.getStatus();
- if (status.getReconciliationStatus().isFirstDeployment()) {
+ if (status.getReconciliationStatus().isBeforeFirstDeployment()) {
flinkService.deleteClusterDeployment(deployment.getMetadata(), status, true);
} else {
flinkService.cancelJob(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index 5486c7d6..03604b59 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -285,7 +285,7 @@ public class DefaultValidator implements FlinkResourceValidator {
FlinkDeployment deployment, Map<String, String> effectiveConfig) {
FlinkDeploymentSpec newSpec = deployment.getSpec();
- if (deployment.getStatus().getReconciliationStatus().isFirstDeployment()) {
+ if (deployment.getStatus().getReconciliationStatus().isBeforeFirstDeployment()) {
if (newSpec.getJob() != null && !newSpec.getJob().getState().equals(JobState.RUNNING)) {
return Optional.of("Job must start in running state");
}
@@ -417,7 +417,7 @@ public class DefaultValidator implements FlinkResourceValidator {
private Optional<String> validateSpecChange(FlinkSessionJob sessionJob) {
FlinkSessionJobSpec newSpec = sessionJob.getSpec();
- if (sessionJob.getStatus().getReconciliationStatus().isFirstDeployment()) {
+ if (sessionJob.getStatus().getReconciliationStatus().isBeforeFirstDeployment()) {
// New job
if (newSpec.getJob() != null && !newSpec.getJob().getState().equals(JobState.RUNNING)) {
return Optional.of("Job must start in running state");
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index cbe59f85..c4575aa9 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -477,6 +477,10 @@ public class ApplicationObserverTest {
var reconStatus = status.getReconciliationStatus();
// New deployment
+ ReconciliationUtils.updateStatusBeforeDeploymentAttempt(
+ deployment,
+ new FlinkConfigManager(new Configuration())
+ .getDeployConfig(deployment.getMetadata(), deployment.getSpec()));
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
// Test regular upgrades
@@ -528,9 +532,12 @@ public class ApplicationObserverTest {
deployment,
new FlinkConfigManager(new Configuration())
.getDeployConfig(deployment.getMetadata(), deployment.getSpec()));
+ var reconStatus = deployment.getStatus().getReconciliationStatus();
+
+ assertTrue(reconStatus.deserializeLastReconciledSpecWithMeta().f1.isFirstDeployment());
+ assertFalse(reconStatus.isBeforeFirstDeployment());
- assertFalse(deployment.getStatus().getReconciliationStatus().isFirstDeployment());
observer.observe(deployment, TestUtils.createEmptyContext());
- assertTrue(deployment.getStatus().getReconciliationStatus().isFirstDeployment());
+ assertTrue(reconStatus.isBeforeFirstDeployment());
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
index 05ad76f3..092fe899 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
@@ -352,8 +352,8 @@ public class SessionJobObserverTest {
ReconciliationUtils.updateStatusBeforeDeploymentAttempt(sessionJob, new Configuration());
- assertFalse(sessionJob.getStatus().getReconciliationStatus().isFirstDeployment());
+ assertFalse(sessionJob.getStatus().getReconciliationStatus().isBeforeFirstDeployment());
observer.observe(sessionJob, TestUtils.createContextWithReadyFlinkDeployment());
- assertTrue(sessionJob.getStatus().getReconciliationStatus().isFirstDeployment());
+ assertTrue(sessionJob.getStatus().getReconciliationStatus().isBeforeFirstDeployment());
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index b102cdb1..fce09a14 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -102,16 +102,37 @@ public class ApplicationReconcilerTest {
reconciler.reconcile(deployment, context);
var runningJobs = flinkService.listJobs();
verifyAndSetRunningJobsToStatus(deployment, runningJobs);
+ assertTrue(
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .deserializeLastReconciledSpecWithMeta()
+ .f1
+ .isFirstDeployment());
// Test stateless upgrade
FlinkDeployment statelessUpgrade = ReconciliationUtils.clone(deployment);
statelessUpgrade.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
statelessUpgrade.getSpec().getFlinkConfiguration().put("new", "conf");
reconciler.reconcile(statelessUpgrade, context);
+ assertFalse(
+ statelessUpgrade
+ .getStatus()
+ .getReconciliationStatus()
+ .deserializeLastReconciledSpecWithMeta()
+ .f1
+ .isFirstDeployment());
assertEquals(0, flinkService.getRunningCount());
reconciler.reconcile(statelessUpgrade, context);
+ assertFalse(
+ statelessUpgrade
+ .getStatus()
+ .getReconciliationStatus()
+ .deserializeLastReconciledSpecWithMeta()
+ .f1
+ .isFirstDeployment());
runningJobs = flinkService.listJobs();
assertEquals(1, flinkService.getRunningCount());