Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/12 06:07:33 UTC

[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #165: [FLINK-26140] Support rollback strategies

Aitozi commented on code in PR #165:
URL: https://github.com/apache/flink-kubernetes-operator/pull/165#discussion_r847992477


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java:
##########
@@ -144,4 +148,22 @@ private void observeSavepointStatus(
         logger.info("Savepoint status updated with latest completed savepoint info");
         savepointInfo.updateLastSavepoint(savepointFetchResult.getSavepoint());
     }
+
+    @Override
+    protected void updateStableSpec(FlinkDeployment deployment) {
+        FlinkDeploymentStatus status = deployment.getStatus();
+        ReconciliationStatus reconciliationStatus = status.getReconciliationStatus();
+
+        // The last reconciled spec is already considered stable, nothing to do...
+        if (isCurrentSpecStable(reconciliationStatus)) {

Review Comment:
   It seems we do not need this check: if the two specs are the same, the update does nothing anyway.
   
   nit: The name `isCurrentSpecStable` suggests that it checks stability, but it only compares `lastReconciledSpec` with `lastStableSpec`, which is a bit misleading.
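
   To make the nit concrete, here is a minimal sketch of what the check appears to do. It assumes the method is a plain equality comparison of the two serialized specs. The `Status` class and both method names are hypothetical stand-ins, not the operator's actual `ReconciliationStatus` API.

```java
import java.util.Objects;

// Hypothetical stand-in for the operator's status object; only the two
// serialized spec fields discussed in this comment are modeled.
final class StableSpecCheckSketch {

    static final class Status {
        String lastReconciledSpec; // serialized spec that was last reconciled
        String lastStableSpec;     // serialized spec that was last marked stable
    }

    // Assumed behavior of the check: it compares the two fields and does not
    // inspect job or cluster health, so the name says what it really does.
    static boolean lastReconciledSpecIsMarkedStable(Status status) {
        return Objects.equals(status.lastReconciledSpec, status.lastStableSpec);
    }

    // Marking without a guard: when both fields are already equal, the
    // assignment leaves the status unchanged.
    static void markLastReconciledSpecStable(Status status) {
        status.lastStableSpec = status.lastReconciledSpec;
    }

    public static void main(String[] args) {
        Status status = new Status();
        status.lastReconciledSpec = "{\"parallelism\":2}";
        System.out.println(lastReconciledSpecIsMarkedStable(status));
        markLastReconciledSpecStable(status);
        markLastReconciledSpecStable(status); // second call changes nothing
        System.out.println(lastReconciledSpecIsMarkedStable(status));
    }
}
```

   If the guarded block in the PR does more than this assignment, the early return may still be needed.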



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##########
@@ -62,40 +64,42 @@ public static void updateForSpecReconciliationSuccess(
             clonedSpec.getJob().setState(stateAfterReconcile);
         }
         reconciliationStatus.serializeAndSetLastReconciledSpec(clonedSpec);
+        reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis());
+        reconciliationStatus.setRolledBack(false);
     }
 
     public static void updateSavepointReconciliationSuccess(FlinkDeployment flinkApp) {
         ReconciliationStatus reconciliationStatus = flinkApp.getStatus().getReconciliationStatus();
-        reconciliationStatus.setSuccess(true);
-        reconciliationStatus.setError(null);
+        flinkApp.getStatus().setError(null);
         FlinkDeploymentSpec lastReconciledSpec =
                 reconciliationStatus.deserializeLastReconciledSpec();
         lastReconciledSpec
                 .getJob()
                 .setSavepointTriggerNonce(flinkApp.getSpec().getJob().getSavepointTriggerNonce());
         reconciliationStatus.serializeAndSetLastReconciledSpec(lastReconciledSpec);
+        reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis());
     }
 
     public static void updateForReconciliationError(FlinkDeployment flinkApp, String err) {
-        ReconciliationStatus reconciliationStatus = flinkApp.getStatus().getReconciliationStatus();
-        reconciliationStatus.setSuccess(false);
-        reconciliationStatus.setError(err);
+        flinkApp.getStatus().setError(err);
     }
 
     public static void updateForSpecReconciliationSuccess(FlinkSessionJob sessionJob) {
         FlinkSessionJobReconciliationStatus reconciliationStatus =
                 sessionJob.getStatus().getReconciliationStatus();
-        reconciliationStatus.setSuccess(true);
-        reconciliationStatus.setError(null);
+        sessionJob.getStatus().setError(null);
         FlinkSessionJobSpec clonedSpec = clone(sessionJob.getSpec());
         reconciliationStatus.serializeAndSetLastReconciledSpec(clonedSpec);
     }
 
     public static void updateForReconciliationError(FlinkSessionJob flinkSessionJob, String err) {
-        FlinkSessionJobReconciliationStatus reconciliationStatus =
-                flinkSessionJob.getStatus().getReconciliationStatus();
-        reconciliationStatus.setSuccess(false);
-        reconciliationStatus.setError(err);
+        flinkSessionJob.getStatus().setError(err);
+    }
+
+    public static void updateLastStableSpec(FlinkDeployment deployment) {

Review Comment:
   I have a question here: could these util methods be part of `FlinkSessionJob` and `FlinkDeployment`? One drawback is that it may pollute the simple CRD objects, but I think they fit better there. This is out of scope for this PR; I'm leaving it here for discussion.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -111,12 +114,34 @@ public void reconcile(FlinkDeployment flinkApp, Context context, Configuration e
             }
             IngressUtils.updateIngressRules(flinkApp, effectiveConfig, kubernetesClient);
             ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp, stateAfterReconcile);
+        } else if (ReconciliationUtils.shouldRollBack(flinkApp)) {
+            rollbackApplication(flinkApp);
         } else if (SavepointUtils.shouldTriggerSavepoint(flinkApp) && isJobRunning(flinkApp)) {
             triggerSavepoint(flinkApp, effectiveConfig);
             ReconciliationUtils.updateSavepointReconciliationSuccess(flinkApp);
         }
     }
 
+    private void rollbackApplication(FlinkDeployment flinkApp) throws Exception {
+        LOG.warn("Rolling back deployment to last stable spec");
+        Configuration rollbackConfig =
+                FlinkUtils.getEffectiveConfig(
+                        flinkApp.getMetadata(),
+                        flinkApp.getStatus().getReconciliationStatus().deserializeLastStableSpec(),
+                        defaultConfig.getFlinkConfig());
+        suspendJob(flinkApp, UpgradeMode.LAST_STATE, rollbackConfig);
+        deployFlinkJob(
+                flinkApp,
+                rollbackConfig,
+                Optional.ofNullable(
+                                flinkApp.getStatus()
+                                        .getJobStatus()
+                                        .getSavepointInfo()
+                                        .getLastSavepoint())
+                        .map(Savepoint::getLocation));
+        flinkApp.getStatus().getReconciliationStatus().setRolledBack(true);

Review Comment:
   Do we have to update the `lastReconciledSpec` to `lastStableSpec` after rollback?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -111,12 +114,34 @@ public void reconcile(FlinkDeployment flinkApp, Context context, Configuration e
             }
             IngressUtils.updateIngressRules(flinkApp, effectiveConfig, kubernetesClient);
             ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp, stateAfterReconcile);
+        } else if (ReconciliationUtils.shouldRollBack(flinkApp)) {
+            rollbackApplication(flinkApp);
         } else if (SavepointUtils.shouldTriggerSavepoint(flinkApp) && isJobRunning(flinkApp)) {
             triggerSavepoint(flinkApp, effectiveConfig);
             ReconciliationUtils.updateSavepointReconciliationSuccess(flinkApp);
         }
     }
 
+    private void rollbackApplication(FlinkDeployment flinkApp) throws Exception {
+        LOG.warn("Rolling back deployment to last stable spec");
+        Configuration rollbackConfig =
+                FlinkUtils.getEffectiveConfig(
+                        flinkApp.getMetadata(),
+                        flinkApp.getStatus().getReconciliationStatus().deserializeLastStableSpec(),
+                        defaultConfig.getFlinkConfig());
+        suspendJob(flinkApp, UpgradeMode.LAST_STATE, rollbackConfig);
+        deployFlinkJob(

Review Comment:
   Why not roll back the job in two steps, `suspend -> reconcile -> deploy`?
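
   A rough sketch of the two-step flow I have in mind, where each reconcile pass performs at most one step. All names here (`App`, `JobState`, `reconcileRollback`) are hypothetical and only model the control flow, not the operator's real classes.

```java
// Hypothetical model of a rollback split across two reconcile passes.
final class TwoStepRollbackSketch {

    enum JobState { RUNNING, SUSPENDED }

    static final class App {
        JobState state = JobState.RUNNING;
        String deployedSpec = "new-spec";      // spec that turned out unstable
        String lastStableSpec = "stable-spec"; // spec to roll back to
        boolean rolledBack = false;
    }

    // One reconcile pass: either suspend or deploy, never both.
    static void reconcileRollback(App app) {
        if (app.rolledBack) {
            return; // rollback already completed, nothing to do
        }
        if (app.state == JobState.RUNNING) {
            // Step 1: suspend only; the next reconcile pass continues.
            app.state = JobState.SUSPENDED;
        } else {
            // Step 2: deploy the last stable spec and record the rollback.
            app.deployedSpec = app.lastStableSpec;
            app.state = JobState.RUNNING;
            app.rolledBack = true;
        }
    }

    public static void main(String[] args) {
        App app = new App();
        reconcileRollback(app); // first pass suspends the job
        System.out.println(app.state + " " + app.deployedSpec);
        reconcileRollback(app); // second pass deploys the stable spec
        System.out.println(app.state + " " + app.deployedSpec);
    }
}
```

   The point of splitting the steps is that the suspended state would be observable between passes, at the cost of one extra reconcile loop.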



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java:
##########
@@ -50,4 +52,9 @@ public void observeIfClusterReady(
             }
         }
     }
+
+    @Override
+    protected void updateStableSpec(FlinkDeployment deployment) {

Review Comment:
   What will be the condition for considering the session cluster stable after an upgrade?


