Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/15 12:36:01 UTC

[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #212: [FLINK-27572][FLINK-27594] Ensure HA metadata is present before restoring job with last state

gyfora commented on code in PR #212:
URL: https://github.com/apache/flink-kubernetes-operator/pull/212#discussion_r873163000


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -181,35 +181,62 @@ private void rollbackApplication(FlinkDeployment flinkApp) throws Exception {
     private void recoverJmDeployment(FlinkDeployment deployment) throws Exception {
         LOG.info("Missing Flink Cluster deployment, trying to recover...");
         FlinkDeploymentSpec specToRecover = ReconciliationUtils.getDeployedSpec(deployment);
-        restoreFromLastSavepoint(
+        restoreJob(
                 deployment.getMetadata(),
                 specToRecover.getJob(),
                 deployment.getStatus(),
-                configManager.getDeployConfig(deployment.getMetadata(), specToRecover));
+                configManager.getDeployConfig(deployment.getMetadata(), specToRecover),
+                true);
     }
 
-    private boolean inUpgradeableState(FlinkDeployment deployment) {
-        if (deployment.getSpec().getJob().getUpgradeMode() != UpgradeMode.SAVEPOINT
-                && !ReconciliationUtils.isUpgradeModeChangedToLastStateAndHADisabledPreviously(
-                        deployment, configManager)) {
-            // Only savepoint upgrade mode or changed from stateless/savepoint to last-state while
-            // HA disabled previously need a running job
-            return true;
+    private Optional<UpgradeMode> getAvailableUpgradeMode(FlinkDeployment deployment) {
+        var status = deployment.getStatus();
+        var upgradeMode = deployment.getSpec().getJob().getUpgradeMode();
+        var changedToLastStateWithoutHa =
+                ReconciliationUtils.isUpgradeModeChangedToLastStateAndHADisabledPreviously(
+                        deployment, configManager);
+
+        if (upgradeMode == UpgradeMode.STATELESS) {
+            LOG.debug("Stateless job, ready for upgrade");
+            return Optional.of(upgradeMode);
+        }
+
+        if (ReconciliationUtils.isJobInTerminalState(status)) {
+            LOG.debug("Job is terminal state, ready for upgrade");
+            return Optional.of(upgradeMode);
         }
 
-        FlinkDeploymentStatus status = deployment.getStatus();
+        if (ReconciliationUtils.isJobRunning(status)) {
+            LOG.debug("Job is running state, ready for upgrade");
+            if (changedToLastStateWithoutHa) {
+                LOG.debug(
+                        "Using savepoint upgrade mode when switching to last-state without HA previously enabled");
+                return Optional.of(UpgradeMode.SAVEPOINT);
+            } else {
+                return Optional.of(upgradeMode);
+            }
+        }
 
-        if (ReconciliationUtils.jmMissingOrErrorForRunningDep(status)) {
-            // JobManager is missing for savepoint upgrade, we cannot roll back
+        if (flinkService.isHaMetadataAvailable(configManager.getObserveConfig(deployment))) {
+            LOG.debug(
+                    "Job is not running but HA metadata is available for last state restore, ready for upgrade");
+            return Optional.of(UpgradeMode.LAST_STATE);
+        }
+
+        if (status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING
+                || status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.ERROR) {
             throw new DeploymentFailedException(
-                    "Cannot perform savepoint upgrade on missing/failed JobManager deployment",
+                    "JobManager deployment is missing and HA data is not available to make stateful upgrades. "
+                            + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
+                            + "Manual restore required.",

Review Comment:
   I have a ticket open for these docs; I will work on it next week so that it is ready for the release.
   
   By the way, this is exactly the situation behind your question about the "stuck" upgrade progress: the JM deployment is missing, and HA metadata is not available to retrieve the last-state information.
   
   We cannot recover from this automatically, which is why we throw an error.
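For readers following the decision order in `getAvailableUpgradeMode` in the diff above, here is a minimal, self-contained Java sketch of it. It is not the operator's code: `UpgradeModeSketch`, `JobState`, and `JmStatus` are hypothetical stand-ins, and the terminal/running/HA-metadata checks are reduced to plain boolean fields. Because the quoted diff is truncated right after the exception, the final `return Optional.empty()` ("not ready yet, try again later") is an assumption, not something the hunk shows.

```java
import java.util.Optional;

public class UpgradeModeSketch {

    // Simplified stand-ins (hypothetical); only the constant names come from the diff.
    enum UpgradeMode { STATELESS, SAVEPOINT, LAST_STATE }

    enum JmStatus { READY, DEPLOYING, MISSING, ERROR }

    // Hypothetical snapshot of what the real method reads from the deployment
    // status, the config manager and FlinkService.
    record JobState(
            UpgradeMode requestedMode,
            boolean terminal,
            boolean running,
            boolean changedToLastStateWithoutHa,
            boolean haMetadataAvailable,
            JmStatus jmStatus) {}

    static class DeploymentFailedException extends RuntimeException {
        DeploymentFailedException(String message) {
            super(message);
        }
    }

    static Optional<UpgradeMode> availableUpgradeMode(JobState s) {
        // 1. Stateless jobs need no state, so they can always be upgraded.
        if (s.requestedMode() == UpgradeMode.STATELESS) {
            return Optional.of(s.requestedMode());
        }
        // 2. A job in a terminal state can be upgraded with the requested mode.
        if (s.terminal()) {
            return Optional.of(s.requestedMode());
        }
        // 3. Running job: switching to last-state without HA previously enabled
        //    falls back to a savepoint; otherwise keep the requested mode.
        if (s.running()) {
            return Optional.of(
                    s.changedToLastStateWithoutHa() ? UpgradeMode.SAVEPOINT : s.requestedMode());
        }
        // 4. Job not running, but HA metadata is still there: restore from last state.
        if (s.haMetadataAvailable()) {
            return Optional.of(UpgradeMode.LAST_STATE);
        }
        // 5. JM missing/failed and no HA metadata: nothing to restore from.
        if (s.jmStatus() == JmStatus.MISSING || s.jmStatus() == JmStatus.ERROR) {
            throw new DeploymentFailedException(
                    "JobManager deployment is missing and HA data is not available; manual restore required");
        }
        // Assumption: the diff is truncated here; treat other cases as "not ready yet".
        return Optional.empty();
    }

    public static void main(String[] args) {
        // The "stuck" case from the review: JM missing, job not running, no HA metadata.
        JobState stuck = new JobState(UpgradeMode.SAVEPOINT, false, false, false, false, JmStatus.MISSING);
        try {
            availableUpgradeMode(stuck);
        } catch (DeploymentFailedException e) {
            System.out.println("Upgrade blocked: " + e.getMessage());
        }
        // Same case, but the HA metadata survived: a last-state restore is possible.
        JobState withHa = new JobState(UpgradeMode.SAVEPOINT, false, false, false, true, JmStatus.MISSING);
        System.out.println(availableUpgradeMode(withHa));
    }
}
```

The ordering is the point of the sketch: the HA-metadata check runs before the JobManager-status check, so a missing JobManager only becomes a hard failure once no last-state information can be found.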



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org