You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2023/01/01 16:15:35 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-30528] Reconcile other changes if upgrade is not available
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new a1842d4c [FLINK-30528] Reconcile other changes if upgrade is not available
a1842d4c is described below
commit a1842d4c0170feb008293963ec51c0343f42771d
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Dec 29 15:46:05 2022 +0100
[FLINK-30528] Reconcile other changes if upgrade is not available
---
.../AbstractFlinkResourceReconciler.java | 20 +++++----
.../deployment/AbstractJobReconciler.java | 5 ++-
.../deployment/ApplicationReconciler.java | 7 ++--
.../reconciler/deployment/SessionReconciler.java | 3 +-
.../deployment/ApplicationReconcilerTest.java | 47 ++++++++++++++++++++++
5 files changed, 69 insertions(+), 13 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 411b58f6..0ec8d9ed 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -145,9 +145,7 @@ public abstract class AbstractFlinkResourceReconciler<
|| reconciliationStatus.getState() == ReconciliationState.UPGRADING;
var observeConfig = getObserveConfig(cr, ctx);
-
if (specChanged) {
-
if (checkNewSpecAlreadyDeployed(cr, deployConfig)) {
return;
}
@@ -162,11 +160,18 @@ public abstract class AbstractFlinkResourceReconciler<
EventRecorder.Component.JobManagerDeployment,
specChangeMessage);
}
- boolean scale = scaleCluster(cr, ctx, observeConfig, deployConfig, specDiff.getType());
- if (!scale) {
- reconcileSpecChange(cr, ctx, observeConfig, deployConfig, specDiff.getType());
+ boolean reconciled =
+ scaleCluster(cr, ctx, observeConfig, deployConfig, specDiff.getType())
+ || reconcileSpecChange(
+ cr, ctx, observeConfig, deployConfig, specDiff.getType());
+ if (reconciled) {
+ // If we executed a scale or spec upgrade action we return, otherwise we continue to
+ // reconcile other changes
+ return;
}
- } else if (shouldRollBack(cr, observeConfig, flinkService)) {
+ }
+
+ if (shouldRollBack(cr, observeConfig, flinkService)) {
// Rollbacks are executed in two steps, we initiate it first then return
if (initiateRollBack(status)) {
return;
@@ -255,8 +260,9 @@ public abstract class AbstractFlinkResourceReconciler<
* @param observeConfig Observe configuration.
* @param deployConfig Deployment configuration.
* @throws Exception Error during spec upgrade.
+ * @return True if spec change reconciliation was executed
*/
- protected abstract void reconcileSpecChange(
+ protected abstract boolean reconcileSpecChange(
CR cr,
Context<?> ctx,
Configuration observeConfig,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 5140f14a..efa803e9 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -82,7 +82,7 @@ public abstract class AbstractJobReconciler<
}
@Override
- protected void reconcileSpecChange(
+ protected boolean reconcileSpecChange(
CR resource,
Context<?> ctx,
Configuration observeConfig,
@@ -104,7 +104,7 @@ public abstract class AbstractJobReconciler<
Optional<UpgradeMode> availableUpgradeMode =
getAvailableUpgradeMode(resource, ctx, deployConfig, observeConfig);
if (availableUpgradeMode.isEmpty()) {
- return;
+ return false;
}
eventRecorder.triggerEvent(
@@ -145,6 +145,7 @@ public abstract class AbstractJobReconciler<
ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig);
}
+ return true;
}
protected Optional<UpgradeMode> getAvailableUpgradeMode(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 918ec0ec..95110dd3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -135,8 +135,9 @@ public class ApplicationReconciler
return getAvailableUpgradeMode(deployment, ctx, deployConfig, observeConfig);
}
- if (jmDeployStatus == JobManagerDeploymentStatus.MISSING
- || jmDeployStatus == JobManagerDeploymentStatus.ERROR) {
+ if ((jmDeployStatus == JobManagerDeploymentStatus.MISSING
+ || jmDeployStatus == JobManagerDeploymentStatus.ERROR)
+ && !flinkService.isHaMetadataAvailable(deployConfig)) {
throw new RecoveryFailureException(
"JobManager deployment is missing and HA data is not available to make stateful upgrades. "
+ "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
@@ -145,7 +146,7 @@ public class ApplicationReconciler
}
LOG.info(
- "Job is not running yet and HA metadata is not available, waiting for upgradeable state");
+ "Job is not running and HA metadata is not available or usable for executing the upgrade, waiting for upgradeable state");
return Optional.empty();
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 24e7ecea..c0489292 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -91,7 +91,7 @@ public class SessionReconciler
}
@Override
- protected void reconcileSpecChange(
+ protected boolean reconcileSpecChange(
FlinkDeployment deployment,
Context<?> ctx,
Configuration observeConfig,
@@ -113,6 +113,7 @@ public class SessionReconciler
Optional.empty(),
false);
ReconciliationUtils.updateStatusForDeployedSpec(deployment, deployConfig);
+ return true;
}
private void deleteSessionCluster(FlinkDeployment deployment, Configuration effectiveConfig) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 813c4908..da30bd9a 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -432,6 +432,7 @@ public class ApplicationReconcilerTest {
.toBuilder()
.jobId(runningJobs.get(0).f1.getJobId().toHexString())
.jobName(runningJobs.get(0).f1.getJobName())
+ .updateTime(Long.toString(System.currentTimeMillis()))
.state("RUNNING")
.build());
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
@@ -706,4 +707,50 @@ public class ApplicationReconcilerTest {
reconciler.reconcile(deployment, context);
Assertions.assertEquals(MSG_RESTART_UNHEALTHY, eventCollector.events.remove().getMessage());
}
+
+ @Test
+ public void testReconcileIfUpgradeModeNotAvailable() throws Exception {
+ FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+ deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+
+ // We disable last state fallback as we want to test that the deployment is properly
+ // recovered before upgrade
+ deployment
+ .getSpec()
+ .getFlinkConfiguration()
+ .put(
+ KubernetesOperatorConfigOptions
+ .OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED
+ .key(),
+ "false");
+
+ // Initial deployment
+ reconciler.reconcile(deployment, context);
+
+ // Trigger upgrade but set jobmanager status to missing -> savepoint upgrade not available
+ deployment.getSpec().setRestartNonce(123L);
+ deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+ flinkService.clear();
+
+ reconciler.reconcile(deployment, context);
+ // We verify that deployment was recovered before upgrade
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYING,
+ deployment.getStatus().getJobManagerDeploymentStatus());
+
+ var lastReconciledSpec =
+ deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+ assertNotEquals(
+ deployment.getSpec().getRestartNonce(), lastReconciledSpec.getRestartNonce());
+
+ // Set to running to let savepoint upgrade proceed
+ verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+
+ reconciler.reconcile(deployment, context);
+ // Make sure upgrade is properly triggered now
+ lastReconciledSpec =
+ deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+ assertEquals(deployment.getSpec().getRestartNonce(), lastReconciledSpec.getRestartNonce());
+ assertEquals(JobState.SUSPENDED, lastReconciledSpec.getJob().getState());
+ }
}