You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/05 18:48:39 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-28389] Correct spec and status updates in FlinkDeploymentControllerTest
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new d75e45e [FLINK-28389] Correct spec and status updates in FlinkDeploymentControllerTest
d75e45e is described below
commit d75e45e45600f3c550b671d4883fb5ed9dcd96dc
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Tue Jul 5 11:29:16 2022 +0200
[FLINK-28389] Correct spec and status updates in FlinkDeploymentControllerTest
---
.../flink/kubernetes/operator/TestUtils.java | 31 -----
.../controller/DeploymentRecoveryTest.java | 9 +-
.../controller/FlinkDeploymentControllerTest.java | 26 ++--
.../operator/controller/RollbackTest.java | 8 +-
.../TestingFlinkDeploymentController.java | 147 +++++++++++++++++++++
5 files changed, 163 insertions(+), 58 deletions(-)
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index cdbaa8e..f8b6290 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -21,8 +21,6 @@ import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
-import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
@@ -38,13 +36,6 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
-import org.apache.flink.kubernetes.operator.metrics.MetricManager;
-import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
-import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
-import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
-import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
-import org.apache.flink.metrics.testutils.MetricListener;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerStatus;
@@ -60,7 +51,6 @@ import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
-import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.mockwebserver.utils.ResponseProvider;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -431,27 +421,6 @@ public class TestUtils {
}
}
- public static FlinkDeploymentController createTestController(
- FlinkConfigManager configManager,
- KubernetesClient kubernetesClient,
- TestingFlinkService flinkService,
- StatusRecorder statusRecorder) {
- var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
- return new FlinkDeploymentController(
- configManager,
- ValidatorUtils.discoverValidators(configManager),
- new ReconcilerFactory(
- kubernetesClient,
- flinkService,
- configManager,
- eventRecorder,
- statusRecorder),
- new ObserverFactory(flinkService, configManager, statusRecorder, eventRecorder),
- new MetricManager<>(new MetricListener().getMetricGroup()),
- statusRecorder,
- eventRecorder);
- }
-
/** Testing ResponseProvider. */
public static class ValidatingResponseProvider<T> implements ResponseProvider<Object> {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
index 1425dcd..7ecd28f 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
@@ -52,7 +51,7 @@ public class DeploymentRecoveryTest {
private TestingFlinkService flinkService;
private Context context;
- private FlinkDeploymentController testController;
+ private TestingFlinkDeploymentController testController;
private KubernetesClient kubernetesClient;
@@ -61,11 +60,7 @@ public class DeploymentRecoveryTest {
flinkService = new TestingFlinkService(kubernetesClient);
context = flinkService.getContext();
testController =
- TestUtils.createTestController(
- configManager,
- kubernetesClient,
- flinkService,
- new StatusRecorder<>(kubernetesClient, (a, c) -> {}));
+ new TestingFlinkDeploymentController(configManager, kubernetesClient, flinkService);
kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 788b770..54e0ace 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -40,7 +40,6 @@ import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
-import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.client.JobStatusMessage;
import io.fabric8.kubernetes.api.model.EventBuilder;
@@ -75,7 +74,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.params.provider.Arguments.arguments;
-/** @link FlinkDeploymentController tests */
+/** {@link FlinkDeploymentController} tests. */
@EnableKubernetesMockClient(crud = true)
public class FlinkDeploymentControllerTest {
@@ -83,22 +82,18 @@ public class FlinkDeploymentControllerTest {
private TestingFlinkService flinkService;
private Context context;
- private FlinkDeploymentController testController;
+ private TestingFlinkDeploymentController testController;
private KubernetesMockServer mockServer;
private KubernetesClient kubernetesClient;
- private StatusUpdateCounter statusUpdateCounter = new StatusUpdateCounter();
@BeforeEach
public void setup() {
flinkService = new TestingFlinkService(kubernetesClient);
context = flinkService.getContext();
+
testController =
- TestUtils.createTestController(
- configManager,
- kubernetesClient,
- flinkService,
- new StatusRecorder<>(kubernetesClient, statusUpdateCounter));
+ new TestingFlinkDeploymentController(configManager, kubernetesClient, flinkService);
kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
}
@@ -121,7 +116,7 @@ public class FlinkDeploymentControllerTest {
assertEquals(
org.apache.flink.api.common.JobStatus.RECONCILING.name(),
appCluster.getStatus().getJobStatus().getState());
- assertEquals(2, statusUpdateCounter.getCount());
+ assertEquals(2, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
assertEquals(
Optional.of(
@@ -145,7 +140,7 @@ public class FlinkDeploymentControllerTest {
assertEquals(
org.apache.flink.api.common.JobStatus.RECONCILING.name(),
appCluster.getStatus().getJobStatus().getState());
- assertEquals(3, statusUpdateCounter.getCount());
+ assertEquals(3, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
assertEquals(
Optional.of(
@@ -159,7 +154,7 @@ public class FlinkDeploymentControllerTest {
assertEquals(
org.apache.flink.api.common.JobStatus.RUNNING.name(),
appCluster.getStatus().getJobStatus().getState());
- assertEquals(4, statusUpdateCounter.getCount());
+ assertEquals(4, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
assertEquals(
Optional.of(
@@ -174,7 +169,7 @@ public class FlinkDeploymentControllerTest {
assertEquals(
org.apache.flink.api.common.JobStatus.RUNNING.name(),
appCluster.getStatus().getJobStatus().getState());
- assertEquals(4, statusUpdateCounter.getCount());
+ assertEquals(4, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
assertEquals(
Optional.of(
@@ -200,7 +195,7 @@ public class FlinkDeploymentControllerTest {
assertEquals(
org.apache.flink.api.common.JobStatus.RUNNING.name(),
appCluster.getStatus().getJobStatus().getState());
- assertEquals(5, statusUpdateCounter.getCount());
+ assertEquals(5, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
reconciliationStatus = appCluster.getStatus().getReconciliationStatus();
@@ -408,7 +403,7 @@ public class FlinkDeploymentControllerTest {
.getSavepointInfo()
.getSavepointHistory()
.isEmpty());
- assertEquals(0, testController.reconcile(appCluster, context).getScheduleDelay().get());
+ assertEquals(0L, testController.reconcile(appCluster, context).getScheduleDelay().get());
assertEquals(
JobState.SUSPENDED,
appCluster
@@ -564,6 +559,7 @@ public class FlinkDeploymentControllerTest {
var appCluster = TestUtils.buildApplicationCluster(flinkVersion);
appCluster.getSpec().getJob().setUpgradeMode(upgradeMode);
testUpgradeNotReadyCluster(ReconciliationUtils.clone(appCluster));
+ assertEquals(upgradeMode, appCluster.getSpec().getJob().getUpgradeMode());
}
@Test
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
index b591c7e..92ead48 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatu
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
-import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.util.function.ThrowingRunnable;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -59,7 +58,7 @@ public class RollbackTest {
private TestingFlinkService flinkService;
private Context context;
- private FlinkDeploymentController testController;
+ private TestingFlinkDeploymentController testController;
private KubernetesClient kubernetesClient;
@@ -68,11 +67,10 @@ public class RollbackTest {
flinkService = new TestingFlinkService(kubernetesClient);
context = flinkService.getContext();
testController =
- TestUtils.createTestController(
+ new TestingFlinkDeploymentController(
new FlinkConfigManager(new Configuration()),
kubernetesClient,
- flinkService,
- new StatusRecorder<>(kubernetesClient, (a, c) -> {}));
+ flinkService);
kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
new file mode 100644
index 0000000..d3909b4
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
+import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
+import org.apache.flink.metrics.testutils.MetricListener;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
+import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
+import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/** A wrapper around {@link FlinkDeploymentController} used by unit tests. */
+public class TestingFlinkDeploymentController
+ implements Reconciler<FlinkDeployment>,
+ ErrorStatusHandler<FlinkDeployment>,
+ EventSourceInitializer<FlinkDeployment>,
+ Cleaner<FlinkDeployment> {
+
+ private FlinkDeploymentController flinkDeploymentController;
+ private StatusUpdateCounter statusUpdateCounter = new StatusUpdateCounter();
+ private EventRecorder eventRecorder;
+ private StatusRecorder statusRecorder;
+
+ public TestingFlinkDeploymentController(
+ FlinkConfigManager configManager,
+ KubernetesClient kubernetesClient,
+ TestingFlinkService flinkService) {
+ eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
+ statusRecorder = new StatusRecorder<>(kubernetesClient, statusUpdateCounter);
+
+ flinkDeploymentController =
+ new FlinkDeploymentController(
+ configManager,
+ ValidatorUtils.discoverValidators(configManager),
+ new ReconcilerFactory(
+ kubernetesClient,
+ flinkService,
+ configManager,
+ eventRecorder,
+ statusRecorder),
+ new ObserverFactory(
+ flinkService, configManager, statusRecorder, eventRecorder),
+ new MetricManager<>(new MetricListener().getMetricGroup()),
+ statusRecorder,
+ eventRecorder);
+ }
+
+ @Override
+ public UpdateControl<FlinkDeployment> reconcile(
+ FlinkDeployment flinkDeployment, Context<FlinkDeployment> context) throws Exception {
+ FlinkDeployment cloned = ReconciliationUtils.clone(flinkDeployment);
+ statusUpdateCounter.setCurrent(flinkDeployment);
+ UpdateControl<FlinkDeployment> updateControl =
+ flinkDeploymentController.reconcile(cloned, context);
+ Assertions.assertTrue(updateControl.isNoUpdate());
+ return updateControl;
+ }
+
+ @Override
+ public DeleteControl cleanup(
+ FlinkDeployment flinkDeployment, Context<FlinkDeployment> context) {
+ FlinkDeployment cloned = ReconciliationUtils.clone(flinkDeployment);
+ statusUpdateCounter.setCurrent(flinkDeployment);
+ return flinkDeploymentController.cleanup(cloned, context);
+ }
+
+ @Override
+ public ErrorStatusUpdateControl<FlinkDeployment> updateErrorStatus(
+ FlinkDeployment flinkDeployment, Context<FlinkDeployment> context, Exception e) {
+ FlinkDeployment cloned = ReconciliationUtils.clone(flinkDeployment);
+ statusUpdateCounter.setCurrent(flinkDeployment);
+ return flinkDeploymentController.updateErrorStatus(cloned, context, e);
+ }
+
+ @Override
+ public Map<String, EventSource> prepareEventSources(
+ EventSourceContext<FlinkDeployment> eventSourceContext) {
+ throw new UnsupportedOperationException();
+ }
+
+ private static class StatusUpdateCounter
+ implements BiConsumer<
+ AbstractFlinkResource<?, FlinkDeploymentStatus>, FlinkDeploymentStatus> {
+
+ private FlinkDeployment currentResource;
+ private int counter;
+
+ @Override
+ public void accept(
+ AbstractFlinkResource<?, FlinkDeploymentStatus>
+ flinkDeploymentStatusAbstractFlinkResource,
+ FlinkDeploymentStatus flinkDeploymentStatus) {
+ currentResource.setStatus(flinkDeploymentStatusAbstractFlinkResource.getStatus());
+ counter++;
+ }
+
+ public void setCurrent(FlinkDeployment currentResource) {
+ this.currentResource = currentResource;
+ }
+
+ public int getCount() {
+ return counter;
+ }
+ }
+
+ public int getInternalStatusUpdateCount() {
+ return statusUpdateCounter.getCount();
+ }
+}