You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/03/28 04:57:38 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-26473] test coverage for listJobs error propagation

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 444a784  [FLINK-26473] test coverage for listJobs error propagation
444a784 is described below

commit 444a78407bb5bdb9507ad1aa303d2496a79ff025
Author: Thomas Weise <th...@apache.org>
AuthorDate: Sun Mar 27 20:16:53 2022 -0700

    [FLINK-26473] test coverage for listJobs error propagation
---
 .../flink/kubernetes/operator/TestUtils.java       | 25 +++++++++++++++++
 .../controller/FlinkDeploymentControllerTest.java  | 29 ++------------------
 .../operator/observer/JobObserverTest.java         | 32 ++++++++++++++++++++++
 3 files changed, 59 insertions(+), 27 deletions(-)

diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 84d65f4..fde3bfd 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -30,11 +30,17 @@ import org.apache.flink.kubernetes.operator.crd.spec.Resource;
 import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 
 import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerStatus;
+import io.fabric8.kubernetes.api.model.ContainerStatusBuilder;
 import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
 import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.api.model.PodListBuilder;
 import io.fabric8.kubernetes.api.model.PodSpec;
+import io.fabric8.kubernetes.api.model.PodStatusBuilder;
 import io.fabric8.kubernetes.api.model.apps.Deployment;
 import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
 import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
@@ -114,6 +120,25 @@ public class TestUtils {
         return pod;
     }
 
+    public static PodList createFailedPodList(String crashLoopMessage) { // shared test fixture: a one-pod list whose container is waiting with the crash-loop-backoff reason
+        ContainerStatus cs =
+                new ContainerStatusBuilder()
+                        .withNewState()
+                        .withNewWaiting() // the container state is populated as "waiting"
+                        .withReason(DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF)
+                        .withMessage(crashLoopMessage) // caller-supplied text; callers assert on it later
+                        .endWaiting()
+                        .endState()
+                        .build();
+
+        Pod pod = TestUtils.getTestPod("host", "apiVersion", Collections.emptyList());
+        pod.setStatus( // the waiting container status becomes the pod's only container status
+                new PodStatusBuilder()
+                        .withContainerStatuses(Collections.singletonList(cs))
+                        .build());
+        return new PodListBuilder().withItems(pod).build(); // the returned list holds just this one pod
+    }
+
     public static Context createEmptyContext() {
         return new Context() {
             @Override
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 308c894..03013cc 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -36,13 +36,7 @@ import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
-import io.fabric8.kubernetes.api.model.ContainerStatus;
-import io.fabric8.kubernetes.api.model.ContainerStatusBuilder;
 import io.fabric8.kubernetes.api.model.EventBuilder;
-import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.api.model.PodList;
-import io.fabric8.kubernetes.api.model.PodListBuilder;
-import io.fabric8.kubernetes.api.model.PodStatusBuilder;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
@@ -218,7 +212,7 @@ public class FlinkDeploymentControllerTest {
                 .once();
 
         String crashLoopMessage = "container fails";
-        flinkService.setJmPodList(createFailedPodList(crashLoopMessage));
+        flinkService.setJmPodList(TestUtils.createFailedPodList(crashLoopMessage));
 
         FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
         UpdateControl<FlinkDeployment> updateControl;
@@ -504,7 +498,7 @@ public class FlinkDeploymentControllerTest {
     @Test
     public void testSuccessfulObservationShouldClearErrors() {
         final String crashLoopMessage = "deploy errors";
-        flinkService.setJmPodList(createFailedPodList(crashLoopMessage));
+        flinkService.setJmPodList(TestUtils.createFailedPodList(crashLoopMessage));
 
         FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
 
@@ -531,25 +525,6 @@ public class FlinkDeploymentControllerTest {
         assertNull(reconciliationStatus.getError());
     }
 
-    private PodList createFailedPodList(String crashLoopMessage) {
-        ContainerStatus cs =
-                new ContainerStatusBuilder()
-                        .withNewState()
-                        .withNewWaiting()
-                        .withReason(DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF)
-                        .withMessage(crashLoopMessage)
-                        .endWaiting()
-                        .endState()
-                        .build();
-
-        Pod pod = TestUtils.getTestPod("host", "apiVersion", Collections.emptyList());
-        pod.setStatus(
-                new PodStatusBuilder()
-                        .withContainerStatuses(Collections.singletonList(cs))
-                        .build());
-        return new PodListBuilder().withItems(pod).build();
-    }
-
     private FlinkDeploymentController createTestController(
             KubernetesClient kubernetesClient, TestingFlinkService flinkService) {
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
index 2db9425..b7aed87 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 
@@ -33,6 +34,7 @@ import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** {@link JobObserver} unit tests. */
@@ -194,4 +196,34 @@ public class JobObserverTest {
         deployment.getStatus().setJobStatus(jobStatus);
         deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
     }
+
+    @Test
+    public void observeListJobsError() { // expects observe() to throw DeploymentFailedException once the JM pod list reports a failed pod
+        TestingFlinkService flinkService = new TestingFlinkService();
+        JobObserver observer =
+                new JobObserver(
+                        flinkService,
+                        FlinkOperatorConfiguration.fromConfiguration(new Configuration()));
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
+        bringToReadyStatus(deployment);
+        observer.observe(deployment, readyContext, conf); // baseline observation with the ready context, before any failure is injected
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                deployment.getStatus().getJobManagerDeploymentStatus());
+        // simulate deployment failure: the JM pod list now holds a crash-looping pod and the port is flagged as not ready
+        String podFailedMessage = "list jobs error";
+        flinkService.setJmPodList(TestUtils.createFailedPodList(podFailedMessage));
+        flinkService.setPortReady(false); // NOTE(review): presumably this makes the testing service's listJobs call fail - confirm in TestingFlinkService
+        Exception exception =
+                assertThrows(
+                        DeploymentFailedException.class,
+                        () -> {
+                            observer.observe(
+                                    deployment,
+                                    TestUtils.createContextWithInProgressDeployment(), // second observation uses an in-progress deployment context instead of readyContext
+                                    conf);
+                        });
+        assertEquals(podFailedMessage, exception.getMessage()); // the pod's waiting message is expected back as the exception message
+    }
 }