You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/16 10:34:07 UTC
[flink-kubernetes-operator] 14/23: Adding JobStatusObserverTest
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit a8cb3814189c7dcf2848d4f8dccafd7bc449cf7d
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Mon Feb 7 16:42:22 2022 +0100
Adding JobStatusObserverTest
---
pom.xml | 9 ++
.../operator/crd/spec/FlinkDeploymentSpec.java | 4 +
.../operator/crd/spec/JobManagerSpec.java | 2 +
.../kubernetes/operator/crd/spec/JobSpec.java | 4 +
.../kubernetes/operator/crd/spec/Resource.java | 2 +
.../operator/crd/spec/TaskManagerSpec.java | 4 +-
.../operator/crd/status/FlinkDeploymentStatus.java | 2 +
.../kubernetes/operator/crd/status/JobStatus.java | 4 +
.../controller/observer/JobStatusObserverTest.java | 140 +++++++++++++++++++++
9 files changed, 170 insertions(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index 4838678..709e48d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,6 +44,7 @@ under the License.
<spotless.version>2.4.2</spotless.version>
<awaitility.version>4.1.0</awaitility.version>
<it.skip>true</it.skip>
+ <mockito.version>2.21.0</mockito.version>
</properties>
<dependencies>
@@ -81,6 +82,14 @@ under the License.
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mockito.version}</version>
+ <type>jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
index cacc430..8abaf5c 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
@@ -18,6 +18,8 @@
package org.apache.flink.kubernetes.operator.crd.spec;
import io.fabric8.kubernetes.api.model.Pod;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -26,6 +28,8 @@ import java.util.Map;
/** Spec that describes a Flink application deployment. */
@Data
@NoArgsConstructor
+@AllArgsConstructor
+@Builder
public class FlinkDeploymentSpec {
private String image;
private String imagePullPolicy;
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
index 09404ba..0f81b7e 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobManagerSpec.java
@@ -18,12 +18,14 @@
package org.apache.flink.kubernetes.operator.crd.spec;
import io.fabric8.kubernetes.api.model.Pod;
+import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/** JobManager spec. */
@Data
@NoArgsConstructor
+@AllArgsConstructor
public class JobManagerSpec {
private Resource resource;
private int replicas;
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
index bf7e0e6..062a5c0 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/JobSpec.java
@@ -17,6 +17,8 @@
package org.apache.flink.kubernetes.operator.crd.spec;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
@@ -24,6 +26,8 @@ import lombok.NoArgsConstructor;
/** Flink job spec. */
@Data
@NoArgsConstructor
+@AllArgsConstructor
+@Builder
public class JobSpec {
private String jarURI;
private int parallelism;
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java
index e0e73e1..c86f9a5 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/Resource.java
@@ -17,12 +17,14 @@
package org.apache.flink.kubernetes.operator.crd.spec;
+import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/** Resource spec. */
@Data
@NoArgsConstructor
+@AllArgsConstructor
public class Resource {
private double cpu;
// 1024m, 1g
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
index f6bd361..8b25127 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
@@ -18,14 +18,16 @@
package org.apache.flink.kubernetes.operator.crd.spec;
import io.fabric8.kubernetes.api.model.Pod;
+import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/** TaskManager spec. */
@Data
@NoArgsConstructor
+@AllArgsConstructor
public class TaskManagerSpec {
- private int taskSlots;
private Resource resource;
+ private int taskSlots;
private Pod podTemplate;
}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
index e2bf8a8..041bd0a 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
@@ -19,12 +19,14 @@ package org.apache.flink.kubernetes.operator.crd.status;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/** Current status of the Flink deployment. */
@Data
@NoArgsConstructor
+@AllArgsConstructor
public class FlinkDeploymentStatus {
private JobStatus jobStatus;
private FlinkDeploymentSpec spec;
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java b/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
index 19c1ed0..5a8528f 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/crd/status/JobStatus.java
@@ -17,12 +17,16 @@
package org.apache.flink.kubernetes.operator.crd.status;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/** Status of an individual job within the Flink deployment. */
@Data
@NoArgsConstructor
+@AllArgsConstructor
+@Builder
public class JobStatus {
private String jobName;
private String jobId;
diff --git a/src/test/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserverTest.java b/src/test/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserverTest.java
new file mode 100644
index 0000000..a1999e1
--- /dev/null
+++ b/src/test/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserverTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller.observer;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.Resource;
+import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** @link JobStatusObserver unit tests */
+public class JobStatusObserverTest {
+
+ private static final String TEST_NAMESPACE = "flink-operator-test";
+ private static final String SERVICE_ACCOUNT = "flink-operator";
+ private static final String FLINK_VERSION = "latest";
+ private static final String IMAGE = String.format("flink:%s", FLINK_VERSION);
+ private static final String JOB_NAME = "test1";
+
+ private FlinkService flinkService = Mockito.mock(FlinkService.class);
+
+ @Test
+ public void observeSessionCluster() throws Exception {
+ JobStatusObserver observer = new JobStatusObserver(flinkService);
+ FlinkDeployment deployment = buildSessionCluster();
+ deployment.setStatus(new FlinkDeploymentStatus());
+ deployment.getStatus().setSpec(deployment.getSpec());
+ assertTrue(
+ observer.observeFlinkJobStatus(
+ deployment, FlinkUtils.getEffectiveConfig(deployment)));
+ }
+
+ @Test
+ public void observeApplicationCluster() throws Exception {
+ JobStatusObserver observer = new JobStatusObserver(flinkService);
+ FlinkDeployment deployment = buildApplicationCluster();
+ assertTrue(
+ observer.observeFlinkJobStatus(
+ deployment, FlinkUtils.getEffectiveConfig(deployment)));
+ deployment.setStatus(new FlinkDeploymentStatus());
+ deployment.getStatus().setSpec(deployment.getSpec());
+ verify(flinkService, times(0)).listJobs(any(Configuration.class));
+
+ when(flinkService.listJobs(any(Configuration.class))).thenReturn(Collections.emptyList());
+ assertFalse(
+ observer.observeFlinkJobStatus(
+ deployment, FlinkUtils.getEffectiveConfig(deployment)));
+ verify(flinkService, times(1)).listJobs(any(Configuration.class));
+
+ when(flinkService.listJobs(any(Configuration.class)))
+ .thenReturn(
+ Arrays.asList(
+ new JobStatusMessage(
+ new JobID(), JOB_NAME, JobStatus.RUNNING, 1L)));
+ assertTrue(
+ observer.observeFlinkJobStatus(
+ deployment, FlinkUtils.getEffectiveConfig(deployment)));
+ verify(flinkService, times(2)).listJobs(any(Configuration.class));
+ assertEquals(JOB_NAME, deployment.getStatus().getJobStatus().getJobName());
+ deployment.getSpec().getJob().setState(JobState.SUSPENDED);
+ assertTrue(
+ observer.observeFlinkJobStatus(
+ deployment, FlinkUtils.getEffectiveConfig(deployment)));
+ verify(flinkService, times(2)).listJobs(any(Configuration.class));
+ }
+
+ private static FlinkDeployment buildSessionCluster() {
+ FlinkDeployment deployment = new FlinkDeployment();
+ deployment.setMetadata(
+ new ObjectMetaBuilder()
+ .withName("test-cluster")
+ .withNamespace(TEST_NAMESPACE)
+ .build());
+ deployment.setSpec(
+ FlinkDeploymentSpec.builder()
+ .image(IMAGE)
+ .flinkVersion(FLINK_VERSION)
+ .flinkConfiguration(
+ Collections.singletonMap(
+ KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT.key(),
+ SERVICE_ACCOUNT))
+ .jobManager(new JobManagerSpec(new Resource(1, "2048m"), 1, null))
+ .taskManager(new TaskManagerSpec(new Resource(1, "2048m"), 2, null))
+ .build());
+ return deployment;
+ }
+
+ private FlinkDeployment buildApplicationCluster() {
+ FlinkDeployment deployment = buildSessionCluster();
+ deployment
+ .getSpec()
+ .setJob(
+ JobSpec.builder()
+ .jarURI("local:///tmp/sample.jar")
+ .state(JobState.RUNNING)
+ .build());
+ return deployment;
+ }
+}