You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/16 10:34:07 UTC

[flink-kubernetes-operator] 14/23: Adding JobStatusObserverTest

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit a8cb3814189c7dcf2848d4f8dccafd7bc449cf7d
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Mon Feb 7 16:42:22 2022 +0100

    Adding JobStatusObserverTest
---
 pom.xml                                            |   9 ++
 .../operator/crd/spec/FlinkDeploymentSpec.java     |   4 +
 .../operator/crd/spec/JobManagerSpec.java          |   2 +
 .../kubernetes/operator/crd/spec/JobSpec.java      |   4 +
 .../kubernetes/operator/crd/spec/Resource.java     |   2 +
 .../operator/crd/spec/TaskManagerSpec.java         |   4 +-
 .../operator/crd/status/FlinkDeploymentStatus.java |   2 +
 .../kubernetes/operator/crd/status/JobStatus.java  |   4 +
 .../controller/observer/JobStatusObserverTest.java | 140 +++++++++++++++++++++
 9 files changed, 170 insertions(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 4838678..709e48d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,6 +44,7 @@ under the License.
         <spotless.version>2.4.2</spotless.version>
         <awaitility.version>4.1.0</awaitility.version>
         <it.skip>true</it.skip>
+        <mockito.version>2.21.0</mockito.version>
     </properties>
 
     <dependencies>
@@ -81,6 +82,14 @@ under the License.
         </dependency>
 
         <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <type>jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
             <version>${lombok.version}</version>
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
index cacc430..8abaf5c 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
@@ -18,6 +18,8 @@
 package org.apache.flink.kubernetes.operator.crd.spec;
 
 import io.fabric8.kubernetes.api.model.Pod;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
@@ -26,6 +28,8 @@ import java.util.Map;
 /** Spec that describes a Flink application deployment. */
 @Data
 @NoArgsConstructor
+@AllArgsConstructor
+@Builder
 public class FlinkDeploymentSpec {
     private String image;
     private String imagePullPolicy;
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
index 09404ba..0f81b7e 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
@@ -18,12 +18,14 @@
 package org.apache.flink.kubernetes.operator.crd.spec;
 
 import io.fabric8.kubernetes.api.model.Pod;
+import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
 /** JobManager spec. */
 @Data
 @NoArgsConstructor
+@AllArgsConstructor
 public class JobManagerSpec {
     private Resource resource;
     private int replicas;
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
index bf7e0e6..062a5c0 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.kubernetes.operator.crd.spec;
 
+import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
@@ -24,6 +26,8 @@ import lombok.NoArgsConstructor;
 /** Flink job spec. */
 @Data
 @NoArgsConstructor
+@AllArgsConstructor
+@Builder
 public class JobSpec {
     private String jarURI;
     private int parallelism;
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java
index e0e73e1..c86f9a5 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java
@@ -17,12 +17,14 @@
 
 package org.apache.flink.kubernetes.operator.crd.spec;
 
+import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
 /** Resource spec. */
 @Data
 @NoArgsConstructor
+@AllArgsConstructor
 public class Resource {
     private double cpu;
     // 1024m, 1g
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
index f6bd361..8b25127 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
@@ -18,14 +18,16 @@
 package org.apache.flink.kubernetes.operator.crd.spec;
 
 import io.fabric8.kubernetes.api.model.Pod;
+import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
 /** TaskManager spec. */
 @Data
 @NoArgsConstructor
+@AllArgsConstructor
 public class TaskManagerSpec {
-    private int taskSlots;
     private Resource resource;
+    private int taskSlots;
     private Pod podTemplate;
 }
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
index e2bf8a8..041bd0a 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
@@ -19,12 +19,14 @@ package org.apache.flink.kubernetes.operator.crd.status;
 
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 
+import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
 /** Current status of the Flink deployment. */
 @Data
 @NoArgsConstructor
+@AllArgsConstructor
 public class FlinkDeploymentStatus {
     private JobStatus jobStatus;
     private FlinkDeploymentSpec spec;
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
index 19c1ed0..5a8528f 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
@@ -17,12 +17,16 @@
 
 package org.apache.flink.kubernetes.operator.crd.status;
 
+import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
 /** Status of an individual job within the Flink deployment. */
 @Data
 @NoArgsConstructor
+@AllArgsConstructor
+@Builder
 public class JobStatus {
     private String jobName;
     private String jobId;
diff --git a/src/test/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserverTest.java b/src/test/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserverTest.java
new file mode 100644
index 0000000..a1999e1
--- /dev/null
+++ b/src/test/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserverTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller.observer;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.Resource;
+import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** {@link JobStatusObserver} unit tests. */
+public class JobStatusObserverTest {
+
+    private static final String TEST_NAMESPACE = "flink-operator-test";
+    private static final String SERVICE_ACCOUNT = "flink-operator";
+    private static final String FLINK_VERSION = "latest";
+    private static final String IMAGE = String.format("flink:%s", FLINK_VERSION);
+    private static final String JOB_NAME = "test1";
+
+    private FlinkService flinkService = Mockito.mock(FlinkService.class);
+
+    @Test
+    public void observeSessionCluster() throws Exception {
+        JobStatusObserver observer = new JobStatusObserver(flinkService);
+        FlinkDeployment deployment = buildSessionCluster();
+        deployment.setStatus(new FlinkDeploymentStatus());
+        deployment.getStatus().setSpec(deployment.getSpec());
+        assertTrue(
+                observer.observeFlinkJobStatus(
+                        deployment, FlinkUtils.getEffectiveConfig(deployment)));
+    }
+
+    @Test
+    public void observeApplicationCluster() throws Exception {
+        JobStatusObserver observer = new JobStatusObserver(flinkService);
+        FlinkDeployment deployment = buildApplicationCluster();
+        assertTrue(
+                observer.observeFlinkJobStatus(
+                        deployment, FlinkUtils.getEffectiveConfig(deployment)));
+        deployment.setStatus(new FlinkDeploymentStatus());
+        deployment.getStatus().setSpec(deployment.getSpec());
+        verify(flinkService, times(0)).listJobs(any(Configuration.class));
+
+        when(flinkService.listJobs(any(Configuration.class))).thenReturn(Collections.emptyList());
+        assertFalse(
+                observer.observeFlinkJobStatus(
+                        deployment, FlinkUtils.getEffectiveConfig(deployment)));
+        verify(flinkService, times(1)).listJobs(any(Configuration.class));
+
+        when(flinkService.listJobs(any(Configuration.class)))
+                .thenReturn(
+                        Arrays.asList(
+                                new JobStatusMessage(
+                                        new JobID(), JOB_NAME, JobStatus.RUNNING, 1L)));
+        assertTrue(
+                observer.observeFlinkJobStatus(
+                        deployment, FlinkUtils.getEffectiveConfig(deployment)));
+        verify(flinkService, times(2)).listJobs(any(Configuration.class));
+        assertEquals(JOB_NAME, deployment.getStatus().getJobStatus().getJobName());
+        deployment.getSpec().getJob().setState(JobState.SUSPENDED);
+        assertTrue(
+                observer.observeFlinkJobStatus(
+                        deployment, FlinkUtils.getEffectiveConfig(deployment)));
+        verify(flinkService, times(2)).listJobs(any(Configuration.class));
+    }
+
+    private static FlinkDeployment buildSessionCluster() {
+        FlinkDeployment deployment = new FlinkDeployment();
+        deployment.setMetadata(
+                new ObjectMetaBuilder()
+                        .withName("test-cluster")
+                        .withNamespace(TEST_NAMESPACE)
+                        .build());
+        deployment.setSpec(
+                FlinkDeploymentSpec.builder()
+                        .image(IMAGE)
+                        .flinkVersion(FLINK_VERSION)
+                        .flinkConfiguration(
+                                Collections.singletonMap(
+                                        KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT.key(),
+                                        SERVICE_ACCOUNT))
+                        .jobManager(new JobManagerSpec(new Resource(1, "2048m"), 1, null))
+                        .taskManager(new TaskManagerSpec(new Resource(1, "2048m"), 2, null))
+                        .build());
+        return deployment;
+    }
+
+    private FlinkDeployment buildApplicationCluster() {
+        FlinkDeployment deployment = buildSessionCluster();
+        deployment
+                .getSpec()
+                .setJob(
+                        JobSpec.builder()
+                                .jarURI("local:///tmp/sample.jar")
+                                .state(JobState.RUNNING)
+                                .build());
+        return deployment;
+    }
+}