Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/01 02:57:18 UTC

[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #283: [FLINK-28228] Never skip generations when observing already upgraded deployment

Aitozi commented on code in PR #283:
URL: https://github.com/apache/flink-kubernetes-operator/pull/283#discussion_r911568620


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -99,15 +103,20 @@ public final void reconcile(CR cr, Context ctx) throws Exception {
         // No further logic is required at this point.
         if (firstDeployment) {
             LOG.info("Deploying for the first time");
+
+            // Before we try to submit the job we record the current spec in the status so we can
+            // handle subsequent deployment and status update errors
+            ReconciliationUtils.updateStatusBeforeDeploymentAttempt(cr, deployConfig);
+            statusRecorder.patchAndCacheStatus(cr);
+
             deploy(
                     cr,
                     spec,
                     status,
                     deployConfig,
                     Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
                     false);
-            ReconciliationUtils.updateForSpecReconciliationSuccess(
-                    cr, JobState.RUNNING, deployConfig);
+            ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig);

Review Comment:
   If an exception is thrown in `deploy`, the resource may not actually have been deployed, so I think the `firstDeployment` condition is incomplete. IMO, it should include:
   
   - `lastReconciledSpec == null`
   - or the state is `ReconciliationState.UPGRADING` and the JobManager deployment status is `JobManagerDeploymentStatus.MISSING`
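A minimal sketch of the suggested condition, written as a dedicated helper. The enums and the `StatusView` holder are simplified, hypothetical stand-ins loosely modeled on the operator's status classes; only `lastReconciledSpec`, `ReconciliationState.UPGRADING` and `JobManagerDeploymentStatus.MISSING` come from the comment itself.

```java
/**
 * Sketch of a dedicated first-deployment check. The enums and StatusView are simplified,
 * hypothetical stand-ins for the operator's status classes.
 */
final class FirstDeployCheck {

    enum ReconciliationState { DEPLOYED, UPGRADING, ROLLING_BACK, ROLLED_BACK }

    enum JobManagerDeploymentStatus { READY, DEPLOYED_NOT_READY, DEPLOYING, MISSING, ERROR }

    /** Hypothetical read-only view of the status fields the check needs. */
    static final class StatusView {
        final String lastReconciledSpec; // serialized spec; null if nothing was ever recorded
        final ReconciliationState state;
        final JobManagerDeploymentStatus jmDeploymentStatus;

        StatusView(
                String lastReconciledSpec,
                ReconciliationState state,
                JobManagerDeploymentStatus jmDeploymentStatus) {
            this.lastReconciledSpec = lastReconciledSpec;
            this.state = state;
            this.jmDeploymentStatus = jmDeploymentStatus;
        }
    }

    private FirstDeployCheck() {}

    /**
     * First deployment if no spec was ever recorded, or if a spec was recorded before the
     * deploy attempt but no JobManager deployment exists (the first deploy() may have failed).
     */
    static boolean isFirstDeploy(StatusView status) {
        if (status.lastReconciledSpec == null) {
            return true;
        }
        return status.state == ReconciliationState.UPGRADING
                && status.jmDeploymentStatus == JobManagerDeploymentStatus.MISSING;
    }
}
```

As literally written, the second clause may also match an ordinary upgrade whose old cluster has already been torn down, which is one more reason to keep this logic in a single, separately testable method.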



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -262,8 +262,11 @@ private void onMissingDeployment(FlinkDeployment deployment) {
      * @param context Context for reconciliation.
      */
     private void checkIfAlreadyUpgraded(FlinkDeployment flinkDep, Context context) {
-        Optional<Deployment> depOpt = context.getSecondaryResource(Deployment.class);
         var status = flinkDep.getStatus();
+        if (status.getReconciliationStatus().getLastReconciledSpec() == null) {

Review Comment:
   +1. I also think the condition should be improved: since the last reconciled spec is now recorded before `deploy` is called, it can be present even though the deployment never actually finished. We should take this case into account.
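A hedged sketch of what "taking this case into account" could look like: a recorded spec alone is not treated as proof of an upgrade; the observer additionally requires evidence from the cluster that the target generation was deployed. All parameters are hypothetical inputs, and where the observed generation marker would come from (for example an annotation on the Kubernetes `Deployment`) is an assumption, not something this PR defines.

```java
import java.util.Optional;

/** Sketch of the "already upgraded" decision; all parameters are hypothetical inputs. */
final class AlreadyUpgradedCheck {

    private AlreadyUpgradedCheck() {}

    /**
     * Decides whether a pending upgrade can be marked as deployed.
     *
     * @param specRecorded whether a target spec was written to the status
     * @param upgrading whether the status still reports an upgrade in progress
     * @param targetGeneration generation recorded together with the target spec
     * @param observedGeneration generation marker read from the observed Deployment; empty if
     *     the Deployment is missing or carries no marker
     */
    static boolean isPendingUpgradeDeployed(
            boolean specRecorded,
            boolean upgrading,
            long targetGeneration,
            Optional<Long> observedGeneration) {
        if (!specRecorded || !upgrading) {
            // No upgrade is pending, so there is nothing to confirm.
            return false;
        }
        // The spec is recorded *before* deploy(), so its presence alone proves nothing:
        // require the cluster to show that exactly this generation was deployed.
        return observedGeneration.map(g -> g == targetGeneration).orElse(false);
    }
}
```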



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java:
##########
@@ -106,19 +107,27 @@ protected void reconcileSpecChange(
             // We must record the upgrade mode used to the status later
             currentDeploySpec.getJob().setUpgradeMode(availableUpgradeMode.get());
             cancelJob(resource, availableUpgradeMode.get(), observeConfig);
-            newState = JobState.SUSPENDED;
+            if (desiredJobState == JobState.RUNNING) {
+                ReconciliationUtils.updateStatusBeforeDeploymentAttempt(resource, deployConfig);

Review Comment:
   Is this redundant? We do not perform the upgrade in this branch; I think that is done in the `if (currentJobState == JobState.SUSPENDED && desiredJobState == JobState.RUNNING)` branch.
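To illustrate the point, here is a simplified sketch of the branch structure, assuming (as the comment implies) that after cancellation the flow falls through to the SUSPENDED-to-RUNNING branch. `SpecChangeFlow` and `stepsFor` are hypothetical; the step names mirror methods in the diff, and the method returns the steps so their order can be checked.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of the spec-change branches; step names mirror methods in the diff. */
final class SpecChangeFlow {

    enum JobState { RUNNING, SUSPENDED }

    private SpecChangeFlow() {}

    /** Returns the steps taken for a spec change, in order. */
    static List<String> stepsFor(JobState currentJobState, JobState desiredJobState) {
        List<String> steps = new ArrayList<>();
        JobState state = currentJobState;
        if (state == JobState.RUNNING) {
            // Suspend and upgrade both start by cancelling the running job.
            steps.add("cancelJob");
            state = JobState.SUSPENDED;
        }
        if (state == JobState.SUSPENDED && desiredJobState == JobState.RUNNING) {
            // The only branch that deploys, hence the only place that needs to record
            // the target spec before the deployment attempt.
            steps.add("updateStatusBeforeDeploymentAttempt");
            steps.add("deploy");
        }
        return steps;
    }
}
```

With this shape, an upgrade (RUNNING to RUNNING) records the status exactly once, in the second branch, so an extra call in the cancel branch would be a duplicate.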



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java:
##########
@@ -133,7 +135,16 @@ protected void rollback(FlinkDeployment deployment, Context ctx, Configuration o
         FlinkDeploymentSpec rollbackSpec = reconciliationStatus.deserializeLastStableSpec();
         Configuration rollbackConfig =
                 configManager.getDeployConfig(deployment.getMetadata(), rollbackSpec);
-        upgradeSessionCluster(deployment, rollbackSpec, rollbackConfig);
+
+        deleteSessionCluster(deployment, observeConfig);
+        deploy(

Review Comment:
   Why don't we need to record the target upgrade status in the rollback? I think the rollback should deploy the stable spec together with its corresponding generation. WDYT?
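A minimal sketch of the suggestion, assuming the status can hold the last stable spec paired with the generation it belongs to. `RollbackRecording`, `SpecAndGeneration` and `Status` are hypothetical types; in the real operator the recorded status would also need to be persisted (the diff uses `statusRecorder.patchAndCacheStatus` for that) before the cluster is touched.

```java
/** Sketch of a rollback that records its target first; all types are hypothetical. */
final class RollbackRecording {

    /** A serialized spec together with the generation it belongs to. */
    static final class SpecAndGeneration {
        final String spec;
        final long generation;

        SpecAndGeneration(String spec, long generation) {
            this.spec = spec;
            this.generation = generation;
        }
    }

    /** Heavily simplified status holder. */
    static final class Status {
        SpecAndGeneration lastStable; // last spec known to be stable
        SpecAndGeneration target; // what the operator is currently trying to deploy
        boolean deployed; // whether the target is known to be deployed
    }

    private RollbackRecording() {}

    /**
     * Records the stable spec *with its own generation* as the target before touching the
     * cluster, so that a failure in delete/deploy leaves a status that still says what was
     * being restored.
     */
    static void rollback(Status status, Runnable deleteCluster, Runnable deployStable) {
        status.target = status.lastStable;
        status.deployed = false;
        deleteCluster.run();
        deployStable.run();
        status.deployed = true;
    }
}
```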



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java:
##########
@@ -133,7 +135,16 @@ protected void rollback(FlinkDeployment deployment, Context ctx, Configuration o
         FlinkDeploymentSpec rollbackSpec = reconciliationStatus.deserializeLastStableSpec();
         Configuration rollbackConfig =
                 configManager.getDeployConfig(deployment.getMetadata(), rollbackSpec);
-        upgradeSessionCluster(deployment, rollbackSpec, rollbackConfig);
+
+        deleteSessionCluster(deployment, observeConfig);
+        deploy(

Review Comment:
   I also noticed that the general deploy flow has become:
   - record the target upgrade/firstDeploy in status
   - deploy
   - update the deployed spec
   
   So I think we could move these steps into `deploy` directly.
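A minimal sketch of folding the three steps into a single `deploy` entry point as a template method. The class and the `recordTarget` / `doDeploy` / `markDeployed` hooks are hypothetical names; in the diff they would roughly correspond to `ReconciliationUtils.updateStatusBeforeDeploymentAttempt` plus `statusRecorder.patchAndCacheStatus`, the existing `deploy(...)` call, and `ReconciliationUtils.updateStatusForDeployedSpec`.

```java
/** Hypothetical template: record the target, deploy, then mark it deployed, in one place. */
abstract class DeployTemplate<S> {

    /** Persist the target spec in the status before touching the cluster. */
    protected abstract void recordTarget(S spec);

    /** Submit the cluster/job for the given spec; may throw. */
    protected abstract void doDeploy(S spec) throws Exception;

    /** Mark the recorded spec as successfully deployed. */
    protected abstract void markDeployed(S spec);

    /** Single entry point shared by first deployment, upgrade and rollback. */
    public final void deploy(S spec) throws Exception {
        recordTarget(spec);
        doDeploy(spec);
        // Only reached if doDeploy did not throw.
        markDeployed(spec);
    }
}
```

Making `deploy` final means no caller can skip the recording step; if `doDeploy` throws, the target stays recorded but is never marked deployed, which is the state the observer is then expected to handle.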



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -99,15 +103,20 @@ public final void reconcile(CR cr, Context ctx) throws Exception {
         // No further logic is required at this point.
         if (firstDeployment) {
             LOG.info("Deploying for the first time");
+
+            // Before we try to submit the job we record the current spec in the status so we can
+            // handle subsequent deployment and status update errors
+            ReconciliationUtils.updateStatusBeforeDeploymentAttempt(cr, deployConfig);
+            statusRecorder.patchAndCacheStatus(cr);
+
             deploy(
                     cr,
                     spec,
                     status,
                     deployConfig,
                     Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
                     false);
-            ReconciliationUtils.updateForSpecReconciliationSuccess(
-                    cr, JobState.RUNNING, deployConfig);
+            ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig);

Review Comment:
   I second @morhidi's suggestion to make `isFirstDeploy` a dedicated method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org