You are viewing a plain text version of this content. The canonical link for it was provided as a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by th...@apache.org on 2022/08/31 17:02:28 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-29100] Relax upgrade checks to allow stateless restart when no stable spec is present
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 6d7c8cee [FLINK-29100] Relax upgrade checks to allow stateless restart when no stable spec is present
6d7c8cee is described below
commit 6d7c8ceebb9ee0b83eb7036c57003177b7f2fd82
Author: Thomas Weise <th...@apache.org>
AuthorDate: Tue Aug 30 21:17:33 2022 -0400
[FLINK-29100] Relax upgrade checks to allow stateless restart when no stable spec is present
---
.../deployment/ApplicationReconciler.java | 41 +++++++++++++++++++---
.../deployment/ApplicationReconcilerTest.java | 38 ++++++++++++++++++++
2 files changed, 75 insertions(+), 4 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 42aabaad..43520df3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -27,6 +27,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -98,12 +99,19 @@ public class ApplicationReconciler
.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED)
&& FlinkUtils.isKubernetesHAActivated(deployConfig)
&& FlinkUtils.isKubernetesHAActivated(observeConfig)
- && flinkService.isHaMetadataAvailable(deployConfig)
&& !flinkVersionChanged(
ReconciliationUtils.getDeployedSpec(deployment), deployment.getSpec())) {
- LOG.info(
- "Job is not running but HA metadata is available for last state restore, ready for upgrade");
- return Optional.of(UpgradeMode.LAST_STATE);
+
+ if (!flinkService.isHaMetadataAvailable(deployConfig)) {
+ if (deployment.getStatus().getReconciliationStatus().getLastStableSpec() == null) {
+ // initial deployment failure, reset to allow for spec change to proceed
+ return resetOnMissingStableSpec(deployment, deployConfig);
+ }
+ } else {
+ LOG.info(
+ "Job is not running but HA metadata is available for last state restore, ready for upgrade");
+ return Optional.of(UpgradeMode.LAST_STATE);
+ }
}
if (status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING
@@ -120,6 +128,31 @@ public class ApplicationReconciler
return Optional.empty();
}
+ private Optional<UpgradeMode> resetOnMissingStableSpec(
+ FlinkDeployment deployment, Configuration deployConfig) {
+ // Tear down the never-stable deployment, then re-check HA metadata to choose reset vs last-state upgrade
+ flinkService.deleteClusterDeployment(
+ deployment.getMetadata(), deployment.getStatus(), false);
+ flinkService.waitForClusterShutdown(deployConfig); // wait for shutdown before re-checking HA metadata below
+ if (!flinkService.isHaMetadataAvailable(deployConfig)) {
+ LOG.info(
+ "Job never entered stable state. Clearing previous spec to reset for initial deploy");
+ // TODO: ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(deployment) is presumably not usable here,
+ // since lastSpecWithMeta.f1.isFirstDeployment() is false at this point; clear the last reconciled spec directly.
+ deployment.getStatus().getReconciliationStatus().setLastReconciledSpec(null);
+ // Setting UPGRADING triggers an immediate follow-up reconciliation
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .setState(ReconciliationState.UPGRADING);
+ return Optional.empty(); // no upgrade mode now; NOTE(review): the redeploy happens on the next reconcile — confirm with caller
+ } else {
+ // HA metadata appeared between the initial check and the delete (the deployment succeeded meanwhile): do a last-state upgrade
+ LOG.info("Found HA state after deployment deletion, falling back to stateful upgrade");
+ return Optional.of(UpgradeMode.LAST_STATE);
+ }
+ }
+
@Override
protected void deploy(
FlinkDeployment relatedResource,
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 73a707b0..e1f04ec5 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -147,6 +147,11 @@ public class ApplicationReconcilerTest {
deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
deployment.getSpec().setRestartNonce(100L);
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .setLastStableSpec(
+ deployment.getStatus().getReconciliationStatus().getLastReconciledSpec());
flinkService.setHaDataAvailable(false);
deployment.getStatus().getJobStatus().setState("RECONCILING");
@@ -184,6 +189,39 @@ public class ApplicationReconcilerTest {
assertEquals("finished_sp", runningJobs.get(0).f0);
}
+ @ParameterizedTest
+ @EnumSource(UpgradeMode.class)
+ public void testUpgradeBeforeReachingStableSpec(UpgradeMode upgradeMode) throws Exception {
+ flinkService.setHaDataAvailable(false); // no HA metadata, so a last-state restore is not possible
+
+ final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+
+ reconciler.reconcile(deployment, context); // initial deploy; JM still DEPLOYING, no stable spec yet
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYING,
+ deployment.getStatus().getJobManagerDeploymentStatus());
+
+ // Change the spec before the job has ever reached a stable state; the upgrade must still go through
+ final String newImage = "new-image-1";
+ deployment.getSpec().getJob().setUpgradeMode(upgradeMode);
+ deployment.getSpec().setImage(newImage);
+ reconciler.reconcile(deployment, context);
+ if (!UpgradeMode.STATELESS.equals(upgradeMode)) { // non-stateless modes: expect spec cleared and state UPGRADING
+ assertNull(deployment.getStatus().getReconciliationStatus().getLastReconciledSpec());
+ assertEquals(
+ ReconciliationState.UPGRADING,
+ deployment.getStatus().getReconciliationStatus().getState());
+ reconciler.reconcile(deployment, context); // reconcile again from the reset state
+ }
+ assertEquals( // in every mode the new image must end up in the last reconciled spec
+ newImage,
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .deserializeLastReconciledSpec()
+ .getImage());
+ }
+
@Test
public void testUpgradeModeChangeFromSavepointToLastState() throws Exception {
final String expectedSavepointPath = "savepoint_0";