You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/11/24 08:15:17 UTC

[flink] branch master updated: [FLINK-24627][tests] Port JUnit 4 rules to JUnit5 extensions

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 78b231f  [FLINK-24627][tests] Port JUnit 4 rules to JUnit5 extensions
78b231f is described below

commit 78b231f60aed59061f0f609e0cfd659d78e6fdd5
Author: Hang Ruan <ru...@hotmail.com>
AuthorDate: Mon Oct 25 11:55:15 2021 +0800

    [FLINK-24627][tests] Port JUnit 4 rules to JUnit5 extensions
---
 .../TaskCancelAsyncProducerConsumerITCase.java     |  22 ++-
 .../runtime/testutils/MiniClusterExtension.java    |  63 +++++++
 .../flink/runtime/util/BlobServerExtension.java    |  74 +++++++++
 .../runtime/zookeeper/ZooKeeperExtension.java      |  82 ++++++++++
 .../flink/core/testutils/AllCallbackWrapper.java   |  47 ++++++
 .../flink/core/testutils/CustomExtension.java      |  50 ++++++
 .../flink/core/testutils/EachCallbackWrapper.java  |  47 ++++++
 .../testutils/executor/TestExecutorExtension.java  |  54 ++++++
 .../testutils/junit/SharedObjectsExtension.java    | 181 +++++++++++++++++++++
 .../junit/extensions/retry/RetryExtension.java     | 129 +++++++++++++++
 .../retry/RetryTestExecutionExtension.java         |  88 ++++++++++
 .../retry/strategy/AbstractRetryStrategy.java      |  40 +++++
 .../retry/strategy/RetryOnExceptionStrategy.java   |  59 +++++++
 .../retry/strategy/RetryOnFailureStrategy.java     |  50 ++++++
 .../extensions/retry/strategy/RetryStrategy.java   |  37 +++++
 .../testutils/logging/LoggerAuditingExtension.java |  96 +++++++++++
 .../org/apache/flink/util/LogLevelExtension.java   | 114 +++++++++++++
 .../org/apache/flink/util/TestLoggerExtension.java |  80 +++++++++
 .../junit/RetryOnExceptionExtensionTest.java       |  83 ++++++++++
 .../junit/RetryOnFailureExtensionTest.java         |  76 +++++++++
 .../streaming/util/StreamCollectorExtension.java   | 146 +++++++++++++++++
 .../test/util/MiniClusterWithClientExtension.java  | 123 ++++++++++++++
 22 files changed, 1733 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
index c09046b..9dcaefd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
@@ -39,15 +40,16 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterExtension;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.runtime.testutils.TestingUtils;
 import org.apache.flink.types.LongValue;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.concurrent.FutureUtils;
 
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.time.Duration;
 import java.util.Arrays;
