You are viewing a plain text version of this content. The canonical link for it is not reproduced in this plain text version.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/23 09:32:49 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-26236] Track and cap retries in ReconciliationStatus

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 7198c1d  [FLINK-26236] Track and cap retries in ReconciliationStatus
7198c1d is described below

commit 7198c1d36da1e35262057825341fe85639dcd18f
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Mon Feb 21 22:22:28 2022 +0100

    [FLINK-26236] Track and cap retries in ReconciliationStatus
    
    Closes #13
---
 .../controller/FlinkDeploymentController.java      | 69 ++++++++--------------
 .../kubernetes/operator/crd/FlinkDeployment.java   |  8 ++-
 .../exception/ReconciliationException.java         | 34 +++++++++++
 .../operator/observer/JobStatusObserver.java       | 35 ++++-------
 .../controller/FlinkDeploymentControllerTest.java  | 20 ++-----
 .../operator/observer/JobStatusObserverTest.java   |  4 +-
 6 files changed, 84 insertions(+), 86 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index dbafd7d..3ab2638 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -19,8 +19,9 @@ package org.apache.flink.kubernetes.operator.controller;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
 import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
@@ -47,15 +48,14 @@ import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 /** Controller that runs the main reconcile loop for Flink deployments. */
-@ControllerConfiguration
+@ControllerConfiguration(generationAwareEventProcessing = false)
 public class FlinkDeploymentController
         implements Reconciler<FlinkDeployment>,
                 ErrorStatusHandler<FlinkDeployment>,
                 EventSourceInitializer<FlinkDeployment> {
     private static final Logger LOG = LoggerFactory.getLogger(FlinkDeploymentController.class);
 
-    public static final int OBSERVE_REFRESH_SECONDS = 10;
-    public static final int RECONCILE_ERROR_REFRESH_SECONDS = 5;
+    public static final int REFRESH_SECONDS = 60;
 
     private final KubernetesClient kubernetesClient;
 
@@ -94,51 +94,23 @@ public class FlinkDeploymentController
     @Override
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
         LOG.info("Reconciling {}", flinkApp.getMetadata().getName());
-        if (flinkApp.getStatus() == null) {
-            flinkApp.setStatus(new FlinkDeploymentStatus());
-        }
 
         Configuration effectiveConfig = FlinkUtils.getEffectiveConfig(flinkApp);
-
-        boolean successfulObserve = observer.observeFlinkJobStatus(flinkApp, effectiveConfig);
-
-        if (!successfulObserve) {
-            // Cluster not accessible let's retry
-            return UpdateControl.<FlinkDeployment>noUpdate()
-                    .rescheduleAfter(OBSERVE_REFRESH_SECONDS, TimeUnit.SECONDS);
-        }
-
-        if (!specChanged(flinkApp)) {
-            // Successfully observed the cluster after reconciliation, no need to reschedule
-            return UpdateControl.updateStatus(flinkApp);
-        }
-
         try {
-            reconcileFlinkDeployment(operatorNamespace, flinkApp, effectiveConfig);
-        } catch (Exception e) {
-            String err = "Error while reconciling deployment change: " + e.getMessage();
-            String lastErr = flinkApp.getStatus().getReconciliationStatus().getError();
-            if (!err.equals(lastErr)) {
-                // Log new errors on the first instance
-                LOG.error("Error while reconciling deployment change", e);
-                updateForReconciliationError(flinkApp, err);
-                return UpdateControl.updateStatus(flinkApp)
-                        .rescheduleAfter(RECONCILE_ERROR_REFRESH_SECONDS, TimeUnit.SECONDS);
-            } else {
-                return UpdateControl.<FlinkDeployment>noUpdate()
-                        .rescheduleAfter(RECONCILE_ERROR_REFRESH_SECONDS, TimeUnit.SECONDS);
+            boolean successfulObserve = observer.observeFlinkJobStatus(flinkApp, effectiveConfig);
+            if (successfulObserve) {
+                reconcileFlinkDeployment(operatorNamespace, flinkApp, effectiveConfig);
+                updateForReconciliationSuccess(flinkApp);
             }
+        } catch (InvalidDeploymentException ide) {
+            LOG.error("Reconciliation failed", ide);
+            updateForReconciliationError(flinkApp, ide.getMessage());
+            return UpdateControl.updateStatus(flinkApp);
+        } catch (Exception e) {
+            throw new ReconciliationException(e);
         }
-
-        // Everything went well, update status and reschedule for observation
-        updateForReconciliationSuccess(flinkApp);
         return UpdateControl.updateStatus(flinkApp)
-                .rescheduleAfter(OBSERVE_REFRESH_SECONDS, TimeUnit.SECONDS);
-    }
-
-    private boolean specChanged(FlinkDeployment flinkApp) {
-        return !flinkApp.getSpec()
-                .equals(flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec());
+                .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
     }
 
     private void reconcileFlinkDeployment(
@@ -178,7 +150,14 @@ public class FlinkDeploymentController
     @Override
     public Optional<FlinkDeployment> updateErrorStatus(
             FlinkDeployment flinkApp, RetryInfo retryInfo, RuntimeException e) {
-        LOG.warn("TODO: handle error status");
-        return Optional.empty();
+        LOG.warn(
+                "attempt count: {}, last attempt: {}",
+                retryInfo.getAttemptCount(),
+                retryInfo.isLastAttempt());
+
+        updateForReconciliationError(
+                flinkApp,
+                (e instanceof ReconciliationException) ? e.getCause().toString() : e.toString());
+        return Optional.of(flinkApp);
     }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
index 7a01bb8..2eb1bbf 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
@@ -35,4 +35,10 @@ import io.fabric8.kubernetes.model.annotation.Version;
 @Version("v1alpha1")
 @ShortNames({"flinkdep"})
 public class FlinkDeployment extends CustomResource<FlinkDeploymentSpec, FlinkDeploymentStatus>
-        implements Namespaced {}
+        implements Namespaced {
+
+    @Override
+    protected FlinkDeploymentStatus initStatus() {
+        return new FlinkDeploymentStatus();
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/ReconciliationException.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/ReconciliationException.java
new file mode 100644
index 0000000..742494b
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/ReconciliationException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.exception;
+
+/** Exception for wrapping reconciliation errors. */
+public class ReconciliationException extends RuntimeException {
+
+    public ReconciliationException(Throwable cause) {
+        super(cause);
+    }
+
+    public ReconciliationException(String msg) {
+        super(msg);
+    }
+
+    public ReconciliationException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index 6297365..c0dabcb 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -46,7 +46,8 @@ public class JobStatusObserver {
         this.flinkService = flinkService;
     }
 
-    public boolean observeFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+    public boolean observeFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig)
+            throws Exception {
         FlinkDeploymentSpec lastReconciledSpec =
                 flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
 
@@ -68,31 +69,17 @@ public class JobStatusObserver {
         }
         LOG.info("Getting job statuses for {}", flinkApp.getMetadata().getName());
         FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
-        try {
-            Collection<JobStatusMessage> clusterJobStatuses =
-                    flinkService.listJobs(effectiveConfig);
-            if (clusterJobStatuses.isEmpty()) {
-                LOG.info("No jobs found on {} yet, retrying...", flinkApp.getMetadata().getName());
-                return false;
-            } else {
-                flinkAppStatus.setJobStatus(
-                        mergeJobStatus(
-                                flinkAppStatus.getJobStatus(),
-                                new ArrayList<>(clusterJobStatuses)));
-                LOG.info("Job statuses updated for {}", flinkApp.getMetadata().getName());
-                return true;
-            }
-
-        } catch (Exception e) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Failed to list jobs for {}", flinkApp, e);
-            } else {
-                LOG.warn(
-                        "Failed to list jobs for {}, retrying...",
-                        flinkApp.getMetadata().getName());
-            }
 
+        Collection<JobStatusMessage> clusterJobStatuses = flinkService.listJobs(effectiveConfig);
+        if (clusterJobStatuses.isEmpty()) {
+            LOG.info("No jobs found on {} yet", flinkApp.getMetadata().getName());
             return false;
+        } else {
+            flinkAppStatus.setJobStatus(
+                    mergeJobStatus(
+                            flinkAppStatus.getJobStatus(), new ArrayList<>(clusterJobStatuses)));
+            LOG.info("Job statuses updated for {}", flinkApp.getMetadata().getName());
+            return true;
         }
     }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 3b4ae63..730bc80 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -51,7 +51,7 @@ public class FlinkDeploymentControllerTest {
         updateControl = testController.reconcile(appCluster, null);
         assertTrue(updateControl.isUpdateStatus());
         assertEquals(
-                FlinkDeploymentController.OBSERVE_REFRESH_SECONDS * 1000,
+                FlinkDeploymentController.REFRESH_SECONDS * 1000,
                 (long) updateControl.getScheduleDelay().get());
 
         // Validate reconciliation status
@@ -63,7 +63,9 @@ public class FlinkDeploymentControllerTest {
 
         updateControl = testController.reconcile(appCluster, null);
         assertTrue(updateControl.isUpdateStatus());
-        assertFalse(updateControl.getScheduleDelay().isPresent());
+        assertEquals(
+                FlinkDeploymentController.REFRESH_SECONDS * 1000,
+                (long) updateControl.getScheduleDelay().get());
 
         // Validate job status
         JobStatus jobStatus = appCluster.getStatus().getJobStatus();
@@ -77,23 +79,13 @@ public class FlinkDeploymentControllerTest {
         appCluster.getSpec().setJob(null);
         updateControl = testController.reconcile(appCluster, null);
         assertTrue(updateControl.isUpdateStatus());
-        assertEquals(
-                FlinkDeploymentController.RECONCILE_ERROR_REFRESH_SECONDS * 1000,
-                (long) updateControl.getScheduleDelay().get());
+        assertFalse(updateControl.getScheduleDelay().isPresent());
 
         reconciliationStatus = appCluster.getStatus().getReconciliationStatus();
         assertFalse(reconciliationStatus.isSuccess());
-        assertEquals(
-                "Error while reconciling deployment change: Cannot switch from job to session cluster",
-                reconciliationStatus.getError());
+        assertEquals("Cannot switch from job to session cluster", reconciliationStatus.getError());
         assertNotNull(reconciliationStatus.getLastReconciledSpec().getJob());
 
-        updateControl = testController.reconcile(appCluster, null);
-        assertTrue(updateControl.isNoUpdate());
-        assertEquals(
-                FlinkDeploymentController.RECONCILE_ERROR_REFRESH_SECONDS * 1000,
-                (long) updateControl.getScheduleDelay().get());
-
         // Validate job status correct even with error
         jobStatus = appCluster.getStatus().getJobStatus();
         expectedJobStatus = flinkService.listJobs().get(0).f1;
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
index 48f7b9e..07a28f6 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
@@ -36,7 +36,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class JobStatusObserverTest {
 
     @Test
-    public void observeSessionCluster() {
+    public void observeSessionCluster() throws Exception {
         FlinkService flinkService = new TestingFlinkService();
         JobStatusObserver observer = new JobStatusObserver(flinkService);
         FlinkDeployment deployment = TestUtils.buildSessionCluster();
@@ -50,7 +50,7 @@ public class JobStatusObserverTest {
     }
 
     @Test
-    public void observeApplicationCluster() {
+    public void observeApplicationCluster() throws Exception {
         TestingFlinkService flinkService = new TestingFlinkService();
         JobStatusObserver observer = new JobStatusObserver(flinkService);
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();