You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2023/01/24 21:21:31 UTC

[flink-kubernetes-operator] branch main updated (410a0a32 -> e1682cc5)

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


    from 410a0a32 [FLINK-30705] Fix CR ref gen to use JsonProperty annotation values
     new cb30a70d [FLINK-30766][tests] Do not parameterize tests with all Flink versions
     new e1682cc5 [tests] Improve flaky RollbackTest

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../operator/reconciler/ReconciliationUtils.java   |  28 +++++-
 .../AbstractFlinkResourceReconciler.java           |  10 +-
 .../deployment/AbstractJobReconciler.java          |   9 +-
 .../reconciler/deployment/SessionReconciler.java   |   4 +-
 .../flink/kubernetes/operator/TestUtils.java       |  20 ++++
 .../operator/config/FlinkConfigBuilderTest.java    |   4 +-
 .../controller/DeploymentRecoveryTest.java         |  21 +---
 .../controller/FailedDeploymentRestartTest.java    |  18 +---
 .../controller/FlinkDeploymentControllerTest.java  |  25 +----
 .../operator/controller/RollbackTest.java          | 108 +++++++--------------
 .../TestingFlinkDeploymentController.java          |  11 ++-
 .../controller/UnhealthyDeploymentRestartTest.java |  18 +---
 .../deployment/ApplicationReconcilerTest.java      |   6 +-
 .../ApplicationReconcilerUpgradeModeTest.java      |  19 ++--
 .../operator/service/NativeFlinkServiceTest.java   |   4 +-
 15 files changed, 126 insertions(+), 179 deletions(-)


[flink-kubernetes-operator] 02/02: [tests] Improve flaky RollbackTest

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit e1682cc5ce48c7bfff390843f597fd0abb279bed
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Sat Jan 21 22:09:21 2023 +0100

    [tests] Improve flaky RollbackTest
---
 .../operator/reconciler/ReconciliationUtils.java   |  28 ++++-
 .../AbstractFlinkResourceReconciler.java           |  10 +-
 .../deployment/AbstractJobReconciler.java          |   9 +-
 .../reconciler/deployment/SessionReconciler.java   |   4 +-
 .../operator/controller/RollbackTest.java          | 116 +++++++--------------
 .../TestingFlinkDeploymentController.java          |  11 +-
 6 files changed, 80 insertions(+), 98 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 6a7ec3de..20d7bd4c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.reconciler;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -46,6 +47,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Clock;
 import java.time.Duration;
 import java.util.Optional;
 
@@ -68,12 +70,20 @@ public class ReconciliationUtils {
      *
      * @param target Target Flink resource.
      * @param conf Deployment configuration.
+     * @param clock Clock for getting time.
      * @param <SPEC> Spec type.
      */
     public static <SPEC extends AbstractFlinkSpec> void updateStatusForDeployedSpec(
-            AbstractFlinkResource<SPEC, ?> target, Configuration conf) {
+            AbstractFlinkResource<SPEC, ?> target, Configuration conf, Clock clock) {
         var job = target.getSpec().getJob();
-        updateStatusForSpecReconciliation(target, job != null ? job.getState() : null, conf, false);
+        updateStatusForSpecReconciliation(
+                target, job != null ? job.getState() : null, conf, false, clock);
+    }
+
+    @VisibleForTesting
+    public static <SPEC extends AbstractFlinkSpec> void updateStatusForDeployedSpec(
+            AbstractFlinkResource<SPEC, ?> target, Configuration conf) {
+        updateStatusForDeployedSpec(target, conf, Clock.systemDefaultZone());
     }
 
     /**
@@ -85,18 +95,26 @@ public class ReconciliationUtils {
      *
      * @param target Target Flink resource.
      * @param conf Deployment configuration.
+     * @param clock Clock for getting the current time.
      * @param <SPEC> Spec type.
      */
+    public static <SPEC extends AbstractFlinkSpec> void updateStatusBeforeDeploymentAttempt(
+            AbstractFlinkResource<SPEC, ?> target, Configuration conf, Clock clock) {
+        updateStatusForSpecReconciliation(target, JobState.SUSPENDED, conf, true, clock);
+    }
+
+    @VisibleForTesting
     public static <SPEC extends AbstractFlinkSpec> void updateStatusBeforeDeploymentAttempt(
             AbstractFlinkResource<SPEC, ?> target, Configuration conf) {
-        updateStatusForSpecReconciliation(target, JobState.SUSPENDED, conf, true);
+        updateStatusBeforeDeploymentAttempt(target, conf, Clock.systemDefaultZone());
     }
 
     private static <SPEC extends AbstractFlinkSpec> void updateStatusForSpecReconciliation(
             AbstractFlinkResource<SPEC, ?> target,
             JobState stateAfterReconcile,
             Configuration conf,
-            boolean upgrading) {
+            boolean upgrading,
+            Clock clock) {
 
         var status = target.getStatus();
         var spec = target.getSpec();
@@ -104,7 +122,7 @@ public class ReconciliationUtils {
 
         // Clear errors
         status.setError(null);
-        reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis());
+        reconciliationStatus.setReconciliationTimestamp(clock.instant().toEpochMilli());
         reconciliationStatus.setState(
                 upgrading ? ReconciliationState.UPGRADING : ReconciliationState.DEPLOYED);
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 60cdf8e9..249d2f80 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -119,7 +119,7 @@ public abstract class AbstractFlinkResourceReconciler<
                     Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
                     false);
 
-            ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig);
+            ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig, clock);
             return;
         }
 