@@ -58,7 +60,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
+@ExtendWith({TestLoggerExtension.class})
+public class TaskCancelAsyncProducerConsumerITCase {
 
     // The Exceptions thrown by the producer/consumer Threads
     private static volatile Exception ASYNC_PRODUCER_EXCEPTION;
@@ -68,13 +71,16 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
     private static volatile Thread ASYNC_PRODUCER_THREAD;
     private static volatile Thread ASYNC_CONSUMER_THREAD;
 
-    @ClassRule
-    public static final MiniClusterResource MINI_CLUSTER_RESOURCE =
-            new MiniClusterResource(
+    public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
                             .setConfiguration(getFlinkConfiguration())
                             .build());
 
+    @RegisterExtension
+    public static AllCallbackWrapper allCallbackWrapper =
+            new AllCallbackWrapper(MINI_CLUSTER_RESOURCE);
+
     private static Configuration getFlinkConfiguration() {
         Configuration config = new Configuration();
         config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4096"));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterExtension.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterExtension.java
new file mode 100644
index 0000000..a6f7a37
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterExtension.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.core.testutils.CustomExtension;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import java.net.URI;
+
+/**
+ * A {@link CustomExtension} which starts a {@link MiniCluster} for testing purposes.
+ *
+ * <p>All work is delegated to a {@link MiniClusterResource} that is created from the
+ * configuration passed to the constructor.
+ */
+public class MiniClusterExtension implements CustomExtension {
+    private final MiniClusterResource delegate;
+
+    /**
+     * Creates the extension and the {@link MiniClusterResource} backing it.
+     *
+     * @param miniClusterResourceConfiguration configuration handed to the backing resource
+     */
+    public MiniClusterExtension(
+            final MiniClusterResourceConfiguration miniClusterResourceConfiguration) {
+        this.delegate = new MiniClusterResource(miniClusterResourceConfiguration);
+    }
+
+    /** Forwards to {@code before()} of the backing resource. */
+    @Override
+    public void before(ExtensionContext context) throws Exception {
+        delegate.before();
+    }
+
+    /** Forwards to {@code after()} of the backing resource. */
+    @Override
+    public void after(ExtensionContext context) throws Exception {
+        delegate.after();
+    }
+
+    /** Returns the mini cluster as reported by the backing resource. */
+    public MiniCluster getMiniCluster() {
+        return delegate.getMiniCluster();
+    }
+
+    /** Returns the number of slots as reported by the backing resource. */
+    public int getNumberSlots() {
+        return delegate.getNumberSlots();
+    }
+
+    /** Returns the client configuration as reported by the backing resource. */
+    public UnmodifiableConfiguration getClientConfiguration() {
+        return delegate.getClientConfiguration();
+    }
+
+    /**
+     * Returns the REST address as reported by the backing resource. The method name mirrors the
+     * spelling of {@code MiniClusterResource#getRestAddres()}.
+     */
+    public URI getRestAddres() {
+        return delegate.getRestAddres();
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerExtension.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerExtension.java
new file mode 100644
index 0000000..ab8d8d6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerExtension.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CustomExtension;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A simple {@link org.junit.jupiter.api.extension.Extension} to be used by tests that require a
+ * {@link BlobServer}.
+ *
+ * <p>{@link #before} starts a fresh server that stores its data inside a temporary folder;
+ * {@link #after} closes the server and deletes that folder.
+ */
+public class BlobServerExtension implements CustomExtension {
+    private static final Logger LOG = LoggerFactory.getLogger(BlobServerExtension.class);
+    private final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    private BlobServer blobServer;
+
+    /**
+     * Creates the temporary folder, then creates and starts a {@link BlobServer} that uses a new
+     * sub-folder of it as {@link BlobServerOptions#STORAGE_DIRECTORY}.
+     */
+    @Override
+    public void before(ExtensionContext context) throws Exception {
+        temporaryFolder.create();
+
+        Configuration config = new Configuration();
+        config.setString(
+                BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+
+        blobServer = new BlobServer(config, new VoidBlobStore());
+        blobServer.start();
+    }
+
+    /**
+     * Closes the blob server and deletes the temporary folder.
+     *
+     * <p>The server is closed before its storage directory is removed so that it never runs on
+     * top of a deleted directory. The method tolerates a {@link #before} call that failed before
+     * the server was created, and always deletes the temporary folder.
+     */
+    @Override
+    public void after(ExtensionContext context) throws Exception {
+        try {
+            if (blobServer != null) {
+                blobServer.close();
+            }
+        } catch (IOException e) {
+            LOG.error("Exception while shutting down blob server.", e);
+        } finally {
+            temporaryFolder.delete();
+        }
+    }
+
+    /** Returns the port of the current blob server; only valid after {@link #before} ran. */
+    public int getBlobServerPort() {
+        return blobServer.getPort();
+    }
+
+    /** Returns the current blob server, or {@code null} if {@link #before} has not run yet. */
+    public BlobServer getBlobServer() {
+        return blobServer;
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperExtension.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperExtension.java
new file mode 100644
index 0000000..a8b3ee1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperExtension.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.flink.core.testutils.CustomExtension;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * {@link org.junit.jupiter.api.extension.Extension} which starts a {@link
+ * org.apache.zookeeper.server.ZooKeeperServer}.
+ *
+ * <p>The server is a Curator {@link TestingServer}; it is created in {@link #before} and closed
+ * in {@link #after}.
+ */
+public class ZooKeeperExtension implements CustomExtension {
+    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperExtension.class);
+
+    /** The managed server, or {@code null} if there is none at the moment. */
+    @Nullable private TestingServer zooKeeperServer;
+
+    /**
+     * Returns the connect string of the managed server.
+     *
+     * @throws IllegalStateException if there is no managed server
+     */
+    public String getConnectString() {
+        verifyIsRunning();
+        return zooKeeperServer.getConnectString();
+    }
+
+    private void verifyIsRunning() {
+        Preconditions.checkState(zooKeeperServer != null);
+    }
+
+    /** Closes a server left over from an earlier call, if any, and creates a new one. */
+    @Override
+    public void before(ExtensionContext context) throws Exception {
+        terminateZooKeeperServer();
+        zooKeeperServer = new TestingServer(true);
+    }
+
+    /**
+     * Closes the managed server, if there is one, and forgets it.
+     *
+     * <p>{@link TestingServer#close()} is used instead of {@link TestingServer#stop()} because
+     * {@code stop()} keeps the server's temporary data directory on disk. As the reference is
+     * dropped right afterwards, that directory would never be cleaned up.
+     */
+    private void terminateZooKeeperServer() throws IOException {
+        if (zooKeeperServer != null) {
+            zooKeeperServer.close();
+            zooKeeperServer = null;
+        }
+    }
+
+    /** Closes the server; an {@link IOException} during shutdown is logged, not rethrown. */
+    @Override
+    public void after(ExtensionContext context) throws Exception {
+        try {
+            terminateZooKeeperServer();
+        } catch (IOException e) {
+            LOG.warn("Could not properly terminate the {}.", getClass().getSimpleName(), e);
+        }
+    }
+
+    /**
+     * Restarts the managed server.
+     *
+     * @throws NullPointerException if there is no managed server
+     */
+    public void restart() throws Exception {
+        Preconditions.checkNotNull(zooKeeperServer);
+        zooKeeperServer.restart();
+    }
+
+    /**
+     * Stops the managed server but keeps it, so it can be started again via {@link #restart()}.
+     *
+     * @throws NullPointerException if there is no managed server
+     */
+    public void stop() throws IOException {
+        Preconditions.checkNotNull(zooKeeperServer);
+        zooKeeperServer.stop();
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/AllCallbackWrapper.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/AllCallbackWrapper.java
new file mode 100644
index 0000000..df4f466
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/AllCallbackWrapper.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+/**
+ * Wraps a {@link CustomExtension} so that it runs before and after all tests: {@link
+ * BeforeAllCallback} triggers {@code before}, {@link AfterAllCallback} triggers {@code after}.
+ *
+ * @param <C> type of the wrapped extension
+ */
+public class AllCallbackWrapper<C extends CustomExtension>
+        implements BeforeAllCallback, AfterAllCallback {
+    private final C wrapped;
+
+    /**
+     * Wraps the given extension.
+     *
+     * @param customExtension extension to invoke from the {@code beforeAll}/{@code afterAll} hooks
+     */
+    public AllCallbackWrapper(C customExtension) {
+        wrapped = customExtension;
+    }
+
+    /** Returns the wrapped extension. */
+    public C getCustomExtension() {
+        return wrapped;
+    }
+
+    /** Calls {@link CustomExtension#before(ExtensionContext)} on the wrapped extension. */
+    @Override
+    public void beforeAll(ExtensionContext context) throws Exception {
+        wrapped.before(context);
+    }
+
+    /** Calls {@link CustomExtension#after(ExtensionContext)} on the wrapped extension. */
+    @Override
+    public void afterAll(ExtensionContext context) throws Exception {
+        wrapped.after(context);
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CustomExtension.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CustomExtension.java
new file mode 100644
index 0000000..43e542e
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CustomExtension.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+/**
+ * An extension that is invoked before/after all/each tests, depending on whether it is wrapped in a
+ * {@link EachCallbackWrapper} or {@link AllCallbackWrapper}.
+ *
+ * <p>{@code before} method will be called in {@code beforeEach} or {@code beforeAll}. {@code after}
+ * will be called in {@code afterEach} or {@code afterAll}.
+ *
+ * <p>Usage example:
+ *
+ * <pre>{@code
+ * public class Test {
+ *      static CustomExtension allCustom = new CustomExtensionImpl2();
+ *      CustomExtension eachCustom = new CustomExtensionImpl1();
+ *
+ *      @RegisterExtension
+ *      static AllCallbackWrapper<CustomExtension> allWrapper =
+ *              new AllCallbackWrapper<>(allCustom);
+ *
+ *      @RegisterExtension
+ *      EachCallbackWrapper<CustomExtension> eachWrapper =
+ *              new EachCallbackWrapper<>(eachCustom);
+ * }
+ * }</pre>
+ *
+ * <p>A {@code CustomExtension} instance must not be wrapped in both AllCallbackWrapper and
+ * EachCallbackWrapper for the same test class.
+ */
+public interface CustomExtension {
+    /**
+     * Set-up hook; does nothing unless overridden.
+     *
+     * @param context the extension context passed on by the wrapper
+     * @throws Exception if the set-up fails
+     */
+    default void before(ExtensionContext context) throws Exception {}
+
+    /**
+     * Tear-down hook; does nothing unless overridden.
+     *
+     * @param context the extension context passed on by the wrapper
+     * @throws Exception if the tear-down fails
+     */
+    default void after(ExtensionContext context) throws Exception {}
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/EachCallbackWrapper.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/EachCallbackWrapper.java
new file mode 100644
index 0000000..66b1984
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/EachCallbackWrapper.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+/**
+ * Wraps a {@link CustomExtension} so that it runs before and after each test: {@link
+ * BeforeEachCallback} triggers {@code before}, {@link AfterEachCallback} triggers {@code after}.
+ *
+ * @param <C> type of the wrapped extension
+ */
+public class EachCallbackWrapper<C extends CustomExtension>
+        implements BeforeEachCallback, AfterEachCallback {
+    private final C wrapped;
+
+    /**
+     * Wraps the given extension.
+     *
+     * @param customExtension extension to invoke from the {@code beforeEach}/{@code afterEach} hooks
+     */
+    public EachCallbackWrapper(C customExtension) {
+        wrapped = customExtension;
+    }
+
+    /** Returns the wrapped extension. */
+    public C getCustomExtension() {
+        return wrapped;
+    }
+
+    /** Calls {@link CustomExtension#before(ExtensionContext)} on the wrapped extension. */
+    @Override
+    public void beforeEach(ExtensionContext context) throws Exception {
+        wrapped.before(context);
+    }
+
+    /** Calls {@link CustomExtension#after(ExtensionContext)} on the wrapped extension. */
+    @Override
+    public void afterEach(ExtensionContext context) throws Exception {
+        wrapped.after(context);
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/executor/TestExecutorExtension.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/executor/TestExecutorExtension.java
new file mode 100644
index 0000000..269420c
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/executor/TestExecutorExtension.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.executor;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+
+/**
+ * Extension which creates an {@link ExecutorService} before all tests and shuts it down after all
+ * tests.
+ *
+ * @param <T> concrete type of the managed executor service
+ */
+public class TestExecutorExtension<T extends ExecutorService>
+        implements BeforeAllCallback, AfterAllCallback {
+    private final Supplier<T> factory;
+
+    private T executor;
+
+    /**
+     * Creates the extension.
+     *
+     * @param serviceFactory factory queried in {@link #beforeAll} for the executor service
+     */
+    public TestExecutorExtension(Supplier<T> serviceFactory) {
+        factory = serviceFactory;
+    }
+
+    /**
+     * Returns the managed executor service, or {@code null} if {@link #beforeAll} has not run.
+     *
+     * <p>This extension is in charge of the life cycle, so callers should not shut the returned
+     * service down themselves.
+     */
+    public T getExecutor() {
+        return executor;
+    }
+
+    /** Obtains a new executor service from the factory. */
+    @Override
+    public void beforeAll(ExtensionContext context) throws Exception {
+        executor = factory.get();
+    }
+
+    /** Calls {@link ExecutorService#shutdown()} on the executor service, if one was created. */
+    @Override
+    public void afterAll(ExtensionContext context) throws Exception {
+        if (executor == null) {
+            return;
+        }
+        executor.shutdown();
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/SharedObjectsExtension.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/SharedObjectsExtension.java
new file mode 100644
index 0000000..c3fac18
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/SharedObjectsExtension.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This extension allows objects to be used both in the main test case as well as in UDFs by using
+ * serializable {@link SharedReference}s. Usage:
+ *
+ * <pre><code>
+ * {@literal    @RegisterExtension}
+ *     public final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
+ *
+ * {@literal    @Test}
+ *     public void test() throws Exception {
+ *         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ * {@literal        SharedReference<Queue<Long>> listRef = sharedObjects.add(new ConcurrentLinkedQueue<>());}
+ *         int n = 10000;
+ *         env.setParallelism(100);
+ *         env.fromSequence(0, n).map(i -> listRef.get().add(i));
+ *         env.execute();
+ *         assertEquals(n + 1, listRef.get().size());
+ *         assertEquals(
+ *                 LongStream.rangeClosed(0, n).boxed().collect(Collectors.toList()),
+ *                 listRef.get().stream().sorted().collect(Collectors.toList()));
+ *     }
+ * </code></pre>
+ *
+ * <p>The main idea is that shared objects are bound to the scope of a test case instead of a class.
+ * That allows us to:
+ *
+ * <ul>
+ *   <li>Avoid all kinds of static fields in test classes that only exist since all fields in UDFs
+ *       need to be serializable.
+ *   <li>Hopefully make it easier to reason about the test setup
+ *   <li>Facilitate to share more test building blocks across test classes.
+ *   <li>Fully allow tests to be rerun/run in parallel without worrying about static fields
+ * </ul>
+ *
+ * <p>Note that since the shared objects are accessed through multiple threads, they need to be
+ * thread-safe or accessed in a thread-safe manner.
+ */
+@NotThreadSafe
+public class SharedObjectsExtension implements BeforeEachCallback, AfterEachCallback {
+    /** Instance-cache used to make a SharedObjectsExtension accessible for multiple threads. */
+    private static final Map<Integer, SharedObjectsExtension> INSTANCES = new ConcurrentHashMap<>();
+
+    /** Source of the ids handed out by {@link #create()}. */
+    private static final AtomicInteger LAST_ID = new AtomicInteger();
+
+    /**
+     * Identifier of the SharedObjectsExtension used to retrieve the original instance during
+     * deserialization.
+     */
+    private final int id;
+
+    /** All registered objects for the current test case. The objects are purged upon completion. */
+    private final transient Map<SharedReference<?>, Object> objects = new ConcurrentHashMap<>();
+
+    private SharedObjectsExtension(int id) {
+        this.id = id;
+    }
+
+    /**
+     * Creates a new instance. Usually that should be done inside a JUnit test class as an
+     * instance-field annotated with {@code @RegisterExtension}.
+     */
+    public static SharedObjectsExtension create() {
+        final int nextId = LAST_ID.getAndIncrement();
+        return new SharedObjectsExtension(nextId);
+    }
+
+    /**
+     * Looks up the registered extension with the given id.
+     *
+     * @throws IllegalStateException if no extension is registered under that id
+     */
+    private static SharedObjectsExtension get(int sharedObjectsId) {
+        final SharedObjectsExtension registered = INSTANCES.get(sharedObjectsId);
+        if (registered != null) {
+            return registered;
+        }
+        throw new IllegalStateException("Object was accessed after the test was completed");
+    }
+
+    /**
+     * Adds a new object to this {@code SharedObjectsExtension}. Although not necessary, it is
+     * recommended to only access the object through the returned {@link SharedReference}.
+     */
+    public <T> SharedReference<T> add(T object) {
+        final SharedReference<T> reference = new DefaultTag<>(id, objects.size());
+        objects.put(reference, object);
+        return reference;
+    }
+
+    /** Registers this extension so that references can find it while the test runs. */
+    @Override
+    public void beforeEach(ExtensionContext context) throws Exception {
+        INSTANCES.put(id, this);
+    }
+
+    /** Drops all shared objects and unregisters this extension. */
+    @Override
+    public void afterEach(ExtensionContext context) throws Exception {
+        objects.clear();
+        INSTANCES.remove(id);
+    }
+
+    /**
+     * Resolves a reference created by {@link #add(Object)}.
+     *
+     * @throws IllegalStateException if no object is stored for the reference
+     */
+    @SuppressWarnings("unchecked")
+    <T> T get(SharedReference<T> tag) {
+        final Object stored = objects.get(tag);
+        if (stored == null) {
+            throw new IllegalStateException("Object was accessed after the test was completed");
+        }
+        return (T) stored;
+    }
+
+    /** Two extensions are equal if they are of the same class and carry the same id. */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        return ((SharedObjectsExtension) o).id == id;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id);
+    }
+
+    /** Reference that locates its object through the ids of the extension and of the object. */
+    private static class DefaultTag<T> implements SharedReference<T> {
+        private final int sharedObjectsId;
+        private final int objectId;
+
+        public DefaultTag(int sharedObjectsId, int objectId) {
+            this.sharedObjectsId = sharedObjectsId;
+            this.objectId = objectId;
+        }
+
+        @Override
+        public T get() {
+            final SharedObjectsExtension owner = SharedObjectsExtension.get(sharedObjectsId);
+            return owner.get(this);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) {
+                return true;
+            }
+            if (o == null || o.getClass() != getClass()) {
+                return false;
+            }
+            final DefaultTag<?> other = (DefaultTag<?>) o;
+            return other.sharedObjectsId == sharedObjectsId && other.objectId == objectId;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(sharedObjectsId, objectId);
+        }
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/RetryExtension.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/RetryExtension.java
new file mode 100644
index 0000000..f1a5c16
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/RetryExtension.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit.extensions.retry;
+
+import org.apache.flink.testutils.junit.RetryOnException;
+import org.apache.flink.testutils.junit.RetryOnFailure;
+import org.apache.flink.testutils.junit.extensions.retry.strategy.RetryOnExceptionStrategy;
+import org.apache.flink.testutils.junit.extensions.retry.strategy.RetryOnFailureStrategy;
+import org.apache.flink.testutils.junit.extensions.retry.strategy.RetryStrategy;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/** Extension that retries tests annotated with RetryOnFailure or RetryOnException. */
+public class RetryExtension implements TestTemplateInvocationContextProvider, AfterAllCallback {
+    static final ExtensionContext.Namespace RETRY_NAMESPACE =
+            ExtensionContext.Namespace.create("retryLog");
+    static final String RETRY_KEY = "testRetry";
+
+    @Override
+    public boolean supportsTestTemplate(ExtensionContext context) {
+        // applies as soon as either retry annotation is found for the test
+        final boolean onFailure = getRetryAnnotation(context, RetryOnFailure.class) != null;
+        return onFailure || getRetryAnnotation(context, RetryOnException.class) != null;
+    }
+
+    @Override
+    public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts(
+            ExtensionContext context) {
+        final RetryOnException onException = getRetryAnnotation(context, RetryOnException.class);
+        final RetryOnFailure onFailure = getRetryAnnotation(context, RetryOnFailure.class);
+        if (onException != null && onFailure != null) {
+            throw new IllegalArgumentException(
+                    "You cannot combine the RetryOnFailure and RetryOnException annotations.");
+        }
+
+        @SuppressWarnings("unchecked") // RETRY_KEY only ever maps to the HashMap created here
+        final Map<String, RetryStrategy> strategies =
+                (Map<String, RetryStrategy>)
+                        context.getStore(RETRY_NAMESPACE)
+                                .getOrComputeIfAbsent(RETRY_KEY, key -> new HashMap<>());
+        // total attempts = the annotation's times() plus one
+        final int totalTimes;
+        final RetryStrategy strategy;
+        if (onException != null) {
+            totalTimes = onException.times() + 1;
+            strategy = new RetryOnExceptionStrategy(totalTimes, onException.exception());
+        } else if (onFailure != null) {
+            totalTimes = onFailure.times() + 1;
+            strategy = new RetryOnFailureStrategy(totalTimes);
+        } else {
+            throw new IllegalArgumentException("Unsupported retry strategy.");
+        }
+        strategies.put(getTestMethodKey(context), strategy);
+        return IntStream.rangeClosed(1, totalTimes).mapToObj(i -> new RetryContext(i, totalTimes));
+    }
+
+    @Override
+    public void afterAll(ExtensionContext context) throws Exception {
+        context.getStore(RETRY_NAMESPACE).remove(RETRY_KEY);
+    }
+
+    /** Builds the key {@code <canonical class name>#<method name>} for the current test. */
+    static String getTestMethodKey(ExtensionContext context) {
+        return context.getRequiredTestClass().getCanonicalName()
+                + "#"
+                + context.getRequiredTestMethod().getName();
+    }
+
+    private <T extends Annotation> T getRetryAnnotation(
+            ExtensionContext context, Class<T> annotationClass) {
+        final Method testMethod = context.getRequiredTestMethod();
+        final T onMethod = testMethod.getAnnotation(annotationClass);
+        if (onMethod != null) {
+            return onMethod;
+        }
+        // nothing on the test method itself: fall back to the annotation on the test class
+        return context.getRequiredTestClass().getAnnotation(annotationClass);
+    }
+
+    /** Invocation context of one attempt; adds the extension that gates that attempt. */
+    static class RetryContext implements TestTemplateInvocationContext {
+        final int retryIndex;
+        final int totalTimes;
+
+        RetryContext(int retryIndex, int totalTimes) {
+            this.totalTimes = totalTimes;
+            this.retryIndex = retryIndex;
+        }
+
+        @Override
+        public String getDisplayName(int invocationIndex) {
+            return String.format("Attempt [%d/%d]", retryIndex, totalTimes);
+        }
+
+        @Override
+        public List<Extension> getAdditionalExtensions() {
+            return Arrays.asList(new RetryTestExecutionExtension(retryIndex, totalTimes));
+        }
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/RetryTestExecutionExtension.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/RetryTestExecutionExtension.java
new file mode 100644
index 0000000..8126c74
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/RetryTestExecutionExtension.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit.extensions.retry;
+
+import org.apache.flink.testutils.junit.extensions.retry.strategy.RetryStrategy;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.ConditionEvaluationResult;
+import org.junit.jupiter.api.extension.ExecutionCondition;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestExecutionExceptionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+import static org.apache.flink.testutils.junit.extensions.retry.RetryExtension.RETRY_KEY;
+import static org.apache.flink.testutils.junit.extensions.retry.RetryExtension.RETRY_NAMESPACE;
+import static org.apache.flink.testutils.junit.extensions.retry.RetryExtension.getTestMethodKey;
+
+/** Per-attempt extension that decides whether a retry attempt runs and tracks its outcome. */
+public class RetryTestExecutionExtension
+        implements ExecutionCondition, TestExecutionExceptionHandler, AfterEachCallback {
+    private static final Logger LOG = LoggerFactory.getLogger(RetryTestExecutionExtension.class);
+    private final int retryIndex;
+    private final int totalTimes;
+
+    /** Creates the extension for attempt {@code retryIndex} out of {@code totalTimes}. */
+    public RetryTestExecutionExtension(int retryIndex, int totalTimes) {
+        this.retryIndex = retryIndex;
+        this.totalTimes = totalTimes;
+    }
+
+    @Override
+    public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) {
+        final String method = getTestMethodKey(context);
+        if (getRetryStrategyInStore(context).hasNextAttempt()) {
+            return ConditionEvaluationResult.enabled(
+                    String.format("Test %s[%d/%d]", method, retryIndex, totalTimes));
+        }
+        // an earlier attempt already settled the outcome, so this attempt is skipped
+        return ConditionEvaluationResult.disabled(method + " has already passed or failed.");
+    }
+
+    @Override
+    public void handleTestExecutionException(ExtensionContext context, Throwable throwable)
+            throws Throwable {
+        // the strategy either rethrows the failure or replaces it with an aborted attempt
+        getRetryStrategyInStore(context)
+                .handleException(getTestMethodKey(context), retryIndex, throwable);
+    }
+
+    @Override
+    public void afterEach(ExtensionContext context) throws Exception {
+        if (context.getExecutionException().isPresent()) {
+            // an exception was recorded for this attempt, so it did not pass
+            return;
+        }
+        // the attempt passed: every remaining attempt gets disabled
+        getRetryStrategyInStore(context).stopFollowingAttempts();
+        final String method = getTestMethodKey(context);
+        LOG.trace("Retry test {}[{}/{}] passed, stop retrying.", method, retryIndex, totalTimes);
+    }
+
+    /** Looks up the strategy stored under the current test method's key. */
+    @SuppressWarnings("unchecked") // RetryExtension stores a Map<String, RetryStrategy> there
+    private RetryStrategy getRetryStrategyInStore(ExtensionContext context) {
+        Map<String, RetryStrategy> retryStrategies =
+                (Map<String, RetryStrategy>) context.getStore(RETRY_NAMESPACE).get(RETRY_KEY);
+        return retryStrategies.get(getTestMethodKey(context));
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/strategy/AbstractRetryStrategy.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/strategy/AbstractRetryStrategy.java
new file mode 100644
index 0000000..685d9cf
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/strategy/AbstractRetryStrategy.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit.extensions.retry.strategy;
+
+/** Base class for retry strategies that stores the total times and the next-attempt flag. */
+public abstract class AbstractRetryStrategy implements RetryStrategy {
+    protected final int totalTimes; // set once from the constructor argument
+    protected boolean hasNextAttempt; // cleared by stopFollowingAttempts()
+
+    public AbstractRetryStrategy(int totalTimes, boolean hasNextAttempt) {
+        this.totalTimes = totalTimes;
+        this.hasNextAttempt = hasNextAttempt;
+    }
+
+    @Override
+    public boolean hasNextAttempt() {
+        return hasNextAttempt;
+    }
+
+    @Override
+    public void stopFollowingAttempts() {
+        this.hasNextAttempt = false;
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/strategy/RetryOnExceptionStrategy.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/strategy/RetryOnExceptionStrategy.java
new file mode 100644
index 0000000..68dc1ec
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/strategy/RetryOnExceptionStrategy.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit.extensions.retry.strategy;
+
+import org.opentest4j.TestAbortedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Retry strategy that keeps retrying as long as the test throws the configured exception. */
+public class RetryOnExceptionStrategy extends AbstractRetryStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(RetryOnExceptionStrategy.class);
+
+    private final Class<? extends Throwable> repeatableException;
+
+    public RetryOnExceptionStrategy(
+            int retryTimes, Class<? extends Throwable> repeatableException) {
+        super(retryTimes, true);
+        this.repeatableException = repeatableException;
+    }
+
+    @Override
+    public void handleException(String testName, int attemptIndex, Throwable throwable)
+            throws Throwable {
+        final boolean repeatable = repeatableException.isAssignableFrom(throwable.getClass());
+        if (!repeatable) {
+            // any other exception ends the retrying and fails the test with the original error
+            stopFollowingAttempts();
+            final String failMsg =
+                    String.format(
+                            "Retry test %s[%d/%d] failed with unrepeatable exception, stop retrying.",
+                            testName, attemptIndex, totalTimes);
+            LOG.error(failMsg, throwable);
+            throw throwable;
+        }
+        // repeatable: abort this attempt (no attempt index check) so later attempts still run
+        final String retryMsg =
+                String.format(
+                        "Retry test %s[%d/%d] failed with repeatable exception, continue retrying.",
+                        testName, attemptIndex, totalTimes);
+        LOG.warn(retryMsg, throwable);
+        throw new TestAbortedException(retryMsg);
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/strategy/RetryOnFailureStrategy.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/strategy/RetryOnFailureStrategy.java
new file mode 100644
index 0000000..4d2e44c
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/strategy/RetryOnFailureStrategy.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit.extensions.retry.strategy;
+
+import org.opentest4j.TestAbortedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Retry strategy that retries on any failure until the total number of attempts is used up. */
+public class RetryOnFailureStrategy extends AbstractRetryStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(RetryOnFailureStrategy.class);
+
+    public RetryOnFailureStrategy(int retryTimes) {
+        super(retryTimes, true);
+    }
+
+    @Override
+    public void handleException(String testName, int attemptIndex, Throwable throwable)
+            throws Throwable {
+        if (attemptIndex < totalTimes) {
+            // attempts remain: abort this one so that the next attempt gets to run
+            final String retryMsg =
+                    String.format(
+                            "Retry test %s[%d/%d] failed, continue retrying.",
+                            testName, attemptIndex, totalTimes);
+            LOG.error(retryMsg, throwable);
+            throw new TestAbortedException(retryMsg);
+        }
+
+        // this was the final attempt, so the failure is surfaced unchanged
+        LOG.error("Test Failed at the last retry.", throwable);
+        throw throwable;
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/strategy/RetryStrategy.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/strategy/RetryStrategy.java
new file mode 100644
index 0000000..f6f9adc
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/strategy/RetryStrategy.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit.extensions.retry.strategy;
+
+/** Retry strategy for executing retry tests. */
+public interface RetryStrategy {
+    /** Returns whether the next attempt should be executed. */
+    boolean hasNextAttempt();
+
+    /** Stops all following attempts once the test has succeeded or finally failed. */
+    void stopFollowingAttempts();
+
+    /**
+     * Handles the exception thrown by a test attempt.
+     *
+     * @param testName the test name
+     * @param attemptIndex test attempt index that starts from 1
+     * @param throwable the throwable that the test case throws
+     */
+    void handleException(String testName, int attemptIndex, Throwable throwable) throws Throwable;
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/logging/LoggerAuditingExtension.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/logging/LoggerAuditingExtension.java
new file mode 100644
index 0000000..8394e924
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/logging/LoggerAuditingExtension.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.logging;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.config.AppenderRef;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * JUnit 5 extension for auditing the messages that a single logger emits during a test.
+ *
+ * <p>Implementation note: Make sure to not expose log4j dependencies in the interface of this class
+ * to ease updates in logging infrastructure.
+ */
+public class LoggerAuditingExtension implements BeforeEachCallback, AfterEachCallback {
+    private static final LoggerContext LOGGER_CONTEXT =
+            (LoggerContext) LogManager.getContext(false);
+
+    private final String loggerName;
+    private final org.slf4j.event.Level level;
+    private ConcurrentLinkedQueue<String> loggingEvents;
+
+    public LoggerAuditingExtension(Class<?> clazz, org.slf4j.event.Level level) {
+        // class-based loggers are named after Class#getName(), not after the canonical name
+        this.loggerName = clazz.getName();
+        this.level = level;
+    }
+
+    public List<String> getMessages() {
+        return new ArrayList<>(loggingEvents);
+    }
+
+    @Override
+    public void beforeEach(ExtensionContext context) throws Exception {
+        // the appender writes to a captured queue, so late events never hit the cleared field
+        final ConcurrentLinkedQueue<String> events = new ConcurrentLinkedQueue<>();
+        loggingEvents = events;
+        Appender testAppender =
+                new AbstractAppender("test-appender", null, null, false) {
+                    @Override
+                    public void append(LogEvent event) {
+                        events.add(event.getMessage().getFormattedMessage());
+                    }
+                };
+        testAppender.start();
+
+        AppenderRef appenderRef = AppenderRef.createAppenderRef(testAppender.getName(), null, null);
+        LoggerConfig logger =
+                LoggerConfig.createLogger(
+                        false,
+                        Level.getLevel(level.name()),
+                        "test",
+                        null,
+                        new AppenderRef[] {appenderRef},
+                        null,
+                        LOGGER_CONTEXT.getConfiguration(),
+                        null);
+        logger.addAppender(testAppender, null, null);
+        LOGGER_CONTEXT.getConfiguration().addLogger(loggerName, logger);
+        LOGGER_CONTEXT.updateLoggers();
+    }
+
+    @Override
+    public void afterEach(ExtensionContext context) throws Exception {
+        LOGGER_CONTEXT.getConfiguration().removeLogger(loggerName);
+        LOGGER_CONTEXT.updateLoggers();
+        loggingEvents = null;
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/LogLevelExtension.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/LogLevelExtension.java
new file mode 100644
index 0000000..d00278d
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/LogLevelExtension.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.slf4j.Log4jLogger;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An extension that sets the log level for specific class/package loggers for a test. Logging
+ * configuration will only be extended when logging is enabled at all (so root logger is not OFF).
+ */
+public class LogLevelExtension implements BeforeAllCallback, AfterAllCallback {
+    public static final boolean LOGGING_ENABLED =
+            LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME).isErrorEnabled();
+    private final Map<String, Level> testLevels = new HashMap<>();
+    private final List<Runnable> resetActions = new ArrayList<>();
+    private static final Map<Level, org.apache.logging.log4j.Level> SLF_TO_LOG4J = new HashMap<>();
+    private LoggerContext log4jContext;
+
+    static {
+        SLF_TO_LOG4J.put(Level.ERROR, org.apache.logging.log4j.Level.ERROR);
+        SLF_TO_LOG4J.put(Level.WARN, org.apache.logging.log4j.Level.WARN);
+        SLF_TO_LOG4J.put(Level.INFO, org.apache.logging.log4j.Level.INFO);
+        SLF_TO_LOG4J.put(Level.DEBUG, org.apache.logging.log4j.Level.DEBUG);
+        SLF_TO_LOG4J.put(Level.TRACE, org.apache.logging.log4j.Level.TRACE);
+    }
+
+    @Override
+    public void beforeAll(ExtensionContext context) throws Exception {
+        if (!LOGGING_ENABLED) {
+            return;
+        }
+
+        for (Map.Entry<String, Level> levelEntry : testLevels.entrySet()) {
+            final Logger logger = LoggerFactory.getLogger(levelEntry.getKey());
+            if (!(logger instanceof Log4jLogger)) {
+                throw new UnsupportedOperationException("Cannot change log level of " + logger);
+            }
+            setLog4jLevel(levelEntry.getKey(), levelEntry.getValue());
+        }
+        // log4jContext is only assigned once setLog4jLevel has run at least one time
+        if (log4jContext != null) {
+            log4jContext.updateLoggers();
+        }
+    }
+
+    private void setLog4jLevel(String logger, Level level) {
+        if (log4jContext == null) {
+            log4jContext = (LoggerContext) LogManager.getContext(false);
+        }
+        final Configuration conf = log4jContext.getConfiguration();
+        LoggerConfig loggerConfig = conf.getLoggers().get(logger);
+        if (loggerConfig != null) {
+            final org.apache.logging.log4j.Level oldLevel = loggerConfig.getLevel();
+            loggerConfig.setLevel(SLF_TO_LOG4J.get(level));
+            resetActions.add(() -> loggerConfig.setLevel(oldLevel));
+        } else {
+            conf.addLogger(logger, new LoggerConfig(logger, SLF_TO_LOG4J.get(level), true));
+            resetActions.add(() -> conf.removeLogger(logger));
+        }
+    }
+
+    @Override
+    public void afterAll(ExtensionContext context) throws Exception {
+        resetActions.forEach(Runnable::run);
+        // forget the undone changes so that a later beforeAll/afterAll cycle starts clean
+        resetActions.clear();
+        if (log4jContext != null) {
+            log4jContext.updateLoggers();
+        }
+    }
+
+    public LogLevelExtension set(Class<?> clazz, Level level) {
+        return set(clazz.getName(), level);
+    }
+
+    public LogLevelExtension set(Package logPackage, Level level) {
+        return set(logPackage.getName(), level);
+    }
+
+    public LogLevelExtension set(String classOrPackageName, Level level) {
+        testLevels.put(classOrPackageName, level);
+        return this;
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/TestLoggerExtension.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/TestLoggerExtension.java
new file mode 100644
index 0000000..ba5c776
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/TestLoggerExtension.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+/** A JUnit-5-style test logger. */
+public class TestLoggerExtension implements TestWatcher, BeforeEachCallback {
+    private static final Logger LOG = LoggerFactory.getLogger(TestLoggerExtension.class);
+
+    @Override // logs a banner with the test class and method name before each test
+    public void beforeEach(ExtensionContext context) {
+        LOG.info(
+                "\n================================================================================"
+                        + "\nTest {}.{} is running."
+                        + "\n--------------------------------------------------------------------------------",
+                context.getRequiredTestClass().getCanonicalName(),
+                context.getRequiredTestMethod().getName());
+    }
+
+    @Override // TestWatcher callback: logs at INFO that the test ran successfully
+    public void testSuccessful(ExtensionContext context) {
+        LOG.info(
+                "\n--------------------------------------------------------------------------------"
+                        + "\nTest {}.{} successfully run."
+                        + "\n================================================================================",
+                context.getRequiredTestClass().getCanonicalName(),
+                context.getRequiredTestMethod().getName());
+    }
+
+    @Override // TestWatcher callback: logs the failure at ERROR with the cause's stack trace
+    public void testFailed(ExtensionContext context, Throwable cause) {
+        LOG.error(
+                "\n--------------------------------------------------------------------------------"
+                        + "\nTest {}.{} failed with:\n{}"
+                        + "\n================================================================================",
+                context.getRequiredTestClass().getCanonicalName(),
+                context.getRequiredTestMethod().getName(),
+                exceptionToString(cause));
+    }
+
+    /** Renders the stack trace of {@code t}; falls back to the class name if printing fails. */
+    private static String exceptionToString(Throwable t) {
+        if (t == null) {
+            return "(null)";
+        }
+        try {
+            final StringWriter buffer = new StringWriter();
+            try (PrintWriter printer = new PrintWriter(buffer)) {
+                t.printStackTrace(printer);
+            }
+            return buffer.toString();
+        } catch (Throwable ignored) {
+            return t.getClass().getName() + " (error while printing stack trace)";
+        }
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/testutils/junit/RetryOnExceptionExtensionTest.java b/flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/testutils/junit/RetryOnExceptionExtensionTest.java
new file mode 100644
index 0000000..2065e3e
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/testutils/junit/RetryOnExceptionExtensionTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit;
+
+import org.apache.flink.testutils.junit.extensions.retry.RetryExtension;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for the RetryOnException annotation.
+ *
+ * <p>Each test method counts its own invocations in a static field; the counts are checked once
+ * after all tests have finished.
+ */
+@ExtendWith(RetryExtension.class)
+public class RetryOnExceptionExtensionTest {
+
+    private static final int NUMBER_OF_RUNS = 3;
+
+    private static int successfulTestRuns = 0;
+
+    private static int matchingExceptionRuns = 0;
+
+    private static int subclassExceptionRuns = 0;
+
+    private static int passAfterOneFailureRuns = 0;
+
+    /** Checks how often each test method was invoked. */
+    @AfterAll
+    public static void verify() {
+        assertEquals(NUMBER_OF_RUNS + 1, matchingExceptionRuns);
+        assertEquals(NUMBER_OF_RUNS + 1, subclassExceptionRuns);
+        assertEquals(1, successfulTestRuns);
+        assertEquals(2, passAfterOneFailureRuns);
+    }
+
+    /** Never throws, so a single invocation is expected. */
+    @TestTemplate
+    @RetryOnException(times = NUMBER_OF_RUNS, exception = IllegalArgumentException.class)
+    public void testSuccessfulTest() {
+        ++successfulTestRuns;
+    }
+
+    /** Throws exactly the configured exception type on the first NUMBER_OF_RUNS invocations. */
+    @TestTemplate
+    @RetryOnException(times = NUMBER_OF_RUNS, exception = IllegalArgumentException.class)
+    public void testMatchingException() {
+        if (++matchingExceptionRuns <= NUMBER_OF_RUNS) {
+            throw new IllegalArgumentException();
+        }
+    }
+
+    /** Throws a subclass of the configured exception type on the first NUMBER_OF_RUNS invocations. */
+    @TestTemplate
+    @RetryOnException(times = NUMBER_OF_RUNS, exception = RuntimeException.class)
+    public void testSubclassException() {
+        if (++subclassExceptionRuns <= NUMBER_OF_RUNS) {
+            throw new IllegalArgumentException();
+        }
+    }
+
+    /** Throws on the first invocation only. */
+    @TestTemplate
+    @RetryOnException(times = NUMBER_OF_RUNS, exception = IllegalArgumentException.class)
+    public void testPassAfterOneFailure() {
+        if (++passAfterOneFailureRuns == 1) {
+            throw new IllegalArgumentException();
+        }
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/testutils/junit/RetryOnFailureExtensionTest.java b/flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/testutils/junit/RetryOnFailureExtensionTest.java
new file mode 100644
index 0000000..13675d4
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/testutils/junit/RetryOnFailureExtensionTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit;
+
+import org.apache.flink.testutils.junit.extensions.retry.RetryExtension;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for the RetryOnFailure annotation.
+ *
+ * <p>Every failing test method keeps its own failure counter, so the totals asserted in {@link
+ * #verify()} do not depend on the order in which the test methods are executed.
+ */
+@ExtendWith(RetryExtension.class)
+public class RetryOnFailureExtensionTest {
+
+    private static final int NUMBER_OF_RUNS = 5;
+
+    /** Failed invocations of {@link #testRetryOnFailure()}. */
+    private static int failedRunsOfRetryOnFailure;
+
+    /** Failed invocations of {@link #testRetryOnceOnFailure()}. */
+    private static int failedRunsOfRetryOnceOnFailure;
+
+    /** Successful invocations across all three test methods (one each). */
+    private static int numberOfSuccessfulRuns;
+
+    @AfterAll
+    public static void verify() throws Exception {
+        assertEquals(NUMBER_OF_RUNS, failedRunsOfRetryOnFailure);
+        assertEquals(1, failedRunsOfRetryOnceOnFailure);
+        assertEquals(3, numberOfSuccessfulRuns);
+    }
+
+    @TestTemplate
+    @RetryOnFailure(times = NUMBER_OF_RUNS)
+    public void testRetryOnFailure() throws Exception {
+        // All but the (expected) last run should fail
+        if (failedRunsOfRetryOnFailure < NUMBER_OF_RUNS) {
+            failedRunsOfRetryOnFailure++;
+            throw new RuntimeException("Expected test exception");
+        } else {
+            numberOfSuccessfulRuns++;
+        }
+    }
+
+    @TestTemplate
+    @RetryOnFailure(times = NUMBER_OF_RUNS)
+    public void testRetryOnceOnFailure() throws Exception {
+        // Only the very first run fails; the first retry succeeds
+        if (failedRunsOfRetryOnceOnFailure == 0) {
+            failedRunsOfRetryOnceOnFailure++;
+            throw new RuntimeException("Expected test exception");
+        } else {
+            numberOfSuccessfulRuns++;
+        }
+    }
+
+    @TestTemplate
+    @RetryOnFailure(times = NUMBER_OF_RUNS)
+    public void testNotRetryOnSuccess() throws Exception {
+        numberOfSuccessfulRuns++;
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollectorExtension.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollectorExtension.java
new file mode 100644
index 0000000..d7be3bf
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollectorExtension.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A simple utility for collecting all the elements in a {@link DataStream}.
+ *
+ * <pre>{@code
+ * public class DataStreamTest {
+ *
+ *     {@literal @}RegisterExtension
+ *     public StreamCollectorExtension collector = new StreamCollectorExtension();
+ *
+ *     public void test() throws Exception {
+ *         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ *         DataStream<Integer> stream = env.fromElements(1, 2, 3);
+ *
+ *         CompletableFuture<Collection<Integer>> results = collector.collect(stream);
+ *         Assert.assertThat(results.get(), hasItems(1, 2, 3));
+ *     }
+ * }
+ * }</pre>
+ *
+ * <p><b>Note:</b> The stream collector assumes: 1) The stream is bounded. 2) All elements will fit
+ * in memory. 3) All tasks run within the same JVM.
+ */
+public class StreamCollectorExtension implements BeforeEachCallback, AfterEachCallback {
+    /** Source of the ids under which a collection's latch and result queue are registered. */
+    private static final AtomicLong counter = new AtomicLong();
+
+    /** Latches by collection id; each sink instance counts its latch down once on close. */
+    private static final Map<Long, CountDownLatch> latches = new ConcurrentHashMap<>();
+
+    /** Result queues by collection id; the element type is only known to the caller of collect. */
+    private static final Map<Long, Queue<?>> resultQueues = new ConcurrentHashMap<>();
+
+    /** Ids registered during the current test; their entries are removed in afterEach. */
+    private List<Long> ids;
+
+    @Override
+    public void beforeEach(ExtensionContext context) throws Exception {
+        ids = new ArrayList<>();
+    }
+
+    /**
+     * Attaches a collecting sink to the given stream.
+     *
+     * @param stream the stream whose elements should be collected
+     * @return A future that contains all the elements of the DataStream which completes when all
+     *     elements have been processed.
+     */
+    public <IN> CompletableFuture<Collection<IN>> collect(DataStream<IN> stream) {
+        final long id = counter.getAndIncrement();
+        ids.add(id);
+
+        int parallelism = stream.getParallelism();
+        if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
+            parallelism = stream.getExecutionEnvironment().getParallelism();
+        }
+
+        // One count per parallel sink instance: the future completes once all of them closed.
+        final CountDownLatch latch = new CountDownLatch(parallelism);
+        latches.put(id, latch);
+
+        final Queue<IN> results = new ConcurrentLinkedDeque<>();
+        resultQueues.put(id, results);
+
+        stream.addSink(new CollectingSink<>(id));
+
+        return CompletableFuture.runAsync(
+                        () -> {
+                            try {
+                                latch.await();
+                            } catch (InterruptedException e) {
+                                // Keep the interrupt visible to the executing thread and
+                                // preserve the cause for whoever inspects the failed future.
+                                Thread.currentThread().interrupt();
+                                throw new RuntimeException("Failed to collect results", e);
+                            }
+                        })
+                .thenApply(ignore -> results);
+    }
+
+    @Override
+    public void afterEach(ExtensionContext context) throws Exception {
+        for (Long id : ids) {
+            latches.remove(id);
+            resultQueues.remove(id);
+        }
+    }
+
+    /** Sink that adds every element to the result queue registered under its id. */
+    private static class CollectingSink<IN> extends RichSinkFunction<IN> {
+
+        private final long id;
+
+        // Transient: both fields are looked up again from the static maps in open().
+        private transient CountDownLatch latch;
+
+        private transient Queue<IN> results;
+
+        private CollectingSink(long id) {
+            this.id = id;
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public void open(Configuration parameters) throws Exception {
+            latch = StreamCollectorExtension.latches.get(id);
+            // Unchecked but safe: collect() registered a Queue<IN> under this id.
+            results = (Queue<IN>) StreamCollectorExtension.resultQueues.get(id);
+        }
+
+        @Override
+        public void invoke(IN value, Context context) throws Exception {
+            results.add(value);
+        }
+
+        @Override
+        public void close() throws Exception {
+            latch.countDown();
+        }
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientExtension.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientExtension.java
new file mode 100644
index 0000000..28650f7
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientExtension.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.runtime.testutils.MiniClusterExtension;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Starts a Flink mini cluster as a resource and registers the respective ExecutionEnvironment and
+ * StreamExecutionEnvironment.
+ */
+public class MiniClusterWithClientExtension extends MiniClusterExtension {
+    private static final Logger LOG = LoggerFactory.getLogger(MiniClusterWithClientExtension.class);
+
+    // Both clients are created in before() and closed and reset to null in after().
+    private ClusterClient<?> clusterClient;
+    private RestClusterClient<MiniClusterClient.MiniClusterId> restClusterClient;
+
+    private TestEnvironment executionEnvironment;
+
+    public MiniClusterWithClientExtension(
+            final MiniClusterResourceConfiguration miniClusterResourceConfiguration) {
+        super(miniClusterResourceConfiguration);
+    }
+
+    /** Returns the client created in {@link #before}, or {@code null} outside that lifecycle. */
+    public ClusterClient<?> getClusterClient() {
+        return clusterClient;
+    }
+
+    /**
+     * Returns a {@link RestClusterClient} that can be used to communicate with this mini cluster.
+     * Only use this if the client returned via {@link #getClusterClient()} does not fulfill your
+     * needs.
+     */
+    public RestClusterClient<?> getRestClusterClient() throws Exception {
+        return restClusterClient;
+    }
+
+    /** Returns the test environment that {@link #before} installed as context environment. */
+    public TestEnvironment getTestEnvironment() {
+        return executionEnvironment;
+    }
+
+    /**
+     * Runs the parent's setup, then creates both clients and registers the batch and streaming
+     * test environments as context environments.
+     */
+    @Override
+    public void before(ExtensionContext context) throws Exception {
+        super.before(context);
+
+        clusterClient = createMiniClusterClient();
+        restClusterClient = createRestClusterClient();
+
+        executionEnvironment = new TestEnvironment(getMiniCluster(), getNumberSlots(), false);
+        executionEnvironment.setAsContext();
+        TestStreamEnvironment.setAsContext(getMiniCluster(), getNumberSlots());
+    }
+
+    /**
+     * Unregisters the context environments, closes both clients and then runs the parent's
+     * teardown. Failures while closing the clients are collected and logged as a warning instead
+     * of being rethrown.
+     */
+    @Override
+    public void after(ExtensionContext context) throws Exception {
+        LOG.info("Finalization triggered: Cluster shutdown is going to be initiated.");
+        TestStreamEnvironment.unsetAsContext();
+        TestEnvironment.unsetAsContext();
+
+        Exception exception = null;
+
+        if (clusterClient != null) {
+            try {
+                clusterClient.close();
+            } catch (Exception e) {
+                exception = e;
+            }
+        }
+
+        clusterClient = null;
+
+        if (restClusterClient != null) {
+            try {
+                restClusterClient.close();
+            } catch (Exception e) {
+                exception = ExceptionUtils.firstOrSuppressed(e, exception);
+            }
+        }
+
+        restClusterClient = null;
+
+        // The parent's teardown runs even if closing a client failed above.
+        super.after(context);
+
+        if (exception != null) {
+            LOG.warn("Could not properly shut down the MiniClusterWithClientExtension.", exception);
+        }
+    }
+
+    private MiniClusterClient createMiniClusterClient() {
+        return new MiniClusterClient(getClientConfiguration(), getMiniCluster());
+    }
+
+    private RestClusterClient<MiniClusterClient.MiniClusterId> createRestClusterClient()
+            throws Exception {
+        return new RestClusterClient<>(
+                getClientConfiguration(), MiniClusterClient.MiniClusterId.INSTANCE);
+    }
+}