You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/05 18:48:39 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-28389] Correct spec and status updates in FlinkDeploymentControllerTest

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new d75e45e  [FLINK-28389] Correct spec and status updates in FlinkDeploymentControllerTest
d75e45e is described below

commit d75e45e45600f3c550b671d4883fb5ed9dcd96dc
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Tue Jul 5 11:29:16 2022 +0200

    [FLINK-28389] Correct spec and status updates in FlinkDeploymentControllerTest
---
 .../flink/kubernetes/operator/TestUtils.java       |  31 -----
 .../controller/DeploymentRecoveryTest.java         |   9 +-
 .../controller/FlinkDeploymentControllerTest.java  |  26 ++--
 .../operator/controller/RollbackTest.java          |   8 +-
 .../TestingFlinkDeploymentController.java          | 147 +++++++++++++++++++++
 5 files changed, 163 insertions(+), 58 deletions(-)

diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index cdbaa8e..f8b6290 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -21,8 +21,6 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
-import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
@@ -38,13 +36,6 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
-import org.apache.flink.kubernetes.operator.metrics.MetricManager;
-import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
-import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
-import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
-import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
-import org.apache.flink.metrics.testutils.MetricListener;
 
 import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.ContainerStatus;
@@ -60,7 +51,6 @@ import io.fabric8.kubernetes.api.model.apps.Deployment;
 import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
 import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
 import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
-import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.mockwebserver.utils.ResponseProvider;
 import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -431,27 +421,6 @@ public class TestUtils {
         }
     }
 
