You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/03 10:31:17 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-28331] Persist status after every observe loop

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new a2db6c3  [FLINK-28331] Persist status after every observe loop
a2db6c3 is described below

commit a2db6c31e21d79715324ed45d7138cb8f6d6149e
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Fri Jul 1 14:24:33 2022 +0200

    [FLINK-28331] Persist status after every observe loop
---
 .../controller/FlinkDeploymentController.java      |  1 +
 .../controller/FlinkSessionJobController.java      |  1 +
 .../operator/observer/SavepointObserver.java       | 30 --------
 .../flink/kubernetes/operator/TestUtils.java       |  6 +-
 .../controller/DeploymentRecoveryTest.java         |  7 +-
 .../controller/FlinkDeploymentControllerTest.java  | 79 +++++++++++++++++++++-
 .../operator/controller/RollbackTest.java          |  4 +-
 7 files changed, 90 insertions(+), 38 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index d36f96b..d75d29c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -117,6 +117,7 @@ public class FlinkDeploymentController
                         previousDeployment,
                         false);
             }
+            statusRecorder.patchAndCacheStatus(flinkApp);
             reconcilerFactory.getOrCreate(flinkApp).reconcile(flinkApp, context);
         } catch (DeploymentFailedException dfe) {
             handleDeploymentFailed(flinkApp, dfe);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
index 30ff1ea..7f2f6bd 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -96,6 +96,7 @@ public class FlinkSessionJobController
         }
 
         try {
+            statusRecorder.patchAndCacheStatus(flinkSessionJob);
             reconciler.reconcile(flinkSessionJob, context);
         } catch (Exception e) {
             throw new ReconciliationException(e);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
index 74f3d35..aa3590a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Optional;
 
 /** An observer of savepoint progress. */
 public class SavepointObserver<STATUS extends CommonStatus<?>> {
@@ -69,10 +68,6 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> {
         var jobStatus = resource.getStatus().getJobStatus();
         var savepointInfo = jobStatus.getSavepointInfo();
         var jobId = jobStatus.getJobId();
-        var previousLastSpPath =
-                Optional.ofNullable(savepointInfo.getLastSavepoint())
-                        .map(Savepoint::getLocation)
-                        .orElse(null);
 
         // If any manual or periodic savepoint is in progress, observe it
         if (SavepointUtils.savepointInProgress(jobStatus)) {
@@ -83,8 +78,6 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> {
         if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
             observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
         }
-
-        patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath);
     }
 
     /**
@@ -201,27 +194,4 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> {
             throw new ReconciliationException(e);
         }
     }
-
-    /**
-     * Patch the Kubernetes Flink resource status if we observed a new last savepoint. This is
-     * crucial to not lose this information once the reconciler shuts down the cluster.
-     */
-    private void patchStatusOnSavepointChange(
-            AbstractFlinkResource<?, STATUS> resource,
-            SavepointInfo savepointInfo,
-            String previousLastSpPath) {
-        var currentLastSpPath =
-                Optional.ofNullable(savepointInfo.getLastSavepoint())
-                        .map(Savepoint::getLocation)
-                        .orElse(null);
-
-        // If the last savepoint information changes we need to patch the status
-        // to avoid losing this in case of an operator failure after the cluster was shut down
-        if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) {
-            LOG.info(
-                    "Updating resource status after observing new last savepoint {}",
-                    currentLastSpPath);
-            statusRecorder.patchAndCacheStatus(resource);
-        }
-    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 034904d..cdbaa8e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -434,10 +434,8 @@ public class TestUtils {
     public static FlinkDeploymentController createTestController(
             FlinkConfigManager configManager,
             KubernetesClient kubernetesClient,
-            TestingFlinkService flinkService) {
-
-        var statusRecorder =
-                new StatusRecorder<FlinkDeploymentStatus>(kubernetesClient, (r, s) -> {});
+            TestingFlinkService flinkService,
+            StatusRecorder statusRecorder) {
         var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
         return new FlinkDeploymentController(
                 configManager,
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
index a9d59fe..1425dcd 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
@@ -60,7 +61,11 @@ public class DeploymentRecoveryTest {
         flinkService = new TestingFlinkService(kubernetesClient);
         context = flinkService.getContext();
         testController =
-                TestUtils.createTestController(configManager, kubernetesClient, flinkService);
+                TestUtils.createTestController(
+                        configManager,
+                        kubernetesClient,
+                        flinkService,
+                        new StatusRecorder<>(kubernetesClient, (a, c) -> {}));
         kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
     }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index b7c1f1c..c809418 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -24,12 +24,14 @@ import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
@@ -38,6 +40,7 @@ import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
 import io.fabric8.kubernetes.api.model.EventBuilder;
@@ -60,6 +63,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.BiConsumer;
 import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -82,24 +86,41 @@ public class FlinkDeploymentControllerTest {
 
     private KubernetesMockServer mockServer;
     private KubernetesClient kubernetesClient;
+    private StatusUpdateCounter statusUpdateCounter = new StatusUpdateCounter();
 
     @BeforeEach
     public void setup() {
         flinkService = new TestingFlinkService(kubernetesClient);
         context = flinkService.getContext();
         testController =
-                TestUtils.createTestController(configManager, kubernetesClient, flinkService);
+                TestUtils.createTestController(
+                        configManager,
+                        kubernetesClient,
+                        flinkService,
+                        new StatusRecorder<>(kubernetesClient, statusUpdateCounter));
         kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
     }
 
     @ParameterizedTest
     @EnumSource(FlinkVersion.class)
     public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) throws Exception {
-        FlinkDeployment appCluster = TestUtils.buildApplicationCluster(flinkVersion);
 
         UpdateControl<FlinkDeployment> updateControl;
 
+        FlinkDeployment appCluster = TestUtils.buildApplicationCluster(flinkVersion);
+        assertEquals(
+                JobManagerDeploymentStatus.MISSING,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+        assertNull(appCluster.getStatus().getJobStatus().getState());
+
         updateControl = testController.reconcile(appCluster, context);
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYING,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.RECONCILING.name(),
+                appCluster.getStatus().getJobStatus().getState());
+        assertEquals(2, statusUpdateCounter.getCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
                 Optional.of(
@@ -117,6 +138,13 @@ public class FlinkDeploymentControllerTest {
         assertNull(appCluster.getStatus().getReconciliationStatus().getLastStableSpec());
 
         updateControl = testController.reconcile(appCluster, context);
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.RECONCILING.name(),
+                appCluster.getStatus().getJobStatus().getState());
+        assertEquals(3, statusUpdateCounter.getCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
                 Optional.of(
@@ -124,6 +152,28 @@ public class FlinkDeploymentControllerTest {
                 updateControl.getScheduleDelay());
 
         updateControl = testController.reconcile(appCluster, context);
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.RUNNING.name(),
+                appCluster.getStatus().getJobStatus().getState());
+        assertEquals(4, statusUpdateCounter.getCount());
+        assertFalse(updateControl.isUpdateStatus());
+        assertEquals(
+                Optional.of(
+                        configManager.getOperatorConfiguration().getReconcileInterval().toMillis()),
+                updateControl.getScheduleDelay());
+
+        // Stable loop
+        updateControl = testController.reconcile(appCluster, context);
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.RUNNING.name(),
+                appCluster.getStatus().getJobStatus().getState());
+        assertEquals(4, statusUpdateCounter.getCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
                 Optional.of(
@@ -143,6 +193,13 @@ public class FlinkDeploymentControllerTest {
         // Send in invalid update
         appCluster.getSpec().setJob(null);
         updateControl = testController.reconcile(appCluster, context);
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.RUNNING.name(),
+                appCluster.getStatus().getJobStatus().getState());
+        assertEquals(5, statusUpdateCounter.getCount());
         assertFalse(updateControl.isUpdateStatus());
 
         reconciliationStatus = appCluster.getStatus().getReconciliationStatus();
@@ -808,4 +865,22 @@ public class FlinkDeploymentControllerTest {
         }
         return args.stream();
     }
+
+    private static class StatusUpdateCounter // test helper handed to StatusRecorder in setup()
+            implements BiConsumer<
+                    AbstractFlinkResource<?, FlinkDeploymentStatus>, FlinkDeploymentStatus> {
+        private int counter; // number of accept() calls seen so far; starts at 0
+
+        @Override
+        public void accept(
+                AbstractFlinkResource<?, FlinkDeploymentStatus>
+                        flinkDeploymentStatusAbstractFlinkResource,
+                FlinkDeploymentStatus flinkDeploymentStatus) {
+            counter++; // both arguments are ignored; only the invocation itself is recorded
+        }
+
+        public int getCount() { // NOTE(review): presumably one accept() per status patch - confirm
+            return counter;
+        }
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
index 1f53a50..b591c7e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatu
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.util.function.ThrowingRunnable;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -70,7 +71,8 @@ public class RollbackTest {
                 TestUtils.createTestController(
                         new FlinkConfigManager(new Configuration()),
                         kubernetesClient,
-                        flinkService);
+                        flinkService,
+                        new StatusRecorder<>(kubernetesClient, (a, c) -> {}));
         kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
     }