You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/03/28 04:57:38 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-26473] test coverage for listJobs error propagation
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 444a784 [FLINK-26473] test coverage for listJobs error propagation
444a784 is described below
commit 444a78407bb5bdb9507ad1aa303d2496a79ff025
Author: Thomas Weise <th...@apache.org>
AuthorDate: Sun Mar 27 20:16:53 2022 -0700
[FLINK-26473] test coverage for listJobs error propagation
---
.../flink/kubernetes/operator/TestUtils.java | 25 +++++++++++++++++
.../controller/FlinkDeploymentControllerTest.java | 29 ++------------------
.../operator/observer/JobObserverTest.java | 32 ++++++++++++++++++++++
3 files changed, 59 insertions(+), 27 deletions(-)
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 84d65f4..fde3bfd 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -30,11 +30,17 @@ import org.apache.flink.kubernetes.operator.crd.spec.Resource;
import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerStatus;
+import io.fabric8.kubernetes.api.model.ContainerStatusBuilder;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.api.model.PodListBuilder;
import io.fabric8.kubernetes.api.model.PodSpec;
+import io.fabric8.kubernetes.api.model.PodStatusBuilder;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
@@ -114,6 +120,25 @@ public class TestUtils {
return pod;
}
+ public static PodList createFailedPodList(String crashLoopMessage) { // test fixture: one-pod list whose container is waiting with a crash-loop reason
+ ContainerStatus cs = // container status in the "waiting" state, carrying the failure reason and message
+ new ContainerStatusBuilder()
+ .withNewState()
+ .withNewWaiting()
+ .withReason(DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF)
+ .withMessage(crashLoopMessage) // caller-supplied text, stored as the waiting-state message
+ .endWaiting()
+ .endState()
+ .build();
+ // Attach that status to a test pod and return the pod as the only item of the list.
+ Pod pod = TestUtils.getTestPod("host", "apiVersion", Collections.emptyList());
+ pod.setStatus(
+ new PodStatusBuilder()
+ .withContainerStatuses(Collections.singletonList(cs))
+ .build());
+ return new PodListBuilder().withItems(pod).build();
+ }
+
public static Context createEmptyContext() {
return new Context() {
@Override
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 308c894..03013cc 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -36,13 +36,7 @@ import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
import org.apache.flink.runtime.client.JobStatusMessage;
-import io.fabric8.kubernetes.api.model.ContainerStatus;
-import io.fabric8.kubernetes.api.model.ContainerStatusBuilder;
import io.fabric8.kubernetes.api.model.EventBuilder;
-import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.api.model.PodList;
-import io.fabric8.kubernetes.api.model.PodListBuilder;
-import io.fabric8.kubernetes.api.model.PodStatusBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
@@ -218,7 +212,7 @@ public class FlinkDeploymentControllerTest {
.once();
String crashLoopMessage = "container fails";
- flinkService.setJmPodList(createFailedPodList(crashLoopMessage));
+ flinkService.setJmPodList(TestUtils.createFailedPodList(crashLoopMessage));
FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
UpdateControl<FlinkDeployment> updateControl;
@@ -504,7 +498,7 @@ public class FlinkDeploymentControllerTest {
@Test
public void testSuccessfulObservationShouldClearErrors() {
final String crashLoopMessage = "deploy errors";
- flinkService.setJmPodList(createFailedPodList(crashLoopMessage));
+ flinkService.setJmPodList(TestUtils.createFailedPodList(crashLoopMessage));
FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
@@ -531,25 +525,6 @@ public class FlinkDeploymentControllerTest {
assertNull(reconciliationStatus.getError());
}
- private PodList createFailedPodList(String crashLoopMessage) {
- ContainerStatus cs =
- new ContainerStatusBuilder()
- .withNewState()
- .withNewWaiting()
- .withReason(DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF)
- .withMessage(crashLoopMessage)
- .endWaiting()
- .endState()
- .build();
-
- Pod pod = TestUtils.getTestPod("host", "apiVersion", Collections.emptyList());
- pod.setStatus(
- new PodStatusBuilder()
- .withContainerStatuses(Collections.singletonList(cs))
- .build());
- return new PodListBuilder().withItems(pod).build();
- }
-
private FlinkDeploymentController createTestController(
KubernetesClient kubernetesClient, TestingFlinkService flinkService) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
index 2db9425..b7aed87 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobObserverTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
@@ -33,6 +34,7 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** {@link JobObserver} unit tests. */
@@ -194,4 +196,34 @@ public class JobObserverTest {
deployment.getStatus().setJobStatus(jobStatus);
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
}
+ /** Expects {@code observe} to throw {@link DeploymentFailedException} once the service is given a failed pod list. */
+ @Test
+ public void observeListJobsError() {
+ TestingFlinkService flinkService = new TestingFlinkService();
+ JobObserver observer =
+ new JobObserver(
+ flinkService,
+ FlinkOperatorConfiguration.fromConfiguration(new Configuration()));
+ FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+ Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
+ bringToReadyStatus(deployment);
+ observer.observe(deployment, readyContext, conf);
+ assertEquals( // baseline: the deployment reports READY after the first observe call
+ JobManagerDeploymentStatus.READY,
+ deployment.getStatus().getJobManagerDeploymentStatus());
+ // Simulate a deployment failure: hand the service a failed pod list and mark the port as not ready.
+ String podFailedMessage = "list jobs error";
+ flinkService.setJmPodList(TestUtils.createFailedPodList(podFailedMessage));
+ flinkService.setPortReady(false);
+ Exception exception =
+ assertThrows(
+ DeploymentFailedException.class,
+ () -> {
+ observer.observe(
+ deployment,
+ TestUtils.createContextWithInProgressDeployment(),
+ conf);
+ });
+ assertEquals(podFailedMessage, exception.getMessage()); // the thrown exception must carry the pod's waiting message verbatim
+ }
}