-    public static FlinkDeploymentController createTestController(
-            FlinkConfigManager configManager,
-            KubernetesClient kubernetesClient,
-            TestingFlinkService flinkService,
-            StatusRecorder statusRecorder) {
-        var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
-        return new FlinkDeploymentController(
-                configManager,
-                ValidatorUtils.discoverValidators(configManager),
-                new ReconcilerFactory(
-                        kubernetesClient,
-                        flinkService,
-                        configManager,
-                        eventRecorder,
-                        statusRecorder),
-                new ObserverFactory(flinkService, configManager, statusRecorder, eventRecorder),
-                new MetricManager<>(new MetricListener().getMetricGroup()),
-                statusRecorder,
-                eventRecorder);
-    }
-
     /** Testing ResponseProvider. */
     public static class ValidatingResponseProvider<T> implements ResponseProvider<Object> {
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
index 1425dcd..7ecd28f 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
@@ -52,7 +51,7 @@ public class DeploymentRecoveryTest {
 
     private TestingFlinkService flinkService;
     private Context context;
-    private FlinkDeploymentController testController;
+    private TestingFlinkDeploymentController testController;
 
     private KubernetesClient kubernetesClient;
 
@@ -61,11 +60,7 @@ public class DeploymentRecoveryTest {
         flinkService = new TestingFlinkService(kubernetesClient);
         context = flinkService.getContext();
         testController =
-                TestUtils.createTestController(
-                        configManager,
-                        kubernetesClient,
-                        flinkService,
-                        new StatusRecorder<>(kubernetesClient, (a, c) -> {}));
+                new TestingFlinkDeploymentController(configManager, kubernetesClient, flinkService);
         kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
     }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 788b770..54e0ace 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -40,7 +40,6 @@ import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
-import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
 import io.fabric8.kubernetes.api.model.EventBuilder;
@@ -75,7 +74,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
 
-/** @link FlinkDeploymentController tests */
+/** {@link FlinkDeploymentController} tests. */
 @EnableKubernetesMockClient(crud = true)
 public class FlinkDeploymentControllerTest {
 
@@ -83,22 +82,18 @@ public class FlinkDeploymentControllerTest {
 
     private TestingFlinkService flinkService;
     private Context context;
-    private FlinkDeploymentController testController;
+    private TestingFlinkDeploymentController testController;
 
     private KubernetesMockServer mockServer;
     private KubernetesClient kubernetesClient;
-    private StatusUpdateCounter statusUpdateCounter = new StatusUpdateCounter();
 
     @BeforeEach
     public void setup() {
         flinkService = new TestingFlinkService(kubernetesClient);
         context = flinkService.getContext();
+
         testController =
-                TestUtils.createTestController(
-                        configManager,
-                        kubernetesClient,
-                        flinkService,
-                        new StatusRecorder<>(kubernetesClient, statusUpdateCounter));
+                new TestingFlinkDeploymentController(configManager, kubernetesClient, flinkService);
         kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
     }
 
@@ -121,7 +116,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RECONCILING.name(),
                 appCluster.getStatus().getJobStatus().getState());
-        assertEquals(2, statusUpdateCounter.getCount());
+        assertEquals(2, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
                 Optional.of(
@@ -145,7 +140,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RECONCILING.name(),
                 appCluster.getStatus().getJobStatus().getState());
-        assertEquals(3, statusUpdateCounter.getCount());
+        assertEquals(3, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
                 Optional.of(
@@ -159,7 +154,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RUNNING.name(),
                 appCluster.getStatus().getJobStatus().getState());
-        assertEquals(4, statusUpdateCounter.getCount());
+        assertEquals(4, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
                 Optional.of(
@@ -174,7 +169,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RUNNING.name(),
                 appCluster.getStatus().getJobStatus().getState());
-        assertEquals(4, statusUpdateCounter.getCount());
+        assertEquals(4, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
                 Optional.of(
@@ -200,7 +195,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RUNNING.name(),
                 appCluster.getStatus().getJobStatus().getState());
-        assertEquals(5, statusUpdateCounter.getCount());
+        assertEquals(5, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
 
         reconciliationStatus = appCluster.getStatus().getReconciliationStatus();
@@ -408,7 +403,7 @@ public class FlinkDeploymentControllerTest {
                         .getSavepointInfo()
                         .getSavepointHistory()
                         .isEmpty());
-        assertEquals(0, testController.reconcile(appCluster, context).getScheduleDelay().get());
+        assertEquals(0L, testController.reconcile(appCluster, context).getScheduleDelay().get());
         assertEquals(
                 JobState.SUSPENDED,
                 appCluster
@@ -564,6 +559,7 @@ public class FlinkDeploymentControllerTest {
         var appCluster = TestUtils.buildApplicationCluster(flinkVersion);
         appCluster.getSpec().getJob().setUpgradeMode(upgradeMode);
         testUpgradeNotReadyCluster(ReconciliationUtils.clone(appCluster));
+        assertEquals(upgradeMode, appCluster.getSpec().getJob().getUpgradeMode());
     }
 
     @Test
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
index b591c7e..92ead48 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatu
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
-import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.util.function.ThrowingRunnable;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -59,7 +58,7 @@ public class RollbackTest {
     private TestingFlinkService flinkService;
     private Context context;
 
-    private FlinkDeploymentController testController;
+    private TestingFlinkDeploymentController testController;
 
     private KubernetesClient kubernetesClient;
 
@@ -68,11 +67,10 @@ public class RollbackTest {
         flinkService = new TestingFlinkService(kubernetesClient);
         context = flinkService.getContext();
         testController =
-                TestUtils.createTestController(
+                new TestingFlinkDeploymentController(
                         new FlinkConfigManager(new Configuration()),
                         kubernetesClient,
-                        flinkService,
-                        new StatusRecorder<>(kubernetesClient, (a, c) -> {}));
+                        flinkService);
         kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
     }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
new file mode 100644
index 0000000..d3909b4
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
+import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
+import org.apache.flink.metrics.testutils.MetricListener;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
+import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/** A wrapper around {@link FlinkDeploymentController} used by unit tests. */
+public class TestingFlinkDeploymentController
+        implements Reconciler<FlinkDeployment>,
+                ErrorStatusHandler<FlinkDeployment>,
+                EventSourceInitializer<FlinkDeployment>,
+                Cleaner<FlinkDeployment> {
+
+    private FlinkDeploymentController flinkDeploymentController;
+    private StatusUpdateCounter statusUpdateCounter = new StatusUpdateCounter();
+    private EventRecorder eventRecorder;
+    private StatusRecorder statusRecorder;
+
+    public TestingFlinkDeploymentController(
+            FlinkConfigManager configManager,
+            KubernetesClient kubernetesClient,
+            TestingFlinkService flinkService) {
+        eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
+        statusRecorder = new StatusRecorder<>(kubernetesClient, statusUpdateCounter);
+
+        flinkDeploymentController =
+                new FlinkDeploymentController(
+                        configManager,
+                        ValidatorUtils.discoverValidators(configManager),
+                        new ReconcilerFactory(
+                                kubernetesClient,
+                                flinkService,
+                                configManager,
+                                eventRecorder,
+                                statusRecorder),
+                        new ObserverFactory(
+                                flinkService, configManager, statusRecorder, eventRecorder),
+                        new MetricManager<>(new MetricListener().getMetricGroup()),
+                        statusRecorder,
+                        eventRecorder);
+    }
+
+    @Override
+    public UpdateControl<FlinkDeployment> reconcile(
+            FlinkDeployment flinkDeployment, Context<FlinkDeployment> context) throws Exception {
+        FlinkDeployment cloned = ReconciliationUtils.clone(flinkDeployment);
+        statusUpdateCounter.setCurrent(flinkDeployment);
+        UpdateControl<FlinkDeployment> updateControl =
+                flinkDeploymentController.reconcile(cloned, context);
+        Assertions.assertTrue(updateControl.isNoUpdate());
+        return updateControl;
+    }
+
+    @Override
+    public DeleteControl cleanup(
+            FlinkDeployment flinkDeployment, Context<FlinkDeployment> context) {
+        FlinkDeployment cloned = ReconciliationUtils.clone(flinkDeployment);
+        statusUpdateCounter.setCurrent(flinkDeployment);
+        return flinkDeploymentController.cleanup(cloned, context);
+    }
+
+    @Override
+    public ErrorStatusUpdateControl<FlinkDeployment> updateErrorStatus(
+            FlinkDeployment flinkDeployment, Context<FlinkDeployment> context, Exception e) {
+        FlinkDeployment cloned = ReconciliationUtils.clone(flinkDeployment);
+        statusUpdateCounter.setCurrent(flinkDeployment);
+        return flinkDeploymentController.updateErrorStatus(cloned, context, e);
+    }
+
+    @Override
+    public Map<String, EventSource> prepareEventSources(
+            EventSourceContext<FlinkDeployment> eventSourceContext) {
+        throw new UnsupportedOperationException();
+    }
+
+    private static class StatusUpdateCounter
+            implements BiConsumer<
+                    AbstractFlinkResource<?, FlinkDeploymentStatus>, FlinkDeploymentStatus> {
+
+        private FlinkDeployment currentResource;
+        private int counter;
+
+        @Override
+        public void accept(
+                AbstractFlinkResource<?, FlinkDeploymentStatus>
+                        flinkDeploymentStatusAbstractFlinkResource,
+                FlinkDeploymentStatus flinkDeploymentStatus) {
+            currentResource.setStatus(flinkDeploymentStatusAbstractFlinkResource.getStatus());
+            counter++;
+        }
+
+        public void setCurrent(FlinkDeployment currentResource) {
+            this.currentResource = currentResource;
+        }
+
+        public int getCount() {
+            return counter;
+        }
+    }
+
+    public int getInternalStatusUpdateCount() {
+        return statusUpdateCounter.getCount();
+    }
+}