You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/02/24 15:56:38 UTC

[flink] branch master updated (f533566 -> 8cdd0b85)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from f533566  [FLINK-25793][connector/base] Fix high thourouput bug for throttling destinations.
     new 881d4c6  [hotfix][tests] Lazily create MiniClusterResourceConfiguraiton
     new 576354c  [hotfix][tests] Restructure AbstractHaJobRunITCase
     new 8cdd0b85 [FLINK-26252][tests] Refactor MiniClusterExtension to support JUnit 5 parallel tests

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/architecture/common/Predicates.java      |  50 +++-
 .../flink/architecture/rules/ITCaseRules.java      |  69 +++--
 .../84abeb9c-8355-4165-96aa-dda65b04e5e7           |   7 +-
 .../a6cbd99c-b115-447a-8f19-43c1094db549           |   7 +-
 .../8ab2328f-b38b-4e34-b768-0deb6b6171fb           |  14 +-
 .../source/reader/AlignedWatermarksITCase.java     |  16 +-
 .../dc1ba6f4-3d84-498c-a085-e02ba5936201           |   7 +-
 .../dd583797-83e1-414c-a38d-330773978813           |   7 +-
 .../db3972e4-f3a3-45b2-9643-27cba0cef09d           |  14 +-
 .../e1f30f33-c61c-4707-8c78-a3a80479564e           |  14 +-
 .../f5e3e868-8d92-4258-9654-a605dc9c550f           |  21 +-
 .../26d337fc-45c4-4d03-a84a-6692c37fafbc           |  77 +++++-
 .../6b9ab1b0-c14d-4667-bab5-407b81fba98b           |  21 +-
 .../97dda445-f6bc-43e2-8106-5876ca0cd052           |  91 ++++++-
 .../connector/pulsar/sink/PulsarSinkITCase.java    |  11 +-
 flink-filesystems/flink-s3-fs-base/pom.xml         |   6 +
 .../fs/s3/common/HAJobRunOnMinioS3StoreITCase.java |  33 ++-
 flink-filesystems/flink-s3-fs-hadoop/pom.xml       |   6 +
 flink-filesystems/flink-s3-fs-presto/pom.xml       |   6 +
 .../flink/formats/csv/DataStreamCsvITCase.java     |   8 +-
 .../jobmanager/JMXJobManagerMetricTest.java        |  17 +-
 .../highavailability/AbstractHAJobRunITCase.java   |  45 +---
 .../TaskCancelAsyncProducerConsumerITCase.java     |  18 +-
 .../testutils/InternalMiniClusterExtension.java    | 112 ++++++++
 .../runtime/testutils/MiniClusterExtension.java    |  63 -----
 .../junit5/InjectClusterClientConfiguration.java   |  13 +-
 .../test/junit5/InjectClusterRESTAddress.java      |  13 +-
 .../flink/test/junit5/InjectMiniCluster.java       |  12 +-
 .../flink/test/junit5/InjectClusterClient.java}    |  14 +-
 .../flink/test/junit5/MiniClusterExtension.java    | 297 +++++++++++++++++++++
 .../test/util/MiniClusterWithClientExtension.java  | 123 ---------
 31 files changed, 857 insertions(+), 355 deletions(-)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InternalMiniClusterExtension.java
 delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterExtension.java
 copy flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java => flink-runtime/src/test/java/org/apache/flink/test/junit5/InjectClusterClientConfiguration.java (73%)
 copy flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java => flink-runtime/src/test/java/org/apache/flink/test/junit5/InjectClusterRESTAddress.java (77%)
 copy flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java => flink-runtime/src/test/java/org/apache/flink/test/junit5/InjectMiniCluster.java (76%)
 copy flink-test-utils-parent/{flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java => flink-test-utils/src/main/java/org/apache/flink/test/junit5/InjectClusterClient.java} (73%)
 create mode 100644 flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java
 delete mode 100644 flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientExtension.java

[flink] 03/03: [FLINK-26252][tests] Refactor MiniClusterExtension to support JUnit 5 parallel tests

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8cdd0b85c3ab2bfbe47aa5b680cdcbaf186c0731
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Tue Feb 22 16:07:05 2022 +0100

    [FLINK-26252][tests] Refactor MiniClusterExtension to support JUnit 5 parallel tests
---
 .../flink/architecture/common/Predicates.java      |  50 +++-
 .../flink/architecture/rules/ITCaseRules.java      |  69 +++--
 .../84abeb9c-8355-4165-96aa-dda65b04e5e7           |   7 +-
 .../a6cbd99c-b115-447a-8f19-43c1094db549           |   7 +-
 .../8ab2328f-b38b-4e34-b768-0deb6b6171fb           |  14 +-
 .../source/reader/AlignedWatermarksITCase.java     |  16 +-
 .../dc1ba6f4-3d84-498c-a085-e02ba5936201           |   7 +-
 .../dd583797-83e1-414c-a38d-330773978813           |   7 +-
 .../db3972e4-f3a3-45b2-9643-27cba0cef09d           |  14 +-
 .../e1f30f33-c61c-4707-8c78-a3a80479564e           |  14 +-
 .../f5e3e868-8d92-4258-9654-a605dc9c550f           |  21 +-
 .../26d337fc-45c4-4d03-a84a-6692c37fafbc           |  77 +++++-
 .../6b9ab1b0-c14d-4667-bab5-407b81fba98b           |  21 +-
 .../97dda445-f6bc-43e2-8106-5876ca0cd052           |  91 ++++++-
 .../connector/pulsar/sink/PulsarSinkITCase.java    |  11 +-
 .../fs/s3/common/HAJobRunOnMinioS3StoreITCase.java |  28 +-
 .../flink/formats/csv/DataStreamCsvITCase.java     |   8 +-
 .../jobmanager/JMXJobManagerMetricTest.java        |  17 +-
 .../highavailability/AbstractHAJobRunITCase.java   |  11 +-
 .../TaskCancelAsyncProducerConsumerITCase.java     |  18 +-
 .../testutils/InternalMiniClusterExtension.java    | 112 ++++++++
 .../runtime/testutils/MiniClusterExtension.java    |  79 ------
 .../junit5/InjectClusterClientConfiguration.java   |  38 +++
 .../test/junit5/InjectClusterRESTAddress.java      |  38 +++
 .../flink/test/junit5/InjectMiniCluster.java       |  37 +++
 .../flink/test/junit5/InjectClusterClient.java     |  39 +++
 .../flink/test/junit5/MiniClusterExtension.java    | 297 +++++++++++++++++++++
 .../test/util/MiniClusterWithClientExtension.java  | 123 ---------
 28 files changed, 943 insertions(+), 328 deletions(-)

diff --git a/flink-architecture-tests/flink-architecture-tests-base/src/main/java/org/apache/flink/architecture/common/Predicates.java b/flink-architecture-tests/flink-architecture-tests-base/src/main/java/org/apache/flink/architecture/common/Predicates.java
index 56e6c11..d45cb56 100644
--- a/flink-architecture-tests/flink-architecture-tests-base/src/main/java/org/apache/flink/architecture/common/Predicates.java
+++ b/flink-architecture-tests/flink-architecture-tests-base/src/main/java/org/apache/flink/architecture/common/Predicates.java
@@ -29,6 +29,7 @@ import com.tngtech.archunit.core.domain.properties.CanBeAnnotated;
 import java.lang.annotation.Annotation;
 import java.util.Arrays;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static com.tngtech.archunit.lang.conditions.ArchPredicates.is;
 import static org.apache.flink.architecture.common.JavaFieldPredicates.annotatedWith;
