You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/16 10:34:15 UTC

[flink-kubernetes-operator] 22/23: Rework tests to avoid using mockito

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit ce0ba97c5bb0b49f0ab52dfde72e4ad248c3487b
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Tue Feb 15 14:33:30 2022 +0100

    Rework tests to avoid using mockito
---
 .../kubernetes/operator/TestingFlinkService.java   | 100 +++++++++++++++++++++
 .../operator/observer/JobStatusObserverTest.java   |  56 ++++--------
 .../operator/reconciler/JobReconcilerTest.java     |  61 +++++--------
 pom.xml                                            |  10 ---
 4 files changed, 141 insertions(+), 86 deletions(-)

diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
new file mode 100644
index 0000000..9a915bc
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Flink service mock for tests. */
+public class TestingFlinkService extends FlinkService {
+
+    public static final String SAVEPOINT = "savepoint";
+
+    private List<Tuple2<String, JobStatusMessage>> jobs = new ArrayList<>();
+    private Set<String> sessions = new HashSet<>();
+
+    public TestingFlinkService() {
+        super(null);
+    }
+
+    public void clear() {
+        jobs.clear();
+        sessions.clear();
+    }
+
+    @Override
+    public void submitApplicationCluster(FlinkDeployment deployment, Configuration conf) {
+        JobID jobID = new JobID();
+        JobStatusMessage jobStatusMessage =
+                new JobStatusMessage(
+                        jobID,
+                        deployment.getMetadata().getName(),
+                        JobStatus.RUNNING,
+                        System.currentTimeMillis());
+
+        jobs.add(Tuple2.of(conf.get(SavepointConfigOptions.SAVEPOINT_PATH), jobStatusMessage));
+    }
+
+    @Override
+    public void submitSessionCluster(FlinkDeployment deployment, Configuration conf) {
+        sessions.add(deployment.getMetadata().getName());
+    }
+
+    @Override
+    public List<JobStatusMessage> listJobs(Configuration conf) {
+        return jobs.stream().map(t -> t.f1).collect(Collectors.toList());
+    }
+
+    public List<Tuple2<String, JobStatusMessage>> listJobs() {
+        return new ArrayList<>(jobs);
+    }
+
+    @Override
+    public Optional<String> cancelJob(JobID jobID, UpgradeMode upgradeMode, Configuration conf)
+            throws Exception {
+
+        if (!jobs.removeIf(js -> js.f1.getJobId().equals(jobID))) {
+            throw new Exception("Job not found");
+        }
+
+        if (upgradeMode != UpgradeMode.STATELESS) {
+            return Optional.of(SAVEPOINT);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    @Override
+    public void stopSessionCluster(FlinkDeployment deployment, Configuration conf) {
+        sessions.remove(deployment.getMetadata().getName());
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
index 37da2f9..e2a1dfd 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
@@ -17,40 +17,27 @@
 
 package org.apache.flink.kubernetes.operator.observer;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
-import org.apache.flink.runtime.client.JobStatusMessage;
 
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
-
-import java.util.Arrays;
-import java.util.Collections;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 /** @link JobStatusObserver unit tests */
 public class JobStatusObserverTest {
 
-    public static final String JOB_NAME = "test1";
-
-    private FlinkService flinkService = Mockito.mock(FlinkService.class);
-
     @Test
     public void observeSessionCluster() {
+        FlinkService flinkService = new TestingFlinkService();
         JobStatusObserver observer = new JobStatusObserver(flinkService);
         FlinkDeployment deployment = TestUtils.buildSessionCluster();
         deployment.setStatus(new FlinkDeploymentStatus());
@@ -61,36 +48,27 @@ public class JobStatusObserverTest {
     }
 
     @Test
-    public void observeApplicationCluster() throws Exception {
+    public void observeApplicationCluster() {
+        TestingFlinkService flinkService = new TestingFlinkService();
         JobStatusObserver observer = new JobStatusObserver(flinkService);
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
-        assertTrue(
-                observer.observeFlinkJobStatus(
-                        deployment, FlinkUtils.getEffectiveConfig(deployment)));
+        Configuration conf = FlinkUtils.getEffectiveConfig(deployment);
+
+        assertTrue(observer.observeFlinkJobStatus(deployment, conf));
         deployment.setStatus(new FlinkDeploymentStatus());
         deployment.getStatus().setSpec(deployment.getSpec());
-        verify(flinkService, times(0)).listJobs(any(Configuration.class));
 
-        when(flinkService.listJobs(any(Configuration.class))).thenReturn(Collections.emptyList());
-        assertFalse(
-                observer.observeFlinkJobStatus(
-                        deployment, FlinkUtils.getEffectiveConfig(deployment)));
-        verify(flinkService, times(1)).listJobs(any(Configuration.class));
+        assertFalse(observer.observeFlinkJobStatus(deployment, conf));
+
+        flinkService.submitApplicationCluster(deployment, conf);
+        assertTrue(observer.observeFlinkJobStatus(deployment, conf));
+
+        assertEquals(
+                deployment.getMetadata().getName(),
+                deployment.getStatus().getJobStatus().getJobName());
 
-        when(flinkService.listJobs(any(Configuration.class)))
-                .thenReturn(
-                        Arrays.asList(
-                                new JobStatusMessage(
-                                        new JobID(), JOB_NAME, JobStatus.RUNNING, 1L)));
-        assertTrue(
-                observer.observeFlinkJobStatus(
-                        deployment, FlinkUtils.getEffectiveConfig(deployment)));
-        verify(flinkService, times(2)).listJobs(any(Configuration.class));
-        assertEquals(JOB_NAME, deployment.getStatus().getJobStatus().getJobName());
         deployment.getSpec().getJob().setState(JobState.SUSPENDED);
-        assertTrue(
-                observer.observeFlinkJobStatus(
-                        deployment, FlinkUtils.getEffectiveConfig(deployment)));
-        verify(flinkService, times(2)).listJobs(any(Configuration.class));
+        flinkService.clear();
+        assertTrue(observer.observeFlinkJobStatus(deployment, conf));
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index 77da7b4..2e2bf78 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -17,51 +17,43 @@
 
 package org.apache.flink.kubernetes.operator.reconciler;
 
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
-import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
 
-import io.fabric8.kubernetes.client.KubernetesClient;
 import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
 
-import java.util.Optional;
+import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.times;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
 /** @link JobStatusObserver unit tests */
 public class JobReconcilerTest {
 
-    public static final String JOB_NAME = "test1";
-    public static final String JOB_ID = "fd72014d4c864993a2e5a9287b4a9c5d";
-
-    private FlinkService flinkService = Mockito.mock(FlinkService.class);
-
     @Test
     public void testUpgrade() throws Exception {
-        KubernetesClient kubernetesClient = Mockito.mock(KubernetesClient.class);
-        JobReconciler reconciler = new JobReconciler(kubernetesClient, flinkService);
+        TestingFlinkService flinkService = new TestingFlinkService();
+
+        JobReconciler reconciler = new JobReconciler(null, flinkService);
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         Configuration config = FlinkUtils.getEffectiveConfig(deployment);
 
         reconciler.reconcile("test", deployment, config);
-        Mockito.verify(flinkService, times(1)).submitApplicationCluster(eq(deployment), eq(config));
-        Mockito.clearInvocations(flinkService);
+        List<Tuple2<String, JobStatusMessage>> runningJobs = flinkService.listJobs();
+        assertEquals(1, runningJobs.size());
+        assertNull(runningJobs.get(0).f0);
         deployment.getStatus().setSpec(deployment.getSpec());
 
         JobStatus jobStatus = new JobStatus();
-        jobStatus.setJobName(JOB_NAME);
-        jobStatus.setJobId(JOB_ID);
+        jobStatus.setJobName(runningJobs.get(0).f1.getJobName());
+        jobStatus.setJobId(runningJobs.get(0).f1.getJobId().toHexString());
         jobStatus.setState("RUNNING");
 
         deployment.getStatus().setJobStatus(jobStatus);
@@ -71,30 +63,25 @@ public class JobReconcilerTest {
         statelessUpgrade.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
         statelessUpgrade.getSpec().getFlinkConfiguration().put("new", "conf");
         reconciler.reconcile("test", statelessUpgrade, config);
-        Mockito.verify(flinkService, times(1))
-                .cancelJob(eq(JobID.fromHexString(JOB_ID)), eq(UpgradeMode.STATELESS), eq(config));
 
-        Mockito.verify(flinkService, times(1))
-                .submitApplicationCluster(eq(statelessUpgrade), eq(config));
+        runningJobs = flinkService.listJobs();
+        assertEquals(1, runningJobs.size());
+        assertNull(runningJobs.get(0).f0);
 
-        Mockito.clearInvocations(flinkService);
+        deployment
+                .getStatus()
+                .getJobStatus()
+                .setJobId(runningJobs.get(0).f1.getJobId().toHexString());
 
         // Test stateful upgrade
         FlinkDeployment statefulUpgrade = TestUtils.clone(deployment);
         statefulUpgrade.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
-        statefulUpgrade.getSpec().getFlinkConfiguration().put("new", "conf");
-
-        Mockito.doReturn(Optional.of("sp")).when(flinkService).cancelJob(any(), any(), any());
+        statefulUpgrade.getSpec().getFlinkConfiguration().put("new", "conf2");
 
         reconciler.reconcile("test", statefulUpgrade, new Configuration(config));
-        Mockito.verify(flinkService, times(1))
-                .cancelJob(eq(JobID.fromHexString(JOB_ID)), eq(UpgradeMode.SAVEPOINT), eq(config));
-
-        ArgumentCaptor<Configuration> argument = ArgumentCaptor.forClass(Configuration.class);
-        Mockito.verify(flinkService, times(1))
-                .submitApplicationCluster(eq(statefulUpgrade), argument.capture());
-        assertEquals("sp", argument.getValue().get(SavepointConfigOptions.SAVEPOINT_PATH));
 
-        Mockito.verifyNoMoreInteractions(flinkService);
+        runningJobs = flinkService.listJobs();
+        assertEquals(1, runningJobs.size());
+        assertEquals(TestingFlinkService.SAVEPOINT, runningJobs.get(0).f0);
     }
 }
diff --git a/pom.xml b/pom.xml
index 7f31747..8be4638 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,7 +70,6 @@ under the License.
 
         <spotless.version>2.4.2</spotless.version>
         <it.skip>true</it.skip>
-        <mockito.version>3.12.4</mockito.version>
     </properties>
 
     <dependencies>
@@ -81,15 +80,6 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
-        <!-- Testing -->
-
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
-            <version>${mockito.version}</version>
-            <scope>test</scope>
-        </dependency>
-
         <!-- Logging -->
 
         <dependency>