Posted to commits@flink.apache.org by GitBox <gi...@apache.org> on 2022/02/17 06:47:35 UTC

[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request #5: [FLINK-26135] Introduce ReconciliationStatus and improve error handling in controller flow

gyfora opened a new pull request #5:
URL: https://github.com/apache/flink-kubernetes-operator/pull/5


   Improvements to Flink deployment status handling.
   
   Introduce `ReconciliationStatus` to capture errors that do not necessarily affect the running jobs. The PR does not introduce new validation logic for the deployments; that is left to a separate ticket for now.
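A minimal sketch of what such a status holder could look like, assuming a plain Java POJO. The accessor names (`setSuccess`, `setError`, `getError`, `setLastReconciledSpec`, `getLastReconciledSpec`) mirror the calls in the diff further down. The generic spec type `S` and the helpers `specChanged`, `markSuccess` and `markError` are hypothetical, and clearing `success` on error is an assumption, since the body of `updateForReconciliationError` is not shown in the PR.

```java
import java.util.Objects;

/**
 * Hypothetical sketch of a reconciliation status holder. Accessor names mirror
 * the diff; the generic spec type S stands in for the real deployment spec.
 */
class ReconciliationStatus<S> {
    private boolean success;
    private String error;
    private S lastReconciledSpec;

    boolean isSuccess() { return success; }
    void setSuccess(boolean success) { this.success = success; }

    String getError() { return error; }
    void setError(String error) { this.error = error; }

    S getLastReconciledSpec() { return lastReconciledSpec; }
    void setLastReconciledSpec(S spec) { this.lastReconciledSpec = spec; }

    /** True when the desired spec differs from the last successfully applied one (null-safe). */
    boolean specChanged(S desiredSpec) {
        return !Objects.equals(desiredSpec, lastReconciledSpec);
    }

    /** Records a successful reconciliation of the given spec and clears any previous error. */
    void markSuccess(S spec) {
        success = true;
        error = null;
        lastReconciledSpec = spec;
    }

    /** Records an error; the last reconciled spec is kept, so the change is retried later (assumption). */
    void markError(String err) {
        success = false;
        error = err;
    }
}
```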
   
   The PR also improves the reconciliation flow to avoid rescheduling reconciliation when it is not necessary, and introduces a controller test to verify the basic flow.
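The rescheduling rules implied by the diff below can be summarized as a pure decision function. This is a hypothetical sketch: `ControlDecision` stands in for the operator SDK's `UpdateControl`, and the two intervals are placeholders because the PR does not show the values of `OBSERVE_REFRESH_SECONDS` and `RECONCILE_ERROR_REFRESH_SECONDS`.

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical stand-in for UpdateControl: whether to write the status, and when to run again. */
final class ControlDecision {
    final boolean updateStatus;
    final Optional<Duration> rescheduleAfter;

    ControlDecision(boolean updateStatus, Optional<Duration> rescheduleAfter) {
        this.updateStatus = updateStatus;
        this.rescheduleAfter = rescheduleAfter;
    }
}

final class ReconcileFlow {
    // Placeholder values; the real constants are not shown in the PR.
    static final Duration OBSERVE_REFRESH = Duration.ofSeconds(10);
    static final Duration RECONCILE_ERROR_REFRESH = Duration.ofSeconds(5);

    /**
     * @param observed    whether the cluster could be observed
     * @param specChanged whether the spec differs from the last reconciled one
     * @param error       error message from the reconcile step, or null if it succeeded
     * @param lastError   error message already stored in the status, or null
     */
    static ControlDecision decide(
            boolean observed, boolean specChanged, String error, String lastError) {
        if (!observed) {
            // Cluster not reachable: leave the status alone and retry the observation.
            return new ControlDecision(false, Optional.of(OBSERVE_REFRESH));
        }
        if (!specChanged) {
            // Nothing to reconcile: persist the observed status without an explicit reschedule.
            return new ControlDecision(true, Optional.empty());
        }
        if (error != null) {
            // Write the status only when the error message is new; always retry.
            boolean newError = !error.equals(lastError);
            return new ControlDecision(newError, Optional.of(RECONCILE_ERROR_REFRESH));
        }
        // Reconciliation succeeded: record it and schedule the next observation.
        return new ControlDecision(true, Optional.of(OBSERVE_REFRESH));
    }
}
```

Keeping the decision free of Kubernetes client types makes each branch straightforward to unit test in isolation.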
   
   cc @wangyang0918 @tweise 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #5: [FLINK-26135] Introduce ReconciliationStatus and improve error handling in controller flow

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #5:
URL: https://github.com/apache/flink-kubernetes-operator/pull/5#issuecomment-1043902384


   Thanks @tweise for the comments regarding capping the attempt count. We have considered this but have not yet found the best approach, which is why we left it out of this PR.
   
   We could track the attempt count plus timestamps and make the retry settings configurable.
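One way to combine an attempt count, timestamps and configurable retry settings, sketched under assumptions: `RetrySettings` and `RetryState` are hypothetical names rather than existing operator configuration, and exponential backoff with an upper bound is a policy chosen here for illustration.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical user-configurable retry settings (not existing operator config keys). */
final class RetrySettings {
    final int maxAttempts;
    final Duration initialDelay;
    final double multiplier;
    final Duration maxDelay;

    RetrySettings(int maxAttempts, Duration initialDelay, double multiplier, Duration maxDelay) {
        if (maxAttempts < 1 || multiplier < 1.0) {
            throw new IllegalArgumentException("need maxAttempts >= 1 and multiplier >= 1.0");
        }
        this.maxAttempts = maxAttempts;
        this.initialDelay = initialDelay;
        this.multiplier = multiplier;
        this.maxDelay = maxDelay;
    }

    /** Delay before retry number `attempt` (1-based): initialDelay * multiplier^(attempt - 1), capped. */
    Duration delayFor(int attempt) {
        double millis = initialDelay.toMillis() * Math.pow(multiplier, Math.max(0, attempt - 1));
        return Duration.ofMillis((long) Math.min(millis, (double) maxDelay.toMillis()));
    }
}

/** Attempt count plus first/last failure timestamps. */
final class RetryState {
    private int attempts;
    private Instant firstFailure;
    private Instant lastFailure;

    void recordFailure(Instant now) {
        if (attempts == 0) {
            firstFailure = now;
        }
        attempts++;
        lastFailure = now;
    }

    /** Clears the history, e.g. after a successful reconciliation. */
    void reset() {
        attempts = 0;
        firstFailure = null;
        lastFailure = null;
    }

    boolean exhausted(RetrySettings settings) {
        return attempts >= settings.maxAttempts;
    }

    int attempts() { return attempts; }
    Instant firstFailure() { return firstFailure; }
    Instant lastFailure() { return lastFailure; }
}
```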
   
   We should also distinguish recoverable and fatal errors, because some errors (such as config errors) will never go away and should never be retried.
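A sketch of the recoverable/fatal split, assuming validation code signals unrecoverable problems with a dedicated exception type. `FatalReconciliationException`, `ErrorKind` and `ErrorClassifier` are hypothetical names; the classifier walks the cause chain so that a fatal error wrapped in another exception is still recognized.

```java
/** Hypothetical marker for errors that retrying cannot fix, e.g. an invalid deployment config. */
class FatalReconciliationException extends Exception {
    FatalReconciliationException(String message) {
        super(message);
    }
}

enum ErrorKind { RECOVERABLE, FATAL }

final class ErrorClassifier {
    // Bound the walk so a (rare) cyclic cause chain cannot loop forever.
    private static final int MAX_CAUSE_DEPTH = 32;

    /** FATAL if any exception in the cause chain is a FatalReconciliationException. */
    static ErrorKind classify(Throwable error) {
        Throwable current = error;
        for (int depth = 0; current != null && depth < MAX_CAUSE_DEPTH; depth++) {
            if (current instanceof FatalReconciliationException) {
                return ErrorKind.FATAL;
            }
            current = current.getCause();
        }
        // Anything not explicitly marked fatal is treated as transient and retried.
        return ErrorKind.RECOVERABLE;
    }
}
```

A controller could then skip rescheduling for `FATAL` errors and apply the retry settings only to `RECOVERABLE` ones.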
   
   I will open a follow-up JIRA for this.





[GitHub] [flink-kubernetes-operator] asfgit closed pull request #5: [FLINK-26135] Introduce ReconciliationStatus and improve error handling in controller flow

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #5:
URL: https://github.com/apache/flink-kubernetes-operator/pull/5


   





[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #5: [FLINK-26135] Introduce ReconciliationStatus and improve error handling in controller flow

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #5:
URL: https://github.com/apache/flink-kubernetes-operator/pull/5#discussion_r809672525



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -90,37 +94,75 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
     @Override
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
         LOG.info("Reconciling {}", flinkApp.getMetadata().getName());
+        if (flinkApp.getStatus() == null) {
+            flinkApp.setStatus(new FlinkDeploymentStatus());
+        }
 
         Configuration effectiveConfig = FlinkUtils.getEffectiveConfig(flinkApp);
 
-        boolean success = observer.observeFlinkJobStatus(flinkApp, effectiveConfig);
-        if (success) {
-            try {
-                success = reconcileFlinkDeployment(operatorNamespace, flinkApp, effectiveConfig);
-            } catch (Exception e) {
-                throw new RuntimeException(
-                        "Error while reconciling deployment change for "
-                                + flinkApp.getMetadata().getName(),
-                        e);
-            }
-        }
+        boolean successfulObserve = observer.observeFlinkJobStatus(flinkApp, effectiveConfig);
 
-        if (!success) {
+        if (!successfulObserve) {
+            // Cluster not accessible let's retry
             return UpdateControl.<FlinkDeployment>noUpdate()
-                    .rescheduleAfter(JOB_REFRESH_SECONDS, TimeUnit.SECONDS);
+                    .rescheduleAfter(OBSERVE_REFRESH_SECONDS, TimeUnit.SECONDS);
+        }
+
+        if (!specChanged(flinkApp)) {
+            // Successfully observed the cluster after reconciliation, no need to reschedule
+            return UpdateControl.updateStatus(flinkApp);
+        }
+
+        try {
+            reconcileFlinkDeployment(operatorNamespace, flinkApp, effectiveConfig);
+        } catch (Exception e) {
+            String err = "Error while reconciling deployment change: " + e.getMessage();
+            String lastErr = flinkApp.getStatus().getReconciliationStatus().getError();
+            if (!err.equals(lastErr)) {
+                // Log new errors on the first instance
+                LOG.error("Error while reconciling deployment change", e);
+                updateForReconciliationError(flinkApp, err);
+                return UpdateControl.updateStatus(flinkApp)
+                        .rescheduleAfter(RECONCILE_ERROR_REFRESH_SECONDS, TimeUnit.SECONDS);
+            } else {
+                return UpdateControl.<FlinkDeployment>noUpdate()
+                        .rescheduleAfter(RECONCILE_ERROR_REFRESH_SECONDS, TimeUnit.SECONDS);
+            }
         }
 
-        flinkApp.getStatus().setSpec(flinkApp.getSpec());
+        // Everything went well, update status and reschedule for observation
+        updateForReconciliationSuccess(flinkApp);
         return UpdateControl.updateStatus(flinkApp)
-                .rescheduleAfter(JOB_REFRESH_SECONDS, TimeUnit.SECONDS);
+                .rescheduleAfter(OBSERVE_REFRESH_SECONDS, TimeUnit.SECONDS);
     }
 
-    private boolean reconcileFlinkDeployment(
+    private boolean specChanged(FlinkDeployment flinkApp) {
+        return !flinkApp.getSpec()
+                .equals(flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec());
+    }
+
+    private void reconcileFlinkDeployment(
             String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig)
             throws Exception {
-        return flinkApp.getSpec().getJob() == null
-                ? sessionReconciler.reconcile(operatorNamespace, flinkApp, effectiveConfig)
-                : jobReconciler.reconcile(operatorNamespace, flinkApp, effectiveConfig);
+
+        if (flinkApp.getSpec().getJob() == null) {
+            sessionReconciler.reconcile(operatorNamespace, flinkApp, effectiveConfig);
+        } else {
+            jobReconciler.reconcile(operatorNamespace, flinkApp, effectiveConfig);
+        }
+    }
+
+    private void updateForReconciliationSuccess(FlinkDeployment flinkApp) {
+        ReconciliationStatus reconciliationStatus = flinkApp.getStatus().getReconciliationStatus();
+        reconciliationStatus.setSuccess(true);
+        reconciliationStatus.setError(null);
+        reconciliationStatus.setLastReconciledSpec(flinkApp.getSpec());
+    }
+
+    private void updateForReconciliationError(FlinkDeployment flinkApp, String err) {
+        ReconciliationStatus reconciliationStatus = flinkApp.getStatus().getReconciliationStatus();

Review comment:
       Should we also track the attempt count? Could also be a metric (or both).
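A hypothetical sketch of tracking attempts in both places the comment suggests: a per-deployment count that resets on success (the kind of value that would live in the status) and a monotonically increasing failure total suitable for a metric. `ReconcileAttemptTracker` is an illustrative name, and the plain `ConcurrentHashMap` stands in for a real metric registry; no Flink metrics API is used here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical tracker; the maps stand in for status fields and a metric registry. */
final class ReconcileAttemptTracker {
    // Status-style count: consecutive failed attempts since the last success.
    private final Map<String, Integer> attemptsSinceSuccess = new ConcurrentHashMap<>();
    // Metric-style count: total failures, never reset.
    private final Map<String, AtomicLong> failureTotals = new ConcurrentHashMap<>();

    /** Called when reconciling `deployment` throws; returns the updated consecutive count. */
    int onFailure(String deployment) {
        failureTotals.computeIfAbsent(deployment, k -> new AtomicLong()).incrementAndGet();
        return attemptsSinceSuccess.merge(deployment, 1, Integer::sum);
    }

    /** Called after a successful reconciliation; only the status-style count is reset. */
    void onSuccess(String deployment) {
        attemptsSinceSuccess.remove(deployment);
    }

    int attempts(String deployment) {
        return attemptsSinceSuccess.getOrDefault(deployment, 0);
    }

    long failureTotal(String deployment) {
        AtomicLong counter = failureTotals.get(deployment);
        return counter == null ? 0L : counter.get();
    }
}
```

One trade-off: storing the count in the status means writing the status on every retry, whereas the code above skips the status update (`noUpdate()`) when the error message has not changed.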







[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #5: [FLINK-26135] Introduce ReconciliationStatus and improve error handling in controller flow

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #5:
URL: https://github.com/apache/flink-kubernetes-operator/pull/5#discussion_r809671897



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -90,37 +94,75 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
     @Override
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
         LOG.info("Reconciling {}", flinkApp.getMetadata().getName());
+        if (flinkApp.getStatus() == null) {
+            flinkApp.setStatus(new FlinkDeploymentStatus());
+        }
 
         Configuration effectiveConfig = FlinkUtils.getEffectiveConfig(flinkApp);
 
-        boolean success = observer.observeFlinkJobStatus(flinkApp, effectiveConfig);
-        if (success) {
-            try {
-                success = reconcileFlinkDeployment(operatorNamespace, flinkApp, effectiveConfig);
-            } catch (Exception e) {
-                throw new RuntimeException(
-                        "Error while reconciling deployment change for "
-                                + flinkApp.getMetadata().getName(),
-                        e);
-            }
-        }
+        boolean successfulObserve = observer.observeFlinkJobStatus(flinkApp, effectiveConfig);
 
-        if (!success) {
+        if (!successfulObserve) {
+            // Cluster not accessible let's retry
             return UpdateControl.<FlinkDeployment>noUpdate()
-                    .rescheduleAfter(JOB_REFRESH_SECONDS, TimeUnit.SECONDS);
+                    .rescheduleAfter(OBSERVE_REFRESH_SECONDS, TimeUnit.SECONDS);
+        }
+
+        if (!specChanged(flinkApp)) {
+            // Successfully observed the cluster after reconciliation, no need to reschedule
+            return UpdateControl.updateStatus(flinkApp);
+        }
+
+        try {
+            reconcileFlinkDeployment(operatorNamespace, flinkApp, effectiveConfig);
+        } catch (Exception e) {
+            String err = "Error while reconciling deployment change: " + e.getMessage();
+            String lastErr = flinkApp.getStatus().getReconciliationStatus().getError();
+            if (!err.equals(lastErr)) {
+                // Log new errors on the first instance
+                LOG.error("Error while reconciling deployment change", e);
+                updateForReconciliationError(flinkApp, err);
+                return UpdateControl.updateStatus(flinkApp)
+                        .rescheduleAfter(RECONCILE_ERROR_REFRESH_SECONDS, TimeUnit.SECONDS);
+            } else {
+                return UpdateControl.<FlinkDeployment>noUpdate()
+                        .rescheduleAfter(RECONCILE_ERROR_REFRESH_SECONDS, TimeUnit.SECONDS);

Review comment:
       I think we will need to cap the attempts and move to a deploy failed status when exhausted.
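A minimal sketch of capping the retries and ending in a terminal failure state, assuming a hypothetical `DeploymentState` enum with a `DEPLOY_FAILED` value; neither the enum nor `CappedRetry` comes from the PR.

```java
/** Hypothetical lifecycle states; DEPLOY_FAILED is terminal until the spec changes. */
enum DeploymentState { DEPLOYING, DEPLOYED, DEPLOY_FAILED }

final class CappedRetry {
    private final int maxAttempts;
    private int attempts;
    private DeploymentState state = DeploymentState.DEPLOYING;

    CappedRetry(int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.maxAttempts = maxAttempts;
    }

    /** Records a failed attempt; returns true if the caller should reschedule another try. */
    boolean onFailure() {
        if (state == DeploymentState.DEPLOY_FAILED) {
            return false; // budget already exhausted, stop retrying
        }
        attempts++;
        if (attempts >= maxAttempts) {
            state = DeploymentState.DEPLOY_FAILED;
            return false;
        }
        return true;
    }

    void onSuccess() {
        attempts = 0;
        state = DeploymentState.DEPLOYED;
    }

    /** A new spec from the user gives the deployment a fresh retry budget (assumption). */
    void onSpecChange() {
        attempts = 0;
        state = DeploymentState.DEPLOYING;
    }

    DeploymentState state() { return state; }
    int attempts() { return attempts; }
}
```

Resetting the budget on a spec change is a design assumption: since the controller only runs the reconcile step when the spec differs from the last reconciled one, a user edit is a natural point to try again after reaching `DEPLOY_FAILED`.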