@@ -76,11 +77,26 @@ public class Predicates {
      * <p>Attention: changing the description will add a rule into the stored.rules.
      */
     public static DescribedPredicate<JavaField> arePublicStaticOfType(Class<?> clazz) {
+        return areFieldOfType(clazz, JavaModifier.PUBLIC, JavaModifier.STATIC);
+    }
+
+    /**
+     * Tests that the given field is of the given type {@code clazz} and has given modifiers.
+     *
+     * <p>Attention: changing the description will add a rule into the stored.rules.
+     */
+    public static DescribedPredicate<JavaField> areFieldOfType(
+            Class<?> clazz, JavaModifier... modifiers) {
         return DescribedPredicate.describe(
-                "are public, static, and of type " + clazz.getSimpleName(),
+                String.format(
+                        "are %s, and of type %s",
+                        Arrays.stream(modifiers)
+                                .map(JavaModifier::toString)
+                                .map(String::toLowerCase)
+                                .collect(Collectors.joining(", ")),
+                        clazz.getSimpleName()),
                 field ->
-                        field.getModifiers().contains(JavaModifier.PUBLIC)
-                                && field.getModifiers().contains(JavaModifier.STATIC)
+                        field.getModifiers().containsAll(Arrays.asList(modifiers))
                                 && field.getRawType().isEquivalentTo(clazz));
     }
 
@@ -126,6 +142,34 @@ public class Predicates {
         return arePublicStaticFinalOfType(clazz).and(annotatedWith(annotationType));
     }
 
+    /**
+     * Tests that the given field is {@code static final} and of the given type {@code clazz} with
+     * exactly the given {@code annotationType}. It doesn't matter if public, private or protected.
+     */
+    public static DescribedPredicate<JavaField> areStaticFinalOfTypeWithAnnotation(
+            Class<?> clazz, Class<? extends Annotation> annotationType) {
+        return areFieldOfType(clazz, JavaModifier.STATIC, JavaModifier.FINAL)
+                .and(annotatedWith(annotationType));
+    }
+
+    /**
+     * Returns a {@link DescribedPredicate} that returns true if one and only one of the given
+     * predicates match.
+     */
+    @SafeVarargs
+    public static <T> DescribedPredicate<T> exactlyOneOf(
+            final DescribedPredicate<? super T>... other) {
+        return DescribedPredicate.describe(
+                "only one of the following predicates match:\n"
+                        + Arrays.stream(other)
+                                .map(dp -> "* " + dp + "\n")
+                                .collect(Collectors.joining()),
+                t ->
+                        Arrays.stream(other)
+                                .map(dp -> dp.apply(t))
+                                .reduce(false, Boolean::logicalXor));
+    }
+
     private Predicates() {}
 
     /**
diff --git a/flink-architecture-tests/flink-architecture-tests-test/src/main/java/org/apache/flink/architecture/rules/ITCaseRules.java b/flink-architecture-tests/flink-architecture-tests-test/src/main/java/org/apache/flink/architecture/rules/ITCaseRules.java
index b9afc4d..e679f5c 100644
--- a/flink-architecture-tests/flink-architecture-tests-test/src/main/java/org/apache/flink/architecture/rules/ITCaseRules.java
+++ b/flink-architecture-tests/flink-architecture-tests-test/src/main/java/org/apache/flink/architecture/rules/ITCaseRules.java
@@ -18,8 +18,9 @@
 
 package org.apache.flink.architecture.rules;
 
-import org.apache.flink.core.testutils.AllCallbackWrapper;
-import org.apache.flink.runtime.testutils.MiniClusterExtension;
+import org.apache.flink.architecture.common.Predicates;
+import org.apache.flink.runtime.testutils.InternalMiniClusterExtension;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 
@@ -29,15 +30,19 @@ import com.tngtech.archunit.junit.ArchTest;
 import com.tngtech.archunit.lang.ArchRule;
 import org.junit.ClassRule;
 import org.junit.Rule;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.Extension;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import java.util.Arrays;
+
 import static com.tngtech.archunit.core.domain.JavaModifier.ABSTRACT;
 import static com.tngtech.archunit.library.freeze.FreezingArchRule.freeze;
 import static org.apache.flink.architecture.common.Conditions.fulfill;
 import static org.apache.flink.architecture.common.GivenJavaClasses.javaClassesThat;
 import static org.apache.flink.architecture.common.Predicates.arePublicFinalOfTypeWithAnnotation;
-import static org.apache.flink.architecture.common.Predicates.arePublicStaticFinalAssignableTo;
 import static org.apache.flink.architecture.common.Predicates.arePublicStaticFinalOfTypeWithAnnotation;
+import static org.apache.flink.architecture.common.Predicates.areStaticFinalOfTypeWithAnnotation;
 import static org.apache.flink.architecture.common.Predicates.containAnyFieldsInClassHierarchyThat;
 
 /** Rules for Integration Tests. */
@@ -61,18 +66,21 @@ public class ITCaseRules {
      * <p>1. For JUnit 5 test, both fields are required like:
      *
      * <pre>{@code
+     * @RegisterExtension
      * public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
      *         new MiniClusterExtension(
      *                 new MiniClusterResourceConfiguration.Builder()
      *                         .setConfiguration(getFlinkConfiguration())
      *                         .build());
+     * }</pre>
      *
-     * @RegisterExtension
-     * public static AllCallbackWrapper allCallbackWrapper =
-     *         new AllCallbackWrapper(MINI_CLUSTER_RESOURCE);
+     * <p>2. For JUnit 5 test, use {@link ExtendWith}:
+     *
+     * <pre>{@code
+     * @ExtendWith(MiniClusterExtension.class)
      * }</pre>
      *
-     * <p>2. For JUnit 4 test via @Rule like:
+     * <p>3. For JUnit 4 test via @Rule like:
      *
      * <pre>{@code
      * @Rule
@@ -86,7 +94,7 @@ public class ITCaseRules {
      *                          .build());
      * }</pre>
      *
-     * <p>3. For JUnit 4 test via @ClassRule like:
+     * <p>4. For JUnit 4 test via @ClassRule like:
      *
      * <pre>{@code
      * @ClassRule
@@ -110,7 +118,6 @@ public class ITCaseRules {
                                             fulfill(
                                                     // JUnit 5 violation check
                                                     miniClusterExtensionRule()
-                                                            .and(allCallbackWrapper())
                                                             // JUnit 4 violation check, which should
                                                             // be
                                                             // removed
@@ -135,14 +142,46 @@ public class ITCaseRules {
                         MiniClusterWithClientResource.class, Rule.class));
     }
 
+    private static DescribedPredicate<JavaClass> inFlinkRuntimePackages() {
+        return JavaClass.Predicates.resideInAPackage("org.apache.flink.runtime.*");
+    }
+
+    private static DescribedPredicate<JavaClass> outsideFlinkRuntimePackages() {
+        return JavaClass.Predicates.resideOutsideOfPackage("org.apache.flink.runtime.*");
+    }
+
     private static DescribedPredicate<JavaClass> miniClusterExtensionRule() {
-        return containAnyFieldsInClassHierarchyThat(
-                arePublicStaticFinalAssignableTo(MiniClusterExtension.class));
+        // Only flink-runtime should use InternalMiniClusterExtension,
+        // other packages should use MiniClusterExtension
+        return Predicates.exactlyOneOf(
+                inFlinkRuntimePackages()
+                        .and(
+                                containAnyFieldsInClassHierarchyThat(
+                                        areStaticFinalOfTypeWithAnnotation(
+                                                InternalMiniClusterExtension.class,
+                                                RegisterExtension.class))),
+                outsideFlinkRuntimePackages()
+                        .and(
+                                containAnyFieldsInClassHierarchyThat(
+                                        areStaticFinalOfTypeWithAnnotation(
+                                                MiniClusterExtension.class,
+                                                RegisterExtension.class))),
+                inFlinkRuntimePackages()
+                        .and(
+                                isAnnotatedWithExtendWithUsingExtension(
+                                        InternalMiniClusterExtension.class)),
+                outsideFlinkRuntimePackages()
+                        .and(isAnnotatedWithExtendWithUsingExtension(MiniClusterExtension.class)));
     }
 
-    private static DescribedPredicate<JavaClass> allCallbackWrapper() {
-        return containAnyFieldsInClassHierarchyThat(
-                arePublicStaticFinalOfTypeWithAnnotation(
-                        AllCallbackWrapper.class, RegisterExtension.class));
+    private static DescribedPredicate<JavaClass> isAnnotatedWithExtendWithUsingExtension(
+            Class<? extends Extension> extensionClass) {
+        return DescribedPredicate.describe(
+                "is annotated with @ExtendWith with class " + extensionClass.getSimpleName(),
+                clazz ->
+                        clazz.isAnnotatedWith(ExtendWith.class)
+                                && Arrays.asList(
+                                                clazz.getAnnotationOfType(ExtendWith.class).value())
+                                        .contains(extensionClass));
     }
 }
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7 b/flink-connectors/flink-connector-aws-kinesis-data-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7
index f79360e..a370e4c 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7
+++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7
@@ -1 +1,6 @@
-org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWit [...]
\ No newline at end of file
+org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/archunit-violations/a6cbd99c-b115-447a-8f19-43c1094db549 b/flink-connectors/flink-connector-aws-kinesis-firehose/archunit-violations/a6cbd99c-b115-447a-8f19-43c1094db549
index 0cdac11..5ad7b14 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/archunit-violations/a6cbd99c-b115-447a-8f19-43c1094db549
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/archunit-violations/a6cbd99c-b115-447a-8f19-43c1094db549
@@ -1 +1,6 @@
-org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithC [...]
\ No newline at end of file
+org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-base/archunit-violations/8ab2328f-b38b-4e34-b768-0deb6b6171fb b/flink-connectors/flink-connector-base/archunit-violations/8ab2328f-b38b-4e34-b768-0deb6b6171fb
index d81373f..03ba5016 100644
--- a/flink-connectors/flink-connector-base/archunit-violations/8ab2328f-b38b-4e34-b768-0deb6b6171fb
+++ b/flink-connectors/flink-connector-base/archunit-violations/8ab2328f-b38b-4e34-b768-0deb6b6171fb
@@ -1,2 +1,12 @@
-org.apache.flink.connector.base.sink.AsyncSinkBaseITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResou [...]
-org.apache.flink.connector.base.source.reader.CoordinatedSourceRescaleITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniCl [...]
\ No newline at end of file
+org.apache.flink.connector.base.sink.AsyncSinkBaseITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.base.source.reader.CoordinatedSourceRescaleITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/AlignedWatermarksITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/AlignedWatermarksITCase.java
index 8410751..bf36e2c 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/AlignedWatermarksITCase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/AlignedWatermarksITCase.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.testutils.AllCallbackWrapper;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -39,7 +38,8 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.test.util.MiniClusterWithClientExtension;
+import org.apache.flink.test.junit5.InjectMiniCluster;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.testutils.logging.LoggerAuditingExtension;
 
 import org.junit.jupiter.api.Test;
@@ -71,21 +71,17 @@ public class AlignedWatermarksITCase {
 
     private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
 
-    public static final MiniClusterWithClientExtension MINI_CLUSTER_RESOURCE =
-            new MiniClusterWithClientExtension(
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
                             .setNumberTaskManagers(1)
                             .setConfiguration(reporter.addToConfiguration(new Configuration()))
                             .build());
 
-    @RegisterExtension
-    public static final AllCallbackWrapper<MiniClusterWithClientExtension> ALL_WRAPPER =
-            new AllCallbackWrapper<>(MINI_CLUSTER_RESOURCE);
-
     @Test
-    public void testAlignment() throws Exception {
+    public void testAlignment(@InjectMiniCluster MiniCluster miniCluster) throws Exception {
         final JobGraph jobGraph = getJobGraph();
-        final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
         final CompletableFuture<JobSubmissionResult> submission = miniCluster.submitJob(jobGraph);
         final JobID jobID = submission.get().getJobID();
         CommonTestUtils.waitForAllTaskRunning(miniCluster, jobID, false);
diff --git a/flink-connectors/flink-connector-cassandra/archunit-violations/dc1ba6f4-3d84-498c-a085-e02ba5936201 b/flink-connectors/flink-connector-cassandra/archunit-violations/dc1ba6f4-3d84-498c-a085-e02ba5936201
index 895e122..15fa3a7 100644
--- a/flink-connectors/flink-connector-cassandra/archunit-violations/dc1ba6f4-3d84-498c-a085-e02ba5936201
+++ b/flink-connectors/flink-connector-cassandra/archunit-violations/dc1ba6f4-3d84-498c-a085-e02ba5936201
@@ -1 +1,6 @@
-org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniCluste [...]
\ No newline at end of file
+org.apache.flink.streaming.connectors.cassandra.CassandraConnectorITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-elasticsearch-base/archunit-violations/dd583797-83e1-414c-a38d-330773978813 b/flink-connectors/flink-connector-elasticsearch-base/archunit-violations/dd583797-83e1-414c-a38d-330773978813
index 249f361..a215b58 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/archunit-violations/dd583797-83e1-414c-a38d-330773978813
+++ b/flink-connectors/flink-connector-elasticsearch-base/archunit-violations/dd583797-83e1-414c-a38d-330773978813
@@ -1 +1,6 @@
-org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriterITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniCluster [...]
\ No newline at end of file
+org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriterITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-elasticsearch6/archunit-violations/db3972e4-f3a3-45b2-9643-27cba0cef09d b/flink-connectors/flink-connector-elasticsearch6/archunit-violations/db3972e4-f3a3-45b2-9643-27cba0cef09d
index 2520fe8..341409a 100644
--- a/flink-connectors/flink-connector-elasticsearch6/archunit-violations/db3972e4-f3a3-45b2-9643-27cba0cef09d
+++ b/flink-connectors/flink-connector-elasticsearch6/archunit-violations/db3972e4-f3a3-45b2-9643-27cba0cef09d
@@ -1,2 +1,12 @@
-org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterW [...]
-org.apache.flink.connector.elasticsearch.table.Elasticsearch6DynamicSinkITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type Mini [...]
\ No newline at end of file
+org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.elasticsearch.table.Elasticsearch6DynamicSinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-elasticsearch7/archunit-violations/e1f30f33-c61c-4707-8c78-a3a80479564e b/flink-connectors/flink-connector-elasticsearch7/archunit-violations/e1f30f33-c61c-4707-8c78-a3a80479564e
index 854c270..e6b90b9 100644
--- a/flink-connectors/flink-connector-elasticsearch7/archunit-violations/e1f30f33-c61c-4707-8c78-a3a80479564e
+++ b/flink-connectors/flink-connector-elasticsearch7/archunit-violations/e1f30f33-c61c-4707-8c78-a3a80479564e
@@ -1,2 +1,12 @@
-org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterW [...]
-org.apache.flink.connector.elasticsearch.table.Elasticsearch7DynamicSinkITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type Mini [...]
\ No newline at end of file
+org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.elasticsearch.table.Elasticsearch7DynamicSinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-files/archunit-violations/f5e3e868-8d92-4258-9654-a605dc9c550f b/flink-connectors/flink-connector-files/archunit-violations/f5e3e868-8d92-4258-9654-a605dc9c550f
index 1a94180..ff88812 100644
--- a/flink-connectors/flink-connector-files/archunit-violations/f5e3e868-8d92-4258-9654-a605dc9c550f
+++ b/flink-connectors/flink-connector-files/archunit-violations/f5e3e868-8d92-4258-9654-a605dc9c550f
@@ -1,3 +1,18 @@
-org.apache.flink.connector.file.sink.BatchExecutionFileSinkITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithCl [...]
-org.apache.flink.connector.file.sink.StreamingExecutionFileSinkITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWi [...]
-org.apache.flink.connector.file.sink.writer.FileSinkMigrationITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWith [...]
\ No newline at end of file
+org.apache.flink.connector.file.sink.BatchExecutionFileSinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.file.sink.StreamingExecutionFileSinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.file.sink.writer.FileSinkMigrationITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-hive/archunit-violations/26d337fc-45c4-4d03-a84a-6692c37fafbc b/flink-connectors/flink-connector-hive/archunit-violations/26d337fc-45c4-4d03-a84a-6692c37fafbc
index 115f4c9..0113d84 100644
--- a/flink-connectors/flink-connector-hive/archunit-violations/26d337fc-45c4-4d03-a84a-6692c37fafbc
+++ b/flink-connectors/flink-connector-hive/archunit-violations/26d337fc-45c4-4d03-a84a-6692c37fafbc
@@ -1,11 +1,66 @@
-org.apache.flink.connectors.hive.HiveDialectITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource an [...]
-org.apache.flink.connectors.hive.HiveDialectQueryITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResour [...]
-org.apache.flink.connectors.hive.HiveLookupJoinITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource [...]
-org.apache.flink.connectors.hive.HiveRunnerITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and [...]
-org.apache.flink.connectors.hive.HiveSourceITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and [...]
-org.apache.flink.connectors.hive.HiveTableSinkITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource  [...]
-org.apache.flink.connectors.hive.HiveTableSourceITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResourc [...]
-org.apache.flink.connectors.hive.HiveTemporalJoinITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResour [...]
-org.apache.flink.connectors.hive.TableEnvHiveConnectorITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientR [...]
-org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReaderITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClus [...]
-org.apache.flink.table.catalog.hive.HiveCatalogITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource [...]
\ No newline at end of file
+org.apache.flink.connectors.hive.HiveDialectITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connectors.hive.HiveDialectQueryITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connectors.hive.HiveLookupJoinITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connectors.hive.HiveRunnerITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connectors.hive.HiveSourceITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connectors.hive.HiveTableSinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connectors.hive.HiveTableSourceITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connectors.hive.HiveTemporalJoinITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connectors.hive.TableEnvHiveConnectorITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReaderITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.table.catalog.hive.HiveCatalogITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-jdbc/archunit-violations/6b9ab1b0-c14d-4667-bab5-407b81fba98b b/flink-connectors/flink-connector-jdbc/archunit-violations/6b9ab1b0-c14d-4667-bab5-407b81fba98b
index c60803a..ecb32df 100644
--- a/flink-connectors/flink-connector-jdbc/archunit-violations/6b9ab1b0-c14d-4667-bab5-407b81fba98b
+++ b/flink-connectors/flink-connector-jdbc/archunit-violations/6b9ab1b0-c14d-4667-bab5-407b81fba98b
@@ -1,3 +1,18 @@
-org.apache.flink.connector.jdbc.JdbcITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public [...]
-org.apache.flink.connector.jdbc.catalog.MySqlCatalogITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientRes [...]
-org.apache.flink.connector.jdbc.catalog.PostgresCatalogITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClient [...]
\ No newline at end of file
+org.apache.flink.connector.jdbc.JdbcITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.jdbc.catalog.MySqlCatalogITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.jdbc.catalog.PostgresCatalogITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-kafka/archunit-violations/97dda445-f6bc-43e2-8106-5876ca0cd052 b/flink-connectors/flink-connector-kafka/archunit-violations/97dda445-f6bc-43e2-8106-5876ca0cd052
index 52e14f9..c1e6561 100644
--- a/flink-connectors/flink-connector-kafka/archunit-violations/97dda445-f6bc-43e2-8106-5876ca0cd052
+++ b/flink-connectors/flink-connector-kafka/archunit-violations/97dda445-f6bc-43e2-8106-5876ca0cd052
@@ -1,13 +1,78 @@
-org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducerITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterW [...]
-org.apache.flink.connector.kafka.sink.KafkaSinkITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource [...]
-org.apache.flink.connector.kafka.sink.KafkaTransactionLogITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClie [...]
-org.apache.flink.connector.kafka.sink.KafkaWriterITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResour [...]
-org.apache.flink.connector.kafka.source.KafkaSourceITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientReso [...]
-org.apache.flink.connector.kafka.source.KafkaSourceLegacyITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClie [...]
-org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniCl [...]
-org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWit [...]
-org.apache.flink.streaming.connectors.kafka.KafkaITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResour [...]
-org.apache.flink.streaming.connectors.kafka.KafkaProducerAtLeastOnceITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClus [...]
-org.apache.flink.streaming.connectors.kafka.KafkaProducerExactlyOnceITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClus [...]
-org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleExactlyOnceITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type M [...]
-org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleITCase does not satisfy: contain any fields that is is assignable to MiniClusterExtension and public and static and final and contain any fields that are public, static, and of type AllCallbackWrapper and final and annotated with @RegisterExtension or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterW [...]
\ No newline at end of file
+org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducerITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.kafka.sink.KafkaSinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.kafka.sink.KafkaTransactionLogITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.kafka.sink.KafkaWriterITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.kafka.source.KafkaSourceITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.kafka.source.KafkaSourceLegacyITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.streaming.connectors.kafka.KafkaITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.streaming.connectors.kafka.KafkaProducerAtLeastOnceITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.streaming.connectors.kafka.KafkaProducerExactlyOnceITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleExactlyOnceITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
index ef67997..94e23d7 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
@@ -22,11 +22,10 @@ import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
 import org.apache.flink.connector.pulsar.testutils.function.ControlSource;
-import org.apache.flink.core.testutils.AllCallbackWrapper;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
-import org.apache.flink.runtime.testutils.MiniClusterExtension;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.testutils.junit.SharedObjectsExtension;
 
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -46,7 +45,8 @@ class PulsarSinkITCase extends PulsarTestSuiteBase {
 
     private static final int PARALLELISM = 1;
 
-    public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
             new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
                             .setNumberTaskManagers(1)
@@ -55,11 +55,6 @@ class PulsarSinkITCase extends PulsarTestSuiteBase {
                             .withHaLeadershipControl()
                             .build());
 
-    @SuppressWarnings("unused")
-    @RegisterExtension
-    public static final AllCallbackWrapper<MiniClusterExtension> CALLBACK_WRAPPER =
-            new AllCallbackWrapper<>(MINI_CLUSTER_RESOURCE);
-
     // Using this extension for creating shared reference which would be used in source function.
     @RegisterExtension final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
 
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnMinioS3StoreITCase.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnMinioS3StoreITCase.java
index ebe71da..bd34590 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnMinioS3StoreITCase.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnMinioS3StoreITCase.java
@@ -22,16 +22,14 @@ import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.testutils.AllCallbackWrapper;
-import org.apache.flink.core.testutils.EachCallbackWrapper;
 import org.apache.flink.core.testutils.TestContainerExtension;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.highavailability.AbstractHAJobRunITCase;
 import org.apache.flink.runtime.highavailability.FileSystemJobResultStore;
 import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
-import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.runtime.testutils.MiniClusterExtension;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
 
@@ -65,21 +63,15 @@ public abstract class HAJobRunOnMinioS3StoreITCase extends AbstractHAJobRunITCas
 
     @RegisterExtension
     @Order(3)
-    private static final EachCallbackWrapper<MiniClusterExtension> miniClusterExtension =
-            new EachCallbackWrapper<>(
-                    new MiniClusterExtension(
-                            () -> {
-                                final Configuration configuration = createConfiguration();
-                                FileSystem.initialize(configuration, null);
-                                return new MiniClusterResourceConfiguration.Builder()
-                                        .setConfiguration(configuration)
-                                        .build();
-                            }));
-
-    @Override
-    public MiniCluster getMiniCluster() {
-        return miniClusterExtension.getCustomExtension().getMiniCluster();
-    }
+    private static final MiniClusterExtension miniClusterExtension =
+            new MiniClusterExtension(
+                    () -> {
+                        final Configuration configuration = createConfiguration();
+                        FileSystem.initialize(configuration, null);
+                        return new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(configuration)
+                                .build();
+                    });
 
     private static MinioTestContainer getMinioContainer() {
         return MINIO_EXTENSION.getCustomExtension().getTestContainer();
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
index a9a64e7..97fd784 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/DataStreamCsvITCase.java
@@ -26,16 +26,15 @@ import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.connector.file.src.FileSource;
 import org.apache.flink.connector.file.src.reader.StreamFormat;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.testutils.AllCallbackWrapper;
 import org.apache.flink.formats.common.Converter;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
-import org.apache.flink.runtime.testutils.MiniClusterExtension;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
 import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.function.FunctionWithException;
 
@@ -79,6 +78,7 @@ public class DataStreamCsvITCase {
 
     @TempDir File outDir;
 
+    @RegisterExtension
     private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
             new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
@@ -88,10 +88,6 @@ public class DataStreamCsvITCase {
                             .withHaLeadershipControl()
                             .build());
 
-    @RegisterExtension
-    private static AllCallbackWrapper<MiniClusterExtension> allCallbackWrapper =
-            new AllCallbackWrapper<>(MINI_CLUSTER_RESOURCE);
-
     // ------------------------------------------------------------------------
     //  test data
     // ------------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 54189bd..9da584d 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
-import org.apache.flink.core.testutils.AllCallbackWrapper;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
@@ -37,7 +36,8 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.MiniClusterWithClientExtension;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.util.concurrent.FutureUtils;
 
@@ -57,18 +57,15 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Tests to verify JMX reporter functionality on the JobManager. */
 class JMXJobManagerMetricTest {
 
-    private static final MiniClusterWithClientExtension MINI_CLUSTER_RESOURCE =
-            new MiniClusterWithClientExtension(
+    @RegisterExtension
+    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
                             .setConfiguration(getConfiguration())
                             .setNumberSlotsPerTaskManager(1)
                             .setNumberTaskManagers(1)
                             .build());
 
-    @RegisterExtension
-    private static final AllCallbackWrapper<MiniClusterWithClientExtension> ALL_WRAPPER =
-            new AllCallbackWrapper<>(MINI_CLUSTER_RESOURCE);
-
     private static Configuration getConfiguration() {
         Configuration flinkConfiguration = new Configuration();
 
@@ -84,7 +81,8 @@ class JMXJobManagerMetricTest {
 
     /** Tests that metrics registered on the JobManager are actually accessible via JMX. */
     @Test
-    void testJobManagerJMXMetricAccess() throws Exception {
+    void testJobManagerJMXMetricAccess(@InjectClusterClient ClusterClient<?> client)
+            throws Exception {
         Deadline deadline = Deadline.now().plus(Duration.ofMinutes(2));
 
         try {
@@ -113,7 +111,6 @@ class JMXJobManagerMetricTest {
                             .setJobCheckpointingSettings(jobCheckpointingSettings)
                             .build();
 
-            ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
             client.submitJob(jobGraph).get();
 
             FutureUtils.retrySuccessfulWithDelay(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAJobRunITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAJobRunITCase.java
index 9398cda..80f9407 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAJobRunITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAJobRunITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
+import org.apache.flink.test.junit5.InjectMiniCluster;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.concurrent.FutureUtils;
@@ -46,6 +47,9 @@ import static org.assertj.core.api.Assertions.assertThat;
 /**
  * {@code AbstractHAJobRunITCase} runs a job storing in HA mode and provides {@code abstract}
  * methods for initializing a specific {@link FileSystem}.
+ *
+ * <p>Sub-classes must use a {@link
+ * org.apache.flink.runtime.testutils.InternalMiniClusterExtension}.
  */
 @ExtendWith(TestLoggerExtension.class)
 public abstract class AbstractHAJobRunITCase {
@@ -72,12 +76,9 @@ public abstract class AbstractHAJobRunITCase {
 
     protected void runAfterJobTermination() throws Exception {}
 
-    protected abstract MiniCluster getMiniCluster();
-
     @Test
-    public void testJobExecutionInHaMode() throws Exception {
-        final MiniCluster flinkCluster = getMiniCluster();
-
+    public void testJobExecutionInHaMode(@InjectMiniCluster MiniCluster flinkCluster)
+            throws Exception {
         final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
 
         // providing a timeout helps making the test fail in case some issue occurred while
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
index 09d60e6..bb5c6bf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.core.testutils.AllCallbackWrapper;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
@@ -40,8 +39,9 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.testutils.MiniClusterExtension;
+import org.apache.flink.runtime.testutils.InternalMiniClusterExtension;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.junit5.InjectMiniCluster;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.TestLoggerExtension;
@@ -71,16 +71,13 @@ public class TaskCancelAsyncProducerConsumerITCase {
     private static volatile Thread ASYNC_PRODUCER_THREAD;
     private static volatile Thread ASYNC_CONSUMER_THREAD;
 
-    public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
-            new MiniClusterExtension(
+    @RegisterExtension
+    private static final InternalMiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new InternalMiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
                             .setConfiguration(getFlinkConfiguration())
                             .build());
 
-    @RegisterExtension
-    public static AllCallbackWrapper allCallbackWrapper =
-            new AllCallbackWrapper(MINI_CLUSTER_RESOURCE);
-
     private static Configuration getFlinkConfiguration() {
         Configuration config = new Configuration();
         config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4096"));
@@ -97,7 +94,8 @@ public class TaskCancelAsyncProducerConsumerITCase {
      * the main task Thread.
      */
     @Test
-    public void testCancelAsyncProducerAndConsumer() throws Exception {
+    public void testCancelAsyncProducerAndConsumer(@InjectMiniCluster MiniCluster flink)
+            throws Exception {
         Deadline deadline = Deadline.now().plus(Duration.ofMinutes(2));
 
         // Job with async producer and consumer
@@ -117,8 +115,6 @@ public class TaskCancelAsyncProducerConsumerITCase {
 
         JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(producer, consumer);
 
-        final MiniCluster flink = MINI_CLUSTER_RESOURCE.getMiniCluster();
-
         // Submit job and wait until running
         flink.runDetached(jobGraph);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InternalMiniClusterExtension.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InternalMiniClusterExtension.java
new file mode 100644
index 0000000..5a89834
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InternalMiniClusterExtension.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.test.junit5.InjectClusterClientConfiguration;
+import org.apache.flink.test.junit5.InjectClusterRESTAddress;
+import org.apache.flink.test.junit5.InjectMiniCluster;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.junit.jupiter.api.extension.ParameterResolver;
+
+import java.net.URI;
+
+/**
+ * An extension which starts a {@link MiniCluster} for testing purposes.
+ *
+ * <p>This should only be used by tests within the flink-runtime module. Other modules should use
+ * {@code MiniClusterExtension} provided by flink-test-utils module.
+ */
+@Internal
+public class InternalMiniClusterExtension
+        implements BeforeAllCallback, AfterAllCallback, ParameterResolver {
+
+    private final MiniClusterResource miniClusterResource;
+
+    public InternalMiniClusterExtension(
+            final MiniClusterResourceConfiguration miniClusterResourceConfiguration) {
+        this.miniClusterResource = new MiniClusterResource(miniClusterResourceConfiguration);
+    }
+
+    public int getNumberSlots() {
+        return miniClusterResource.getNumberSlots();
+    }
+
+    public MiniCluster getMiniCluster() {
+        return miniClusterResource.getMiniCluster();
+    }
+
+    public UnmodifiableConfiguration getClientConfiguration() {
+        return miniClusterResource.getClientConfiguration();
+    }
+
+    public URI getRestAddres() {
+        return miniClusterResource.getRestAddres();
+    }
+
+    @Override
+    public void beforeAll(ExtensionContext context) throws Exception {
+        miniClusterResource.before();
+    }
+
+    @Override
+    public void afterAll(ExtensionContext context) throws Exception {
+        miniClusterResource.after();
+    }
+
+    @Override
+    public boolean supportsParameter(
+            ParameterContext parameterContext, ExtensionContext extensionContext)
+            throws ParameterResolutionException {
+        Class<?> parameterType = parameterContext.getParameter().getType();
+        if (parameterContext.isAnnotated(InjectMiniCluster.class)
+                && parameterType.isAssignableFrom(MiniCluster.class)) {
+            return true;
+        }
+        if (parameterContext.isAnnotated(InjectClusterClientConfiguration.class)
+                && parameterType.isAssignableFrom(UnmodifiableConfiguration.class)) {
+            return true;
+        }
+        return parameterContext.isAnnotated(InjectClusterRESTAddress.class)
+                && parameterType.isAssignableFrom(URI.class);
+    }
+
+    @Override
+    public Object resolveParameter(
+            ParameterContext parameterContext, ExtensionContext extensionContext)
+            throws ParameterResolutionException {
+        if (parameterContext.isAnnotated(InjectMiniCluster.class)) {
+            return miniClusterResource.getMiniCluster();
+        }
+        if (parameterContext.isAnnotated(InjectClusterClientConfiguration.class)) {
+            return miniClusterResource.getClientConfiguration();
+        }
+        if (parameterContext.isAnnotated(InjectClusterRESTAddress.class)) {
+            return miniClusterResource.getRestAddres();
+        }
+        throw new ParameterResolutionException("Unsupported parameter");
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterExtension.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterExtension.java
deleted file mode 100644
index 0e1115c..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterExtension.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testutils;
-
-import org.apache.flink.annotation.Experimental;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
-import org.apache.flink.core.testutils.CustomExtension;
-import org.apache.flink.runtime.minicluster.MiniCluster;
-
-import org.junit.jupiter.api.extension.ExtensionContext;
-
-import javax.annotation.Nullable;
-
-import java.net.URI;
-import java.util.function.Supplier;
-
-/** An extension which starts a {@link MiniCluster} for testing purposes. */
-public class MiniClusterExtension implements CustomExtension {
-
-    private final Supplier<MiniClusterResourceConfiguration>
-            miniClusterResourceConfigurationSupplier;
-    @Nullable private MiniClusterResource miniClusterResource;
-
-    public MiniClusterExtension(
-            final MiniClusterResourceConfiguration miniClusterResourceConfiguration) {
-        this(() -> miniClusterResourceConfiguration);
-    }
-
-    @Experimental
-    public MiniClusterExtension(
-            final Supplier<MiniClusterResourceConfiguration>
-                    miniClusterResourceConfigurationSupplier) {
-        this.miniClusterResourceConfigurationSupplier = miniClusterResourceConfigurationSupplier;
-    }
-
-    public int getNumberSlots() {
-        return miniClusterResource.getNumberSlots();
-    }
-
-    public MiniCluster getMiniCluster() {
-        return miniClusterResource.getMiniCluster();
-    }
-
-    public UnmodifiableConfiguration getClientConfiguration() {
-        return miniClusterResource.getClientConfiguration();
-    }
-
-    public URI getRestAddres() {
-        return miniClusterResource.getRestAddres();
-    }
-
-    @Override
-    public void before(ExtensionContext context) throws Exception {
-        this.miniClusterResource =
-                new MiniClusterResource(miniClusterResourceConfigurationSupplier.get());
-        miniClusterResource.before();
-    }
-
-    @Override
-    public void after(ExtensionContext context) throws Exception {
-        miniClusterResource.after();
-    }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/test/junit5/InjectClusterClientConfiguration.java b/flink-runtime/src/test/java/org/apache/flink/test/junit5/InjectClusterClientConfiguration.java
new file mode 100644
index 0000000..eb788be
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/test/junit5/InjectClusterClientConfiguration.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.junit5;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotate a test method parameter with this annotation to inject the {@link
+ * UnmodifiableConfiguration} for building a cluster client.
+ *
+ * @see org.apache.flink.test.junit5.MiniClusterExtension
+ */
+@Target(ElementType.PARAMETER)
+@Retention(RetentionPolicy.RUNTIME)
+@Experimental
+public @interface InjectClusterClientConfiguration {}
diff --git a/flink-runtime/src/test/java/org/apache/flink/test/junit5/InjectClusterRESTAddress.java b/flink-runtime/src/test/java/org/apache/flink/test/junit5/InjectClusterRESTAddress.java
new file mode 100644
index 0000000..3afafcb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/test/junit5/InjectClusterRESTAddress.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.junit5;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.net.URI;
+
+/**
+ * Annotate a test method parameter with this annotation to inject the {@link URI} REST address of
+ * the cluster.
+ *
+ * @see org.apache.flink.test.junit5.MiniClusterExtension
+ */
+@Target(ElementType.PARAMETER)
+@Retention(RetentionPolicy.RUNTIME)
+@Experimental
+public @interface InjectClusterRESTAddress {}
diff --git a/flink-runtime/src/test/java/org/apache/flink/test/junit5/InjectMiniCluster.java b/flink-runtime/src/test/java/org/apache/flink/test/junit5/InjectMiniCluster.java
new file mode 100644
index 0000000..e0f3a1a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/test/junit5/InjectMiniCluster.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.junit5;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotate a test method parameter with this annotation to inject the {@link MiniCluster} instance.
+ *
+ * @see org.apache.flink.test.junit5.MiniClusterExtension
+ */
+@Target(ElementType.PARAMETER)
+@Retention(RetentionPolicy.RUNTIME)
+@Experimental
+public @interface InjectMiniCluster {}
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/InjectClusterClient.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/InjectClusterClient.java
new file mode 100644
index 0000000..abc0adb
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/InjectClusterClient.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.junit5;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotate a test method parameter with this annotation to inject the {@link ClusterClient} or the
+ * {@link RestClusterClient} instance.
+ *
+ * @see MiniClusterExtension
+ */
+@Target(ElementType.PARAMETER)
+@Retention(RetentionPolicy.RUNTIME)
+@Experimental
+public @interface InjectClusterClient {}
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java
new file mode 100644
index 0000000..4f880be
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/junit5/MiniClusterExtension.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.junit5;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testutils.InternalMiniClusterExtension;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.TestEnvironment;
+
+import org.junit.jupiter.api.TestFactory;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.junit.jupiter.api.extension.ParameterResolver;
+import org.junit.jupiter.params.ParameterizedTest;
+
+import java.util.function.Supplier;
+
+/**
+ * Starts a Flink {@link MiniCluster} and registers the respective {@link ExecutionEnvironment} and
+ * {@link StreamExecutionEnvironment} in the correct thread local environment.
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * @ExtendWith(MiniClusterExtension.class)
+ * class MyTest {
+ *      @Test
+ *      public void myTest() {
+ *          ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+ *      }
+ * }
+ * }</pre>
+ *
+ * <p>Or to tune the {@link MiniCluster} parameters:
+ *
+ * <pre>{@code
+ * class MyTest {
+ *
+ *     @RegisterExtension
+ *     public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ *             new MiniClusterExtension(
+ *                     new MiniClusterResourceConfiguration.Builder()
+ *                             .setNumberTaskManagers(1)
+ *                             .setConfiguration(new Configuration())
+ *                             .build());
+ *
+ *     @Test
+ *     public void myTest() {
+ *          ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+ *     }
+ * }
+ * }</pre>
+ *
+ * <p>You can use parameter injection with the annotations {@link InjectMiniCluster}, {@link
+ * InjectClusterClient}, {@link InjectClusterRESTAddress}, {@link InjectClusterClientConfiguration}:
+ *
+ * <pre>{@code
+ * class MyTest {
+ *
+ *     @RegisterExtension
+ *     public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ *             new MiniClusterExtension(
+ *                     new MiniClusterResourceConfiguration.Builder()
+ *                             .setNumberTaskManagers(1)
+ *                             .setConfiguration(new Configuration())
+ *                             .build());
+ *
+ *     @Test
+ *     public void myTest(@InjectMiniCluster MiniCluster miniCluster) {
+ *          // Use miniCluster
+ *     }
+ *
+ *     @Test
+ *     public void myTest(@InjectClusterClient ClusterClient<?> clusterClient) {
+ *          // clusterClient here is an instance of MiniClusterClient
+ *     }
+ *
+ *     @Test
+ *     public void myTest(@InjectClusterClient RestClusterClient<?> restClusterClient) {
+ *          // Using RestClusterClient as parameter type will force the creation of a RestClusterClient,
+ *          //  rather than MiniClusterClient
+ *     }
+ * }
+ * }</pre>
+ *
+ * <p>You can use it both with programmatic and declarative extension annotations. Check <a
+ * href="https://junit.org/junit5/docs/snapshot/user-guide/#extensions-registration>JUnit 5 docs</a>
+ * for more details.
+ *
+ * <p>This extension by default creates one {@link MiniCluster} per test class. If you need one
+ * instance of {@link MiniCluster} per test, we suggest you to instantiate it manually in your test
+ * code.
+ *
+ * <p>This extension works correctly with parallel tests and with parametrized tests, but it doesn't
+ * work when combining {@link TestFactory} and parallel tests, because of <a
+ * href="https://github.com/junit-team/junit5/issues/378">some limitations</a>. Use {@link
+ * ParameterizedTest} instead.
+ */
+@Experimental
+public final class MiniClusterExtension
+        implements BeforeAllCallback,
+                BeforeEachCallback,
+                AfterEachCallback,
+                AfterAllCallback,
+                ParameterResolver {
+
+    private static final ExtensionContext.Namespace NAMESPACE =
+            ExtensionContext.Namespace.create(MiniClusterExtension.class);
+
+    private static final String CLUSTER_REST_CLIENT = "clusterRestClient";
+    private static final String MINI_CLUSTER_CLIENT = "miniClusterClient";
+
+    private final Supplier<MiniClusterResourceConfiguration>
+            miniClusterResourceConfigurationSupplier;
+
+    private InternalMiniClusterExtension internalMiniClusterExtension;
+
+    public MiniClusterExtension() {
+        this(
+                new MiniClusterResourceConfiguration.Builder()
+                        .setNumberTaskManagers(1)
+                        .setNumberSlotsPerTaskManager(1)
+                        .build());
+    }
+
+    public MiniClusterExtension(
+            final MiniClusterResourceConfiguration miniClusterResourceConfiguration) {
+        this(() -> miniClusterResourceConfiguration);
+    }
+
+    @Experimental
+    public MiniClusterExtension(
+            Supplier<MiniClusterResourceConfiguration> miniClusterResourceConfigurationSupplier) {
+        this.miniClusterResourceConfigurationSupplier = miniClusterResourceConfigurationSupplier;
+    }
+
+    // Accessors
+
+    @Override
+    public boolean supportsParameter(
+            ParameterContext parameterContext, ExtensionContext extensionContext)
+            throws ParameterResolutionException {
+        Class<?> parameterType = parameterContext.getParameter().getType();
+        if (parameterContext.isAnnotated(InjectClusterClient.class)
+                && parameterType.isAssignableFrom(ClusterClient.class)) {
+            return true;
+        }
+        return internalMiniClusterExtension.supportsParameter(parameterContext, extensionContext);
+    }
+
+    @Override
+    public Object resolveParameter(
+            ParameterContext parameterContext, ExtensionContext extensionContext)
+            throws ParameterResolutionException {
+        Class<?> parameterType = parameterContext.getParameter().getType();
+        if (parameterContext.isAnnotated(InjectClusterClient.class)) {
+            if (parameterType.equals(RestClusterClient.class)) {
+                return extensionContext
+                        .getStore(NAMESPACE)
+                        .getOrComputeIfAbsent(
+                                CLUSTER_REST_CLIENT,
+                                k -> {
+                                    try {
+                                        return new CloseableParameter<>(
+                                                createRestClusterClient(
+                                                        internalMiniClusterExtension));
+                                    } catch (Exception e) {
+                                        throw new ParameterResolutionException(
+                                                "Cannot create rest cluster client", e);
+                                    }
+                                },
+                                CloseableParameter.class)
+                        .get();
+            }
+            // Default to MiniClusterClient
+            return extensionContext
+                    .getStore(NAMESPACE)
+                    .getOrComputeIfAbsent(
+                            MINI_CLUSTER_CLIENT,
+                            k -> {
+                                try {
+                                    return new CloseableParameter<>(
+                                            createMiniClusterClient(internalMiniClusterExtension));
+                                } catch (Exception e) {
+                                    throw new ParameterResolutionException(
+                                            "Cannot create mini cluster client", e);
+                                }
+                            },
+                            CloseableParameter.class)
+                    .get();
+        }
+        return internalMiniClusterExtension.resolveParameter(parameterContext, extensionContext);
+    }
+
+    // Lifecycle implementation
+
+    @Override
+    public void beforeAll(ExtensionContext context) throws Exception {
+        internalMiniClusterExtension =
+                new InternalMiniClusterExtension(miniClusterResourceConfigurationSupplier.get());
+        internalMiniClusterExtension.beforeAll(context);
+    }
+
+    @Override
+    public void beforeEach(ExtensionContext context) throws Exception {
+        registerEnv(internalMiniClusterExtension);
+    }
+
+    @Override
+    public void afterEach(ExtensionContext context) throws Exception {
+        unregisterEnv(internalMiniClusterExtension);
+    }
+
+    @Override
+    public void afterAll(ExtensionContext context) throws Exception {
+        internalMiniClusterExtension.afterAll(context);
+    }
+
+    // Implementation
+
+    private void registerEnv(InternalMiniClusterExtension internalMiniClusterExtension) {
+        TestEnvironment executionEnvironment =
+                new TestEnvironment(
+                        internalMiniClusterExtension.getMiniCluster(),
+                        internalMiniClusterExtension.getNumberSlots(),
+                        false);
+        executionEnvironment.setAsContext();
+        TestStreamEnvironment.setAsContext(
+                internalMiniClusterExtension.getMiniCluster(),
+                internalMiniClusterExtension.getNumberSlots());
+    }
+
+    private void unregisterEnv(InternalMiniClusterExtension internalMiniClusterExtension) {
+        TestStreamEnvironment.unsetAsContext();
+        TestEnvironment.unsetAsContext();
+    }
+
+    private MiniClusterClient createMiniClusterClient(
+            InternalMiniClusterExtension internalMiniClusterExtension) {
+        return new MiniClusterClient(
+                internalMiniClusterExtension.getClientConfiguration(),
+                internalMiniClusterExtension.getMiniCluster());
+    }
+
+    private RestClusterClient<MiniClusterClient.MiniClusterId> createRestClusterClient(
+            InternalMiniClusterExtension internalMiniClusterExtension) throws Exception {
+        return new RestClusterClient<>(
+                internalMiniClusterExtension.getClientConfiguration(),
+                MiniClusterClient.MiniClusterId.INSTANCE);
+    }
+
+    private static class CloseableParameter<T extends AutoCloseable>
+            implements ExtensionContext.Store.CloseableResource {
+        private final T autoCloseable;
+
+        CloseableParameter(T autoCloseable) {
+            this.autoCloseable = autoCloseable;
+        }
+
+        public T get() {
+            return autoCloseable;
+        }
+
+        @Override
+        public void close() throws Throwable {
+            this.autoCloseable.close();
+        }
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientExtension.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientExtension.java
deleted file mode 100644
index 28650f7..0000000
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientExtension.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util;
-
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.MiniClusterClient;
-import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.runtime.testutils.MiniClusterExtension;
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.ExceptionUtils;
-
-import org.junit.jupiter.api.extension.ExtensionContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Starts a Flink mini cluster as a resource and registers the respective ExecutionEnvironment and
- * StreamExecutionEnvironment.
- */
-public class MiniClusterWithClientExtension extends MiniClusterExtension {
-    private static final Logger LOG = LoggerFactory.getLogger(MiniClusterWithClientExtension.class);
-
-    private ClusterClient<?> clusterClient;
-    private RestClusterClient<MiniClusterClient.MiniClusterId> restClusterClient;
-
-    private TestEnvironment executionEnvironment;
-
-    public MiniClusterWithClientExtension(
-            final MiniClusterResourceConfiguration miniClusterResourceConfiguration) {
-        super(miniClusterResourceConfiguration);
-    }
-
-    public ClusterClient<?> getClusterClient() {
-        return clusterClient;
-    }
-
-    /**
-     * Returns a {@link RestClusterClient} that can be used to communicate with this mini cluster.
-     * Only use this if the client returned via {@link #getClusterClient()} does not fulfill your
-     * needs.
-     */
-    public RestClusterClient<?> getRestClusterClient() throws Exception {
-        return restClusterClient;
-    }
-
-    public TestEnvironment getTestEnvironment() {
-        return executionEnvironment;
-    }
-
-    @Override
-    public void before(ExtensionContext context) throws Exception {
-        super.before(context);
-
-        clusterClient = createMiniClusterClient();
-        restClusterClient = createRestClusterClient();
-
-        executionEnvironment = new TestEnvironment(getMiniCluster(), getNumberSlots(), false);
-        executionEnvironment.setAsContext();
-        TestStreamEnvironment.setAsContext(getMiniCluster(), getNumberSlots());
-    }
-
-    @Override
-    public void after(ExtensionContext context) throws Exception {
-        LOG.info("Finalization triggered: Cluster shutdown is going to be initiated.");
-        TestStreamEnvironment.unsetAsContext();
-        TestEnvironment.unsetAsContext();
-
-        Exception exception = null;
-
-        if (clusterClient != null) {
-            try {
-                clusterClient.close();
-            } catch (Exception e) {
-                exception = e;
-            }
-        }
-
-        clusterClient = null;
-
-        if (restClusterClient != null) {
-            try {
-                restClusterClient.close();
-            } catch (Exception e) {
-                exception = ExceptionUtils.firstOrSuppressed(e, exception);
-            }
-        }
-
-        restClusterClient = null;
-
-        super.after(context);
-
-        if (exception != null) {
-            LOG.warn("Could not properly shut down the MiniClusterWithClientResource.", exception);
-        }
-    }
-
-    private MiniClusterClient createMiniClusterClient() {
-        return new MiniClusterClient(getClientConfiguration(), getMiniCluster());
-    }
-
-    private RestClusterClient<MiniClusterClient.MiniClusterId> createRestClusterClient()
-            throws Exception {
-        return new RestClusterClient<>(
-                getClientConfiguration(), MiniClusterClient.MiniClusterId.INSTANCE);
-    }
-}

[flink] 02/03: [hotfix][tests] Restructure AbstractHaJobRunITCase

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 576354cf44a41a1f8d7c0d12801c91badc1e9fcf
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Feb 24 14:23:38 2022 +0100

    [hotfix][tests] Restructure AbstractHaJobRunITCase
    
    Refactors the test to make use of a static MiniClusterExtension, in preperation FLINK-26252.
---
 flink-filesystems/flink-s3-fs-base/pom.xml         |  6 ++++
 .../fs/s3/common/HAJobRunOnMinioS3StoreITCase.java | 41 +++++++++++++++++-----
 flink-filesystems/flink-s3-fs-hadoop/pom.xml       |  6 ++++
 flink-filesystems/flink-s3-fs-presto/pom.xml       |  6 ++++
 .../highavailability/AbstractHAJobRunITCase.java   | 40 +++++----------------
 5 files changed, 59 insertions(+), 40 deletions(-)

diff --git a/flink-filesystems/flink-s3-fs-base/pom.xml b/flink-filesystems/flink-s3-fs-base/pom.xml
index 0b4a337..9ccf84e 100644
--- a/flink-filesystems/flink-s3-fs-base/pom.xml
+++ b/flink-filesystems/flink-s3-fs-base/pom.xml
@@ -251,6 +251,12 @@ under the License.
 				</exclusion>
 			</exclusions>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 	<build>
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnMinioS3StoreITCase.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnMinioS3StoreITCase.java
index c8f358d..ebe71da 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnMinioS3StoreITCase.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnMinioS3StoreITCase.java
@@ -20,18 +20,25 @@ package org.apache.flink.fs.s3.common;
 
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.testutils.AllCallbackWrapper;
+import org.apache.flink.core.testutils.EachCallbackWrapper;
 import org.apache.flink.core.testutils.TestContainerExtension;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.highavailability.AbstractHAJobRunITCase;
 import org.apache.flink.runtime.highavailability.FileSystemJobResultStore;
 import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
+import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.MiniClusterExtension;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
 
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.time.Duration;
@@ -51,10 +58,29 @@ public abstract class HAJobRunOnMinioS3StoreITCase extends AbstractHAJobRunITCas
     private static final String JOB_RESULT_STORE_FOLDER = "jrs";
 
     @RegisterExtension
+    @Order(2)
     private static final AllCallbackWrapper<TestContainerExtension<MinioTestContainer>>
             MINIO_EXTENSION =
                     new AllCallbackWrapper<>(new TestContainerExtension<>(MinioTestContainer::new));
 
+    @RegisterExtension
+    @Order(3)
+    private static final EachCallbackWrapper<MiniClusterExtension> miniClusterExtension =
+            new EachCallbackWrapper<>(
+                    new MiniClusterExtension(
+                            () -> {
+                                final Configuration configuration = createConfiguration();
+                                FileSystem.initialize(configuration, null);
+                                return new MiniClusterResourceConfiguration.Builder()
+                                        .setConfiguration(configuration)
+                                        .build();
+                            }));
+
+    @Override
+    public MiniCluster getMiniCluster() {
+        return miniClusterExtension.getCustomExtension().getMiniCluster();
+    }
+
     private static MinioTestContainer getMinioContainer() {
         return MINIO_EXTENSION.getCustomExtension().getTestContainer();
     }
@@ -77,13 +103,7 @@ public abstract class HAJobRunOnMinioS3StoreITCase extends AbstractHAJobRunITCas
         return pathSeparator + StringUtils.join(subfolders, pathSeparator);
     }
 
-    @Override
-    protected String getHAStoragePath() {
-        return createS3URIWithSubPath(CLUSTER_ID);
-    }
-
-    @Override
-    protected Configuration createConfiguration() {
+    private static Configuration createConfiguration() {
         final Configuration config = new Configuration();
 
         getMinioContainer().setS3ConfigOptions(config);
@@ -94,7 +114,12 @@ public abstract class HAJobRunOnMinioS3StoreITCase extends AbstractHAJobRunITCas
                 JobResultStoreOptions.STORAGE_PATH,
                 createS3URIWithSubPath(CLUSTER_ID, JOB_RESULT_STORE_FOLDER));
 
-        return config;
+        return addHaConfiguration(config, createS3URIWithSubPath(CLUSTER_ID));
+    }
+
+    @AfterAll
+    public static void unsetFileSystem() {
+        FileSystem.initialize(new Configuration(), null);
     }
 
     @Override
diff --git a/flink-filesystems/flink-s3-fs-hadoop/pom.xml b/flink-filesystems/flink-s3-fs-hadoop/pom.xml
index 58956e8..6c3799d 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/pom.xml
+++ b/flink-filesystems/flink-s3-fs-hadoop/pom.xml
@@ -101,6 +101,12 @@ under the License.
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
diff --git a/flink-filesystems/flink-s3-fs-presto/pom.xml b/flink-filesystems/flink-s3-fs-presto/pom.xml
index d1f7493..9a87655 100644
--- a/flink-filesystems/flink-s3-fs-presto/pom.xml
+++ b/flink-filesystems/flink-s3-fs-presto/pom.xml
@@ -69,6 +69,12 @@ under the License.
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 
 		<dependency>
 			<groupId>org.apache.curator</groupId>
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAJobRunITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAJobRunITCase.java
index f960cac..9398cda 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAJobRunITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHAJobRunITCase.java
@@ -25,18 +25,15 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.testutils.AllCallbackWrapper;
-import org.apache.flink.core.testutils.EachCallbackWrapper;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.testutils.MiniClusterExtension;
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.concurrent.FutureUtils;
 
-import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -54,25 +51,17 @@ import static org.assertj.core.api.Assertions.assertThat;
 public abstract class AbstractHAJobRunITCase {
 
     @RegisterExtension
+    @Order(1)
     private static final AllCallbackWrapper<ZooKeeperExtension> ZOOKEEPER_EXTENSION =
             new AllCallbackWrapper<>(new ZooKeeperExtension());
 
-    @RegisterExtension
-    private final EachCallbackWrapper<MiniClusterExtension> miniClusterExtension =
-            new EachCallbackWrapper<>(
-                    new MiniClusterExtension(
-                            new MiniClusterResourceConfiguration.Builder()
-                                    .setConfiguration(getFlinkConfiguration())
-                                    .build()));
-
-    private Configuration getFlinkConfiguration() {
-        final Configuration config = createConfiguration();
-
+    protected static Configuration addHaConfiguration(
+            final Configuration config, final String haStoragePath) {
         config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
         config.set(
                 HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
                 ZOOKEEPER_EXTENSION.getCustomExtension().getConnectString());
-        config.set(HighAvailabilityOptions.HA_STORAGE_PATH, getHAStoragePath());
+        config.set(HighAvailabilityOptions.HA_STORAGE_PATH, haStoragePath);
 
         // getFlinkConfiguration() is called on each new instantiation of the MiniCluster which is
         // happening before each test run
@@ -81,26 +70,13 @@ public abstract class AbstractHAJobRunITCase {
         return config;
     }
 
-    @AfterEach
-    public void unsetFileSystem() {
-        FileSystem.initialize(new Configuration(), null);
-    }
-
-    /**
-     * Should return the path to the HA storage which will be injected into the Flink configuration.
-     *
-     * @see HighAvailabilityOptions#HA_STORAGE_PATH
-     */
-    protected abstract String getHAStoragePath();
-
-    /** Initializes the {@link Configuration} used for the Flink cluster. */
-    protected abstract Configuration createConfiguration();
-
     protected void runAfterJobTermination() throws Exception {}
 
+    protected abstract MiniCluster getMiniCluster();
+
     @Test
     public void testJobExecutionInHaMode() throws Exception {
-        final MiniCluster flinkCluster = miniClusterExtension.getCustomExtension().getMiniCluster();
+        final MiniCluster flinkCluster = getMiniCluster();
 
         final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
 

[flink] 01/03: [hotfix][tests] Lazily create MiniClusterResourceConfiguraiton

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 881d4c6cb2094d6c3b37a5cc33dfe09918d02771
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Feb 24 14:19:14 2022 +0100

    [hotfix][tests] Lazily create MiniClusterResourceConfiguraiton
    
    Certain configuration parameters (e.g., ports) are inferred from other extensions. As extensions commonly only really start up resources in before*() methods the current approach of determining the configuration up-front imposes limitations as to whether a MiniClusterExtension can be static or not.
    With this change the configuration can also be passed in with a Supplier that is evaluated right before the cluster is started.
---
 .../runtime/testutils/MiniClusterExtension.java      | 20 ++++++++++++++++++--
 1 file changed, 18 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterExtension.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterExtension.java
index a6f7a37..0e1115c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterExtension.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterExtension.java
@@ -18,21 +18,35 @@
 
 package org.apache.flink.runtime.testutils;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.core.testutils.CustomExtension;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 
 import org.junit.jupiter.api.extension.ExtensionContext;
 
+import javax.annotation.Nullable;
+
 import java.net.URI;
+import java.util.function.Supplier;
 
 /** An extension which starts a {@link MiniCluster} for testing purposes. */
 public class MiniClusterExtension implements CustomExtension {
-    private final MiniClusterResource miniClusterResource;
+
+    private final Supplier<MiniClusterResourceConfiguration>
+            miniClusterResourceConfigurationSupplier;
+    @Nullable private MiniClusterResource miniClusterResource;
 
     public MiniClusterExtension(
             final MiniClusterResourceConfiguration miniClusterResourceConfiguration) {
-        this.miniClusterResource = new MiniClusterResource(miniClusterResourceConfiguration);
+        this(() -> miniClusterResourceConfiguration);
+    }
+
+    @Experimental
+    public MiniClusterExtension(
+            final Supplier<MiniClusterResourceConfiguration>
+                    miniClusterResourceConfigurationSupplier) {
+        this.miniClusterResourceConfigurationSupplier = miniClusterResourceConfigurationSupplier;
     }
 
     public int getNumberSlots() {
@@ -53,6 +67,8 @@ public class MiniClusterExtension implements CustomExtension {
 
     @Override
     public void before(ExtensionContext context) throws Exception {
+        this.miniClusterResource =
+                new MiniClusterResource(miniClusterResourceConfigurationSupplier.get());
         miniClusterResource.before();
     }