You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/16 10:34:15 UTC
[flink-kubernetes-operator] 22/23: Rework tests to avoid using mockito
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit ce0ba97c5bb0b49f0ab52dfde72e4ad248c3487b
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Tue Feb 15 14:33:30 2022 +0100
Rework tests to avoid using mockito
---
.../kubernetes/operator/TestingFlinkService.java | 100 +++++++++++++++++++++
.../operator/observer/JobStatusObserverTest.java | 56 ++++--------
.../operator/reconciler/JobReconcilerTest.java | 61 +++++--------
pom.xml | 10 ---
4 files changed, 141 insertions(+), 86 deletions(-)
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
new file mode 100644
index 0000000..9a915bc
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Flink service mock for tests: tracks submitted jobs and session clusters in memory. */
+public class TestingFlinkService extends FlinkService {
+
+ public static final String SAVEPOINT = "savepoint"; // fixed value cancelJob returns for every non-STATELESS upgrade mode
+
+ private List<Tuple2<String, JobStatusMessage>> jobs = new ArrayList<>(); // f0: SAVEPOINT_PATH value of the submit conf, f1: status of the submitted job
+ private Set<String> sessions = new HashSet<>(); // metadata names of the submitted session-cluster deployments
+ // Passes null to the FlinkService constructor. NOTE(review): assumes the overridden methods never need the parent's dependency - confirm.
+ public TestingFlinkService() {
+ super(null);
+ }
+ // Drops every recorded job and session.
+ public void clear() {
+ jobs.clear();
+ sessions.clear();
+ }
+ // Records a new RUNNING job (fresh JobID, named after the deployment) together with the savepoint path read from conf.
+ @Override
+ public void submitApplicationCluster(FlinkDeployment deployment, Configuration conf) {
+ JobID jobID = new JobID();
+ JobStatusMessage jobStatusMessage =
+ new JobStatusMessage(
+ jobID,
+ deployment.getMetadata().getName(),
+ JobStatus.RUNNING,
+ System.currentTimeMillis());
+
+ jobs.add(Tuple2.of(conf.get(SavepointConfigOptions.SAVEPOINT_PATH), jobStatusMessage));
+ }
+ // Records the deployment's metadata name as a session cluster; conf is not used.
+ @Override
+ public void submitSessionCluster(FlinkDeployment deployment, Configuration conf) {
+ sessions.add(deployment.getMetadata().getName());
+ }
+ // Returns only the JobStatusMessage of each recorded job; conf is not used.
+ @Override
+ public List<JobStatusMessage> listJobs(Configuration conf) {
+ return jobs.stream().map(t -> t.f1).collect(Collectors.toList());
+ }
+ // Overload without conf: returns a copy of the recorded (savepoint path, job status) pairs.
+ public List<Tuple2<String, JobStatusMessage>> listJobs() {
+ return new ArrayList<>(jobs);
+ }
+ // Removes the recorded job with the given id, or throws when none matches; conf is not used.
+ @Override
+ public Optional<String> cancelJob(JobID jobID, UpgradeMode upgradeMode, Configuration conf)
+ throws Exception {
+
+ if (!jobs.removeIf(js -> js.f1.getJobId().equals(jobID))) { // removeIf yields false when no recorded job had this id
+ throw new Exception("Job not found");
+ }
+
+ if (upgradeMode != UpgradeMode.STATELESS) { // every mode other than STATELESS reports the fixed SAVEPOINT value
+ return Optional.of(SAVEPOINT);
+ } else {
+ return Optional.empty();
+ }
+ }
+ // Forgets the session recorded under the deployment's metadata name; conf is not used.
+ @Override
+ public void stopSessionCluster(FlinkDeployment deployment, Configuration conf) {
+ sessions.remove(deployment.getMetadata().getName());
+ }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
index 37da2f9..e2a1dfd 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
@@ -17,40 +17,27 @@
package org.apache.flink.kubernetes.operator.observer;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
-import org.apache.flink.runtime.client.JobStatusMessage;
import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
-
-import java.util.Arrays;
-import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
/** {@link JobStatusObserver} unit tests. */
public class JobStatusObserverTest {
- public static final String JOB_NAME = "test1";
-
- private FlinkService flinkService = Mockito.mock(FlinkService.class);
-
@Test
public void observeSessionCluster() {
+ FlinkService flinkService = new TestingFlinkService();
JobStatusObserver observer = new JobStatusObserver(flinkService);
FlinkDeployment deployment = TestUtils.buildSessionCluster();
deployment.setStatus(new FlinkDeploymentStatus());
@@ -61,36 +48,27 @@ public class JobStatusObserverTest {
}
@Test
- public void observeApplicationCluster() throws Exception {
+ public void observeApplicationCluster() {
+ TestingFlinkService flinkService = new TestingFlinkService();
JobStatusObserver observer = new JobStatusObserver(flinkService);
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
- assertTrue(
- observer.observeFlinkJobStatus(
- deployment, FlinkUtils.getEffectiveConfig(deployment)));
+ Configuration conf = FlinkUtils.getEffectiveConfig(deployment);
+
+ assertTrue(observer.observeFlinkJobStatus(deployment, conf));
deployment.setStatus(new FlinkDeploymentStatus());
deployment.getStatus().setSpec(deployment.getSpec());
- verify(flinkService, times(0)).listJobs(any(Configuration.class));
- when(flinkService.listJobs(any(Configuration.class))).thenReturn(Collections.emptyList());
- assertFalse(
- observer.observeFlinkJobStatus(
- deployment, FlinkUtils.getEffectiveConfig(deployment)));
- verify(flinkService, times(1)).listJobs(any(Configuration.class));
+ assertFalse(observer.observeFlinkJobStatus(deployment, conf));
+
+ flinkService.submitApplicationCluster(deployment, conf);
+ assertTrue(observer.observeFlinkJobStatus(deployment, conf));
+
+ assertEquals(
+ deployment.getMetadata().getName(),
+ deployment.getStatus().getJobStatus().getJobName());
- when(flinkService.listJobs(any(Configuration.class)))
- .thenReturn(
- Arrays.asList(
- new JobStatusMessage(
- new JobID(), JOB_NAME, JobStatus.RUNNING, 1L)));
- assertTrue(
- observer.observeFlinkJobStatus(
- deployment, FlinkUtils.getEffectiveConfig(deployment)));
- verify(flinkService, times(2)).listJobs(any(Configuration.class));
- assertEquals(JOB_NAME, deployment.getStatus().getJobStatus().getJobName());
deployment.getSpec().getJob().setState(JobState.SUSPENDED);
- assertTrue(
- observer.observeFlinkJobStatus(
- deployment, FlinkUtils.getEffectiveConfig(deployment)));
- verify(flinkService, times(2)).listJobs(any(Configuration.class));
+ flinkService.clear();
+ assertTrue(observer.observeFlinkJobStatus(deployment, conf));
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index 77da7b4..2e2bf78 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -17,51 +17,43 @@
package org.apache.flink.kubernetes.operator.reconciler;
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
-import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
-import io.fabric8.kubernetes.client.KubernetesClient;
import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
-import java.util.Optional;
+import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.times;
+import static org.junit.jupiter.api.Assertions.assertNull;
/** {@link JobReconciler} unit tests. */
public class JobReconcilerTest {
- public static final String JOB_NAME = "test1";
- public static final String JOB_ID = "fd72014d4c864993a2e5a9287b4a9c5d";
-
- private FlinkService flinkService = Mockito.mock(FlinkService.class);
-
@Test
public void testUpgrade() throws Exception {
- KubernetesClient kubernetesClient = Mockito.mock(KubernetesClient.class);
- JobReconciler reconciler = new JobReconciler(kubernetesClient, flinkService);
+ TestingFlinkService flinkService = new TestingFlinkService();
+
+ JobReconciler reconciler = new JobReconciler(null, flinkService);
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
Configuration config = FlinkUtils.getEffectiveConfig(deployment);
reconciler.reconcile("test", deployment, config);
- Mockito.verify(flinkService, times(1)).submitApplicationCluster(eq(deployment), eq(config));
- Mockito.clearInvocations(flinkService);
+ List<Tuple2<String, JobStatusMessage>> runningJobs = flinkService.listJobs();
+ assertEquals(1, runningJobs.size());
+ assertNull(runningJobs.get(0).f0);
deployment.getStatus().setSpec(deployment.getSpec());
JobStatus jobStatus = new JobStatus();
- jobStatus.setJobName(JOB_NAME);
- jobStatus.setJobId(JOB_ID);
+ jobStatus.setJobName(runningJobs.get(0).f1.getJobName());
+ jobStatus.setJobId(runningJobs.get(0).f1.getJobId().toHexString());
jobStatus.setState("RUNNING");
deployment.getStatus().setJobStatus(jobStatus);
@@ -71,30 +63,25 @@ public class JobReconcilerTest {
statelessUpgrade.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
statelessUpgrade.getSpec().getFlinkConfiguration().put("new", "conf");
reconciler.reconcile("test", statelessUpgrade, config);
- Mockito.verify(flinkService, times(1))
- .cancelJob(eq(JobID.fromHexString(JOB_ID)), eq(UpgradeMode.STATELESS), eq(config));
- Mockito.verify(flinkService, times(1))
- .submitApplicationCluster(eq(statelessUpgrade), eq(config));
+ runningJobs = flinkService.listJobs();
+ assertEquals(1, runningJobs.size());
+ assertNull(runningJobs.get(0).f0);
- Mockito.clearInvocations(flinkService);
+ deployment
+ .getStatus()
+ .getJobStatus()
+ .setJobId(runningJobs.get(0).f1.getJobId().toHexString());
// Test stateful upgrade
FlinkDeployment statefulUpgrade = TestUtils.clone(deployment);
statefulUpgrade.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
- statefulUpgrade.getSpec().getFlinkConfiguration().put("new", "conf");
-
- Mockito.doReturn(Optional.of("sp")).when(flinkService).cancelJob(any(), any(), any());
+ statefulUpgrade.getSpec().getFlinkConfiguration().put("new", "conf2");
reconciler.reconcile("test", statefulUpgrade, new Configuration(config));
- Mockito.verify(flinkService, times(1))
- .cancelJob(eq(JobID.fromHexString(JOB_ID)), eq(UpgradeMode.SAVEPOINT), eq(config));
-
- ArgumentCaptor<Configuration> argument = ArgumentCaptor.forClass(Configuration.class);
- Mockito.verify(flinkService, times(1))
- .submitApplicationCluster(eq(statefulUpgrade), argument.capture());
- assertEquals("sp", argument.getValue().get(SavepointConfigOptions.SAVEPOINT_PATH));
- Mockito.verifyNoMoreInteractions(flinkService);
+ runningJobs = flinkService.listJobs();
+ assertEquals(1, runningJobs.size());
+ assertEquals(TestingFlinkService.SAVEPOINT, runningJobs.get(0).f0);
}
}
diff --git a/pom.xml b/pom.xml
index 7f31747..8be4638 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,7 +70,6 @@ under the License.
<spotless.version>2.4.2</spotless.version>
<it.skip>true</it.skip>
- <mockito.version>3.12.4</mockito.version>
</properties>
<dependencies>
@@ -81,15 +80,6 @@ under the License.
<scope>provided</scope>
</dependency>
- <!-- Testing -->
-
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <version>${mockito.version}</version>
- <scope>test</scope>
- </dependency>
-
<!-- Logging -->
<dependency>