Posted to commits@flink.apache.org by GitBox <gi...@apache.org> on 2022/02/18 04:48:43 UTC

[GitHub] [flink-kubernetes-operator] tweise commented on a change in pull request #5: [FLINK-26135] Introduce ReconciliationStatus and improve error handling in controller flow

tweise commented on a change in pull request #5:
URL: https://github.com/apache/flink-kubernetes-operator/pull/5#discussion_r809671897



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
##########
@@ -90,37 +94,75 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
     @Override
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
         LOG.info("Reconciling {}", flinkApp.getMetadata().getName());
+        if (flinkApp.getStatus() == null) {
+            flinkApp.setStatus(new FlinkDeploymentStatus());
+        }
 
         Configuration effectiveConfig = FlinkUtils.getEffectiveConfig(flinkApp);
 
-        boolean success = observer.observeFlinkJobStatus(flinkApp, effectiveConfig);
-        if (success) {
-            try {
-                success = reconcileFlinkDeployment(operatorNamespace, flinkApp, effectiveConfig);
-            } catch (Exception e) {
-                throw new RuntimeException(
-                        "Error while reconciling deployment change for "
-                                + flinkApp.getMetadata().getName(),
-                        e);
-            }
-        }
+        boolean successfulObserve = observer.observeFlinkJobStatus(flinkApp, effectiveConfig);
 
-        if (!success) {
+        if (!successfulObserve) {
+            // Cluster not accessible let's retry
             return UpdateControl.<FlinkDeployment>noUpdate()
-                    .rescheduleAfter(JOB_REFRESH_SECONDS, TimeUnit.SECONDS);
+                    .rescheduleAfter(OBSERVE_REFRESH_SECONDS, TimeUnit.SECONDS);
+        }
+
+        if (!specChanged(flinkApp)) {
+            // Successfully observed the cluster after reconciliation, no need to reschedule
+            return UpdateControl.updateStatus(flinkApp);
+        }
+
+        try {
+            reconcileFlinkDeployment(operatorNamespace, flinkApp, effectiveConfig);
+        } catch (Exception e) {
+            String err = "Error while reconciling deployment change: " + e.getMessage();
+            String lastErr = flinkApp.getStatus().getReconciliationStatus().getError();
+            if (!err.equals(lastErr)) {
+                // Log new errors on the first instance
+                LOG.error("Error while reconciling deployment change", e);
+                updateForReconciliationError(flinkApp, err);
+                return UpdateControl.updateStatus(flinkApp)
+                        .rescheduleAfter(RECONCILE_ERROR_REFRESH_SECONDS, TimeUnit.SECONDS);
+            } else {
+                return UpdateControl.<FlinkDeployment>noUpdate()
+                        .rescheduleAfter(RECONCILE_ERROR_REFRESH_SECONDS, TimeUnit.SECONDS);

Review comment:
       I think we will need to cap the number of retry attempts and move to a deploy-failed status once they are exhausted.
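
       A minimal sketch of what such a cap could look like, kept free of operator and Flink dependencies so it stands alone. Everything here is an assumption for illustration rather than code from this PR: the `Status` class stands in for the reconciliation status, `failedAttempts` and `deployFailed` are hypothetical fields, and `MAX_RECONCILE_ATTEMPTS` is a made-up constant. Note that if the counter lives in the resource status, every failed attempt would need a status update, not only the first occurrence of a new error.

```java
/** Hypothetical sketch, not code from this PR: caps consecutive reconcile failures. */
final class ReconcileRetrySketch {

    /** What the controller should do after a failed reconcile attempt. */
    enum Decision { RETRY, DEPLOY_FAILED }

    /** Stand-in for the reconciliation status; failedAttempts and deployFailed are assumed fields. */
    static final class Status {
        String error;
        int failedAttempts;
        boolean deployFailed;
    }

    /** Assumed cap; a real implementation would presumably make this configurable. */
    static final int MAX_RECONCILE_ATTEMPTS = 3;

    /** Records one failed attempt and reports whether the retry budget is exhausted. */
    static Decision onReconcileError(Status status, String err) {
        status.error = err;
        status.failedAttempts++;
        if (status.failedAttempts >= MAX_RECONCILE_ATTEMPTS) {
            status.deployFailed = true;
            return Decision.DEPLOY_FAILED; // caller stops rescheduling
        }
        return Decision.RETRY; // caller reschedules after the error refresh interval
    }

    /** Clears the failure state once a reconcile attempt succeeds. */
    static void onReconcileSuccess(Status status) {
        status.error = null;
        status.failedAttempts = 0;
        status.deployFailed = false;
    }
}
```

       In the catch block, the controller would call `onReconcileError` and only chain `rescheduleAfter(...)` when the result is `RETRY`; on `DEPLOY_FAILED` it would return the status update without rescheduling, so the resource stays in the failed state until the spec changes.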