@@ -203,7 +203,7 @@ public abstract class AbstractFlinkResourceReconciler<
 
             spec.getJob().setUpgradeMode(initialUpgradeMode);
         }
-        ReconciliationUtils.updateStatusBeforeDeploymentAttempt(cr, deployConfig);
+        ReconciliationUtils.updateStatusBeforeDeploymentAttempt(cr, deployConfig, clock);
         // Before we try to submit the job we record the current spec in the status so we can
         // handle subsequent deployment and status update errors
         statusRecorder.patchAndCacheStatus(cr);
@@ -298,7 +298,7 @@ public abstract class AbstractFlinkResourceReconciler<
         if (resource.getSpec().equals(deployedSpec)) {
             LOG.info(
                     "The new spec matches the currently deployed last stable spec. No upgrade needed.");
-            ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConf);
+            ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConf, clock);
             return true;
         }
         return false;
@@ -324,7 +324,7 @@ public abstract class AbstractFlinkResourceReconciler<
         boolean scaled = flinkService.scale(cr.getMetadata(), cr.getSpec().getJob(), deployConfig);
         if (scaled) {
             LOG.info("Scaling succeeded");
-            ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig);
+            ReconciliationUtils.updateStatusForDeployedSpec(cr, deployConfig, clock);
             return true;
         }
         return false;
@@ -467,7 +467,7 @@ public abstract class AbstractFlinkResourceReconciler<
     }
 
     @VisibleForTesting
-    protected void setClock(Clock clock) {
+    public void setClock(Clock clock) {
         this.clock = clock;
     }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 5ba277e4..e9c170ca 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -109,9 +109,10 @@ public abstract class AbstractJobReconciler<
             currentDeploySpec.getJob().setUpgradeMode(availableUpgradeMode.get());
             cancelJob(ctx, availableUpgradeMode.get());
             if (desiredJobState == JobState.RUNNING) {
-                ReconciliationUtils.updateStatusBeforeDeploymentAttempt(resource, deployConfig);
+                ReconciliationUtils.updateStatusBeforeDeploymentAttempt(
+                        resource, deployConfig, clock);
             } else {
-                ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig);
+                ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig, clock);
             }
         }
 
@@ -123,7 +124,7 @@ public abstract class AbstractJobReconciler<
                         .setUpgradeMode(lastReconciledSpec.getJob().getUpgradeMode());
             }
             // We record the target spec into an upgrading state before deploying
-            ReconciliationUtils.updateStatusBeforeDeploymentAttempt(resource, deployConfig);
+            ReconciliationUtils.updateStatusBeforeDeploymentAttempt(resource, deployConfig, clock);
             statusRecorder.patchAndCacheStatus(resource);
 
             restoreJob(
@@ -133,7 +134,7 @@ public abstract class AbstractJobReconciler<
                     // We decide to enforce HA based on how job was previously suspended
                     lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE);
 
-            ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig);
+            ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig, clock);
         }
         return true;
     }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index b8fb9a7e..fe5ad865 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -74,11 +74,11 @@ public class SessionReconciler
         deleteSessionCluster(ctx);
 
         // We record the target spec into an upgrading state before deploying
-        ReconciliationUtils.updateStatusBeforeDeploymentAttempt(deployment, deployConfig);
+        ReconciliationUtils.updateStatusBeforeDeploymentAttempt(deployment, deployConfig, clock);
         statusRecorder.patchAndCacheStatus(deployment);
 
         deploy(ctx, deployment.getSpec(), deployConfig, Optional.empty(), false);
-        ReconciliationUtils.updateStatusForDeployedSpec(deployment, deployConfig);
+        ReconciliationUtils.updateStatusForDeployedSpec(deployment, deployConfig, clock);
         return true;
     }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
index b2194ff1..ecce2487 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
@@ -17,12 +17,10 @@
 
 package org.apache.flink.kubernetes.operator.controller;
 
-import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.JobState;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
@@ -31,18 +29,20 @@ import org.apache.flink.kubernetes.operator.api.status.Savepoint;
 import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler;
 import org.apache.flink.util.function.ThrowingRunnable;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
-import java.util.ArrayList;
+import java.time.Clock;
+import java.time.Duration;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -62,7 +62,7 @@ public class RollbackTest {
 
     private KubernetesClient kubernetesClient;
 
-    private static final int ROLLBACK_DELAY = 600;
+    private Clock testClock = Clock.systemDefaultZone();
 
     @BeforeEach
     public void setup() {
@@ -78,57 +78,12 @@ public class RollbackTest {
 
     @ParameterizedTest
     @EnumSource(
-            value = FlinkVersion.class,
-            names = {"v1_14", "v1_15"})
-    public void testRollbackWithSavepoint(FlinkVersion flinkVersion) throws Exception {
-        var dep = TestUtils.buildApplicationCluster(flinkVersion);
-        dep.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
-        var flinkConfiguration = dep.getSpec().getFlinkConfiguration();
-        flinkConfiguration.put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), "sd");
-
-        List<String> savepoints = new ArrayList<>();
-        testRollback(
-                dep,
-                () -> {
-                    dep.getSpec().getJob().setParallelism(9999);
-                    testController.reconcile(dep, context);
-                    savepoints.add(
-                            dep.getStatus()
-                                    .getJobStatus()
-                                    .getSavepointInfo()
-                                    .getLastSavepoint()
-                                    .getLocation());
-                    assertEquals(
-                            JobState.SUSPENDED,
-                            dep.getStatus()
-                                    .getReconciliationStatus()
-                                    .deserializeLastReconciledSpec()
-                                    .getJob()
-                                    .getState());
-                    testController.reconcile(dep, context);
-
-                    // Trigger rollback by delaying the recovery
-                    Thread.sleep(ROLLBACK_DELAY);
-                    testController.reconcile(dep, context);
-                },
-                () -> {
-                    assertEquals("RUNNING", dep.getStatus().getJobStatus().getState());
-                    assertEquals(1, flinkService.listJobs().size());
-                    // Make sure we rolled back using the savepoint taken during upgrade
-                    assertEquals(savepoints.get(0), flinkService.listJobs().get(0).f0);
-                    dep.getSpec().setRestartNonce(10L);
-                    testController.reconcile(dep, context);
-                },
-                false);
-    }
-
-    @ParameterizedTest
-    @EnumSource(
-            value = FlinkVersion.class,
-            names = {"v1_14", "v1_15"})
-    public void testRollbackWithLastState(FlinkVersion flinkVersion) throws Exception {
-        var dep = TestUtils.buildApplicationCluster(flinkVersion);
-        dep.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+            value = UpgradeMode.class,
+            names = {"SAVEPOINT", "LAST_STATE"})
+    public void testStatefulRollback(UpgradeMode upgradeMode) throws Exception {
+        var dep = TestUtils.buildApplicationCluster();
+        dep.getSpec().getJob().setUpgradeMode(upgradeMode);
+        offsetReconcilerClock(dep, Duration.ZERO);
 
         testRollback(
                 dep,
@@ -145,7 +100,7 @@ public class RollbackTest {
                     testController.reconcile(dep, context);
 
                     // Trigger rollback by delaying the recovery
-                    Thread.sleep(ROLLBACK_DELAY);
+                    offsetReconcilerClock(dep, Duration.ofSeconds(15));
                     testController.reconcile(dep, context);
                 },
                 () -> {
@@ -157,14 +112,12 @@ public class RollbackTest {
                 true);
     }
 
-    @ParameterizedTest
-    @EnumSource(
-            value = FlinkVersion.class,
-            names = {"v1_14", "v1_15"})
-    public void testRollbackFailureWithLastState(FlinkVersion flinkVersion) throws Exception {
-        var dep = TestUtils.buildApplicationCluster(flinkVersion);
+    @Test
+    public void testRollbackFailureWithLastState() throws Exception {
+        var dep = TestUtils.buildApplicationCluster();
         dep.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
         dep.getSpec().getFlinkConfiguration().put("t", "1");
+        offsetReconcilerClock(dep, Duration.ZERO);
 
         testRollback(
                 dep,
@@ -182,7 +135,7 @@ public class RollbackTest {
                     testController.reconcile(dep, context);
 
                     // Trigger rollback by delaying the recovery
-                    Thread.sleep(ROLLBACK_DELAY);
+                    offsetReconcilerClock(dep, Duration.ofSeconds(15));
                     testController.reconcile(dep, context);
                 },
                 () -> {
@@ -211,13 +164,11 @@ public class RollbackTest {
                 false);
     }
 
-    @ParameterizedTest
-    @EnumSource(
-            value = FlinkVersion.class,
-            names = {"v1_14", "v1_15"})
-    public void testRollbackStateless(FlinkVersion flinkVersion) throws Exception {
-        var dep = TestUtils.buildApplicationCluster(flinkVersion);
+    @Test
+    public void testRollbackStateless() throws Exception {
+        var dep = TestUtils.buildApplicationCluster();
         dep.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
+        offsetReconcilerClock(dep, Duration.ZERO);
 
         testRollback(
                 dep,
@@ -247,7 +198,7 @@ public class RollbackTest {
                                     "true");
 
                     // Trigger rollback by delaying the recovery
-                    Thread.sleep(ROLLBACK_DELAY);
+                    offsetReconcilerClock(dep, Duration.ofSeconds(15));
                     dep.getStatus()
                             .getJobStatus()
                             .getSavepointInfo()
@@ -266,19 +217,17 @@ public class RollbackTest {
                 true);
     }
 
-    @ParameterizedTest
-    @EnumSource(
-            value = FlinkVersion.class,
-            names = {"v1_14", "v1_15"})
-    public void testRollbackSession(FlinkVersion flinkVersion) throws Exception {
-        var dep = TestUtils.buildSessionCluster(flinkVersion);
+    @Test
+    public void testRollbackSession() throws Exception {
+        var dep = TestUtils.buildSessionCluster();
+        offsetReconcilerClock(dep, Duration.ZERO);
         testRollback(
                 dep,
                 () -> {
                     dep.getSpec().getFlinkConfiguration().put("random", "config");
                     testController.reconcile(dep, context);
                     // Trigger rollback by delaying the recovery
-                    Thread.sleep(ROLLBACK_DELAY);
+                    offsetReconcilerClock(dep, Duration.ofSeconds(15));
                     testController.reconcile(dep, context);
                 },
                 () -> {
@@ -301,7 +250,7 @@ public class RollbackTest {
         flinkConfiguration.put(
                 KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED.key(), "true");
         flinkConfiguration.put(
-                KubernetesOperatorConfigOptions.DEPLOYMENT_READINESS_TIMEOUT.key(), "400");
+                KubernetesOperatorConfigOptions.DEPLOYMENT_READINESS_TIMEOUT.key(), "10s");
 
         testController.reconcile(deployment, context);
 
@@ -390,7 +339,7 @@ public class RollbackTest {
             deployment.getSpec().getJob().setState(JobState.RUNNING);
             testController.reconcile(deployment, context);
             // Make sure we do not roll back to suspended state
-            Thread.sleep(ROLLBACK_DELAY);
+            offsetReconcilerClock(deployment, Duration.ofSeconds(15));
             testController.reconcile(deployment, context);
             testController.reconcile(deployment, context);
             assertTrue(
@@ -419,4 +368,11 @@ public class RollbackTest {
             assertNull(deployment.getStatus().getError());
         }
     }
+
+    private void offsetReconcilerClock(FlinkDeployment dep, Duration offset) {
+        testClock = Clock.offset(testClock, offset);
+        ((AbstractFlinkResourceReconciler<?, ?, ?>)
+                        testController.getReconcilerFactory().getOrCreate(dep))
+                .setClock(testClock);
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index 442f9530..97e0adf6 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -58,6 +58,7 @@ public class TestingFlinkDeploymentController
                 EventSourceInitializer<FlinkDeployment>,
                 Cleaner<FlinkDeployment> {
 
+    private ReconcilerFactory reconcilerFactory;
     private FlinkDeploymentController flinkDeploymentController;
     private StatusUpdateCounter statusUpdateCounter = new StatusUpdateCounter();
     private EventCollector eventCollector = new EventCollector();
@@ -79,13 +80,15 @@ public class TestingFlinkDeploymentController
         eventRecorder = new EventRecorder(kubernetesClient, eventCollector);
         statusRecorder =
                 new StatusRecorder<>(kubernetesClient, new MetricManager<>(), statusUpdateCounter);
+        reconcilerFactory =
+                new ReconcilerFactory(
+                        kubernetesClient, configManager, eventRecorder, statusRecorder);
         flinkDeploymentController =
                 new FlinkDeploymentController(
                         configManager,
                         ValidatorUtils.discoverValidators(configManager),
                         ctxFactory,
-                        new ReconcilerFactory(
-                                kubernetesClient, configManager, eventRecorder, statusRecorder),
+                        reconcilerFactory,
                         new FlinkDeploymentObserverFactory(configManager, eventRecorder),
                         statusRecorder,
                         eventRecorder);
@@ -154,4 +157,8 @@ public class TestingFlinkDeploymentController
     public int getInternalStatusUpdateCount() {
         return statusUpdateCounter.getCount();
     }
+
+    public ReconcilerFactory getReconcilerFactory() {
+        return reconcilerFactory;
+    }
 }


[flink-kubernetes-operator] 01/02: [FLINK-30766][tests] Do not parameterize tests with all Flink versions

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit cb30a70dc961e6f61ddd370e828435112db152a0
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Sat Jan 21 20:57:20 2023 +0100

    [FLINK-30766][tests] Do not parameterize tests with all Flink versions
---
 .../flink/kubernetes/operator/TestUtils.java       | 20 +++++++++++++++++
 .../operator/config/FlinkConfigBuilderTest.java    |  4 ++--
 .../controller/DeploymentRecoveryTest.java         | 21 ++----------------
 .../controller/FailedDeploymentRestartTest.java    | 18 +---------------
 .../controller/FlinkDeploymentControllerTest.java  | 25 +++++-----------------
 .../operator/controller/RollbackTest.java          | 20 ++++++++++++-----
 .../controller/UnhealthyDeploymentRestartTest.java | 18 +---------------
 .../deployment/ApplicationReconcilerTest.java      |  6 +++---
 .../ApplicationReconcilerUpgradeModeTest.java      | 19 ++++++++--------
 .../operator/service/NativeFlinkServiceTest.java   |  4 ++--
 10 files changed, 60 insertions(+), 95 deletions(-)

diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 1d99f256..f0d0941b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -19,6 +19,8 @@ package org.apache.flink.kubernetes.operator;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
+import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.utils.BaseTestUtils;
 import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
@@ -47,6 +49,7 @@ import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
 import okhttp3.Headers;
 import okhttp3.mockwebserver.RecordedRequest;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.provider.Arguments;
 
 import javax.annotation.Nullable;
 
@@ -58,6 +61,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -66,8 +70,10 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 /** Testing utilities. */
 public class TestUtils extends BaseTestUtils {
@@ -271,6 +277,20 @@ public class TestUtils extends BaseTestUtils {
                 .build();
     }
 
+    public static Stream<Arguments> flinkVersionsAndUpgradeModes() {
+        List<Arguments> args = new ArrayList<>();
+        for (FlinkVersion version : Set.of(FlinkVersion.v1_14, FlinkVersion.v1_15)) {
+            for (UpgradeMode upgradeMode : UpgradeMode.values()) {
+                args.add(arguments(version, upgradeMode));
+            }
+        }
+        return args.stream();
+    }
+
+    public static Stream<Arguments> flinkVersions() {
+        return List.of(arguments(FlinkVersion.v1_14), arguments(FlinkVersion.v1_15)).stream();
+    }
+
     /** Testing ResponseProvider. */
     public static class ValidatingResponseProvider<T> implements ResponseProvider<Object> {
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
index fa46cec0..aa4f4443 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
@@ -53,7 +53,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.File;
 import java.io.IOException;
@@ -168,7 +168,7 @@ public class FlinkConfigBuilderTest {
     }
 
     @ParameterizedTest
-    @EnumSource(FlinkVersion.class)
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
     public void testApplyFlinkConfigurationShouldSetShutdownOnFinishBasedOnFlinkVersion(
             FlinkVersion flinkVersion) {
         flinkDeployment.getSpec().setFlinkVersion(flinkVersion);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
index 673e1319..22b893db 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
@@ -32,17 +32,10 @@ import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Stream;
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 /** @link Missing deployment recovery tests */
 @EnableKubernetesMockClient(crud = true)
@@ -66,7 +59,7 @@ public class DeploymentRecoveryTest {
     }
 
     @ParameterizedTest
-    @MethodSource("applicationTestParams")
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersionsAndUpgradeModes")
     public void verifyApplicationJmRecovery(FlinkVersion flinkVersion, UpgradeMode upgradeMode)
             throws Exception {
         FlinkDeployment appCluster = TestUtils.buildApplicationCluster(flinkVersion);
@@ -144,7 +137,7 @@ public class DeploymentRecoveryTest {
     }
 
     @ParameterizedTest
-    @EnumSource(FlinkVersion.class)
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
     public void verifySessionJmRecovery(FlinkVersion flinkVersion) throws Exception {
         FlinkDeployment appCluster = TestUtils.buildSessionCluster(flinkVersion);
         testController.reconcile(appCluster, context);
@@ -170,14 +163,4 @@ public class DeploymentRecoveryTest {
                 JobManagerDeploymentStatus.READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
     }
-
-    private static Stream<Arguments> applicationTestParams() {
-        List<Arguments> args = new ArrayList<>();
-        for (FlinkVersion version : FlinkVersion.values()) {
-            for (UpgradeMode upgradeMode : UpgradeMode.values()) {
-                args.add(arguments(version, upgradeMode));
-            }
-        }
-        return args.stream();
-    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
index f34d8040..de79a0db 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
@@ -31,16 +31,10 @@ import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Stream;
-
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_RESTART_FAILED;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 /** @link Unhealthy deployment restart tests */
 @EnableKubernetesMockClient(crud = true)
@@ -66,7 +60,7 @@ public class FailedDeploymentRestartTest {
     }
 
     @ParameterizedTest
-    @MethodSource("applicationTestParams")
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersionsAndUpgradeModes")
     public void verifyFailedApplicationRecovery(FlinkVersion flinkVersion, UpgradeMode upgradeMode)
             throws Exception {
         FlinkDeployment appCluster = TestUtils.buildApplicationCluster(flinkVersion);
@@ -103,14 +97,4 @@ public class FailedDeploymentRestartTest {
                 appCluster.getSpec(),
                 appCluster.getStatus().getReconciliationStatus().deserializeLastReconciledSpec());
     }
-
-    private static Stream<Arguments> applicationTestParams() {
-        List<Arguments> args = new ArrayList<>();
-        for (FlinkVersion version : FlinkVersion.values()) {
-            for (UpgradeMode upgradeMode : UpgradeMode.values()) {
-                args.add(arguments(version, upgradeMode));
-            }
-        }
-        return args.stream();
-    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index b0b09a32..cb212b99 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -53,17 +53,13 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED;
 import static org.apache.flink.kubernetes.operator.utils.EventRecorder.Reason.ValidationError;
@@ -74,7 +70,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
-import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 /** {@link FlinkDeploymentController} tests. */
 @EnableKubernetesMockClient(crud = true)
@@ -99,7 +94,7 @@ public class FlinkDeploymentControllerTest {
     }
 
     @ParameterizedTest
-    @EnumSource(FlinkVersion.class)
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
     public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) throws Exception {
 
         UpdateControl<FlinkDeployment> updateControl;
@@ -380,7 +375,7 @@ public class FlinkDeploymentControllerTest {
     }
 
     @ParameterizedTest
-    @EnumSource(FlinkVersion.class)
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
     public void verifyUpgradeFromSavepoint(FlinkVersion flinkVersion) throws Exception {
         FlinkDeployment appCluster = TestUtils.buildApplicationCluster(flinkVersion);
         appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
@@ -473,7 +468,7 @@ public class FlinkDeploymentControllerTest {
     }
 
     @ParameterizedTest
-    @EnumSource(FlinkVersion.class)
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
     public void verifyStatelessUpgrade(FlinkVersion flinkVersion) throws Exception {
         testController.events().clear();
         FlinkDeployment appCluster = TestUtils.buildApplicationCluster(flinkVersion);
@@ -625,13 +620,13 @@ public class FlinkDeploymentControllerTest {
     }
 
     @ParameterizedTest
-    @EnumSource(FlinkVersion.class)
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
     public void testUpgradeNotReadyClusterSession(FlinkVersion flinkVersion) throws Exception {
         testUpgradeNotReadyCluster(TestUtils.buildSessionCluster(flinkVersion));
     }
 
     @ParameterizedTest
-    @MethodSource("applicationTestParams")
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersionsAndUpgradeModes")
     public void testUpgradeNotReadyClusterApplication(
             FlinkVersion flinkVersion, UpgradeMode upgradeMode) throws Exception {
         var appCluster = TestUtils.buildApplicationCluster(flinkVersion);
@@ -1054,16 +1049,6 @@ public class FlinkDeploymentControllerTest {
                         .getHost());
     }
 
-    private static Stream<Arguments> applicationTestParams() {
-        List<Arguments> args = new ArrayList<>();
-        for (FlinkVersion version : FlinkVersion.values()) {
-            for (UpgradeMode upgradeMode : UpgradeMode.values()) {
-                args.add(arguments(version, upgradeMode));
-            }
-        }
-        return args.stream();
-    }
-
     @Test
     public void testInitialSavepointOnError() throws Exception {
         FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
index f2fc4d05..b2194ff1 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
@@ -77,7 +77,9 @@ public class RollbackTest {
     }
 
     @ParameterizedTest
-    @EnumSource(FlinkVersion.class)
+    @EnumSource(
+            value = FlinkVersion.class,
+            names = {"v1_14", "v1_15"})
     public void testRollbackWithSavepoint(FlinkVersion flinkVersion) throws Exception {
         var dep = TestUtils.buildApplicationCluster(flinkVersion);
         dep.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
@@ -121,7 +123,9 @@ public class RollbackTest {
     }
 
     @ParameterizedTest
-    @EnumSource(FlinkVersion.class)
+    @EnumSource(
+            value = FlinkVersion.class,
+            names = {"v1_14", "v1_15"})
     public void testRollbackWithLastState(FlinkVersion flinkVersion) throws Exception {
         var dep = TestUtils.buildApplicationCluster(flinkVersion);
         dep.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
@@ -154,7 +158,9 @@ public class RollbackTest {
     }
 
     @ParameterizedTest
-    @EnumSource(FlinkVersion.class)
+    @EnumSource(
+            value = FlinkVersion.class,
+            names = {"v1_14", "v1_15"})
     public void testRollbackFailureWithLastState(FlinkVersion flinkVersion) throws Exception {
         var dep = TestUtils.buildApplicationCluster(flinkVersion);
         dep.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
@@ -206,7 +212,9 @@ public class RollbackTest {
     }
 
     @ParameterizedTest
-    @EnumSource(FlinkVersion.class)
+    @EnumSource(
+            value = FlinkVersion.class,
+            names = {"v1_14", "v1_15"})
     public void testRollbackStateless(FlinkVersion flinkVersion) throws Exception {
         var dep = TestUtils.buildApplicationCluster(flinkVersion);
         dep.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
@@ -259,7 +267,9 @@ public class RollbackTest {
     }
 
     @ParameterizedTest
-    @EnumSource(FlinkVersion.class)
+    @EnumSource(
+            value = FlinkVersion.class,
+            names = {"v1_14", "v1_15"})
     public void testRollbackSession(FlinkVersion flinkVersion) throws Exception {
         var dep = TestUtils.buildSessionCluster(flinkVersion);
         testRollback(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
index a4f67544..e5814a0a 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
@@ -31,16 +31,10 @@ import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Stream;
-
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 /** @link Unhealthy deployment restart tests */
 @EnableKubernetesMockClient(crud = true)
@@ -69,7 +63,7 @@ public class UnhealthyDeploymentRestartTest {
     }
 
     @ParameterizedTest
-    @MethodSource("applicationTestParams")
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersionsAndUpgradeModes")
     public void verifyApplicationUnhealthyJmRecovery(
             FlinkVersion flinkVersion, UpgradeMode upgradeMode) throws Exception {
         FlinkDeployment appCluster = TestUtils.buildApplicationCluster(flinkVersion);
@@ -101,14 +95,4 @@ public class UnhealthyDeploymentRestartTest {
                 appCluster.getStatus().getJobManagerDeploymentStatus());
         assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
     }
-
-    private static Stream<Arguments> applicationTestParams() {
-        List<Arguments> args = new ArrayList<>();
-        for (FlinkVersion version : FlinkVersion.values()) {
-            for (UpgradeMode upgradeMode : UpgradeMode.values()) {
-                args.add(arguments(version, upgradeMode));
-            }
-        }
-        return args.stream();
-    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 1ee4436f..4ca8618d 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -58,7 +58,7 @@ import lombok.Getter;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.platform.commons.util.StringUtils;
 
 import java.time.Clock;
@@ -99,7 +99,7 @@ public class ApplicationReconcilerTest extends OperatorTestBase {
     }
 
     @ParameterizedTest
-    @EnumSource(FlinkVersion.class)
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
     public void testUpgrade(FlinkVersion flinkVersion) throws Exception {
         FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);
 
@@ -587,7 +587,7 @@ public class ApplicationReconcilerTest extends OperatorTestBase {
     }
 
     @ParameterizedTest
-    @EnumSource(FlinkVersion.class)
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
     public void verifyJobIdNotResetDuringLastStateRecovery(FlinkVersion flinkVersion) {
         FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
index 2da7da90..42b0c7ce 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
@@ -46,7 +46,6 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.ArrayList;
@@ -76,19 +75,19 @@ public class ApplicationReconcilerUpgradeModeTest extends OperatorTestBase {
     }
 
     @ParameterizedTest
-    @EnumSource(FlinkVersion.class)
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
     public void testUpgradeFromStatelessToStateless(FlinkVersion flinkVersion) throws Exception {
         testUpgradeToStateless(flinkVersion, UpgradeMode.STATELESS);
     }
 
     @ParameterizedTest
-    @EnumSource(FlinkVersion.class)
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
     public void testUpgradeFromSavepointToStateless(FlinkVersion flinkVersion) throws Exception {
         testUpgradeToStateless(flinkVersion, UpgradeMode.SAVEPOINT);
     }
 
     @ParameterizedTest
-    @EnumSource(FlinkVersion.class)
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
     public void testUpgradeFromLastStateToStateless(FlinkVersion flinkVersion) throws Exception {
         testUpgradeToStateless(flinkVersion, UpgradeMode.LAST_STATE);
     }
@@ -114,19 +113,19 @@ public class ApplicationReconcilerUpgradeModeTest extends OperatorTestBase {
     }
 
     @ParameterizedTest
-    @EnumSource(FlinkVersion.class)
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
     public void testUpgradeFromStatelessToSavepoint(FlinkVersion flinkVersion) throws Exception {
         testUpgradeToSavepoint(flinkVersion, UpgradeMode.STATELESS);
     }
 
     @ParameterizedTest
-    @EnumSource(FlinkVersion.class)
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
     public void testUpgradeFromSavepointToSavepoint(FlinkVersion flinkVersion) throws Exception {
         testUpgradeToSavepoint(flinkVersion, UpgradeMode.SAVEPOINT);
     }
 
     @ParameterizedTest
-    @EnumSource(FlinkVersion.class)
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
     public void testUpgradeFromLastStateToSavepoint(FlinkVersion flinkVersion) throws Exception {
         testUpgradeToSavepoint(flinkVersion, UpgradeMode.LAST_STATE);
     }
@@ -164,19 +163,19 @@ public class ApplicationReconcilerUpgradeModeTest extends OperatorTestBase {
     }
 
     @ParameterizedTest
-    @EnumSource(FlinkVersion.class)
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
     public void testUpgradeFromStatelessToLastState(FlinkVersion flinkVersion) throws Exception {
         testUpgradeToLastState(flinkVersion, UpgradeMode.STATELESS);
     }
 
     @ParameterizedTest
-    @EnumSource(FlinkVersion.class)
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
     public void testUpgradeFromSavepointToLastState(FlinkVersion flinkVersion) throws Exception {
         testUpgradeToLastState(flinkVersion, UpgradeMode.SAVEPOINT);
     }
 
     @ParameterizedTest
-    @EnumSource(FlinkVersion.class)
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
     public void testUpgradeFromLastStateToLastState(FlinkVersion flinkVersion) throws Exception {
         testUpgradeToLastState(flinkVersion, UpgradeMode.LAST_STATE);
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index a0ba485c..59bf99a7 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -57,7 +57,7 @@ import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -117,7 +117,7 @@ public class NativeFlinkServiceTest {
     }
 
     @ParameterizedTest
-    @EnumSource(FlinkVersion.class)
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
     public void testCancelJobWithSavepointUpgradeMode(FlinkVersion flinkVersion) throws Exception {
         final TestingClusterClient<String> testingClusterClient =
                 new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME);