You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/11/24 08:15:17 UTC
[flink] branch master updated: [FLINK-24627][tests] Port JUnit 4 rules to JUnit5 extensions
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 78b231f [FLINK-24627][tests] Port JUnit 4 rules to JUnit5 extensions
78b231f is described below
commit 78b231f60aed59061f0f609e0cfd659d78e6fdd5
Author: Hang Ruan <ru...@hotmail.com>
AuthorDate: Mon Oct 25 11:55:15 2021 +0800
[FLINK-24627][tests] Port JUnit 4 rules to JUnit5 extensions
---
.../TaskCancelAsyncProducerConsumerITCase.java | 22 ++-
.../runtime/testutils/MiniClusterExtension.java | 63 +++++++
.../flink/runtime/util/BlobServerExtension.java | 74 +++++++++
.../runtime/zookeeper/ZooKeeperExtension.java | 82 ++++++++++
.../flink/core/testutils/AllCallbackWrapper.java | 47 ++++++
.../flink/core/testutils/CustomExtension.java | 50 ++++++
.../flink/core/testutils/EachCallbackWrapper.java | 47 ++++++
.../testutils/executor/TestExecutorExtension.java | 54 ++++++
.../testutils/junit/SharedObjectsExtension.java | 181 +++++++++++++++++++++
.../junit/extensions/retry/RetryExtension.java | 129 +++++++++++++++
.../retry/RetryTestExecutionExtension.java | 88 ++++++++++
.../retry/strategy/AbstractRetryStrategy.java | 40 +++++
.../retry/strategy/RetryOnExceptionStrategy.java | 59 +++++++
.../retry/strategy/RetryOnFailureStrategy.java | 50 ++++++
.../extensions/retry/strategy/RetryStrategy.java | 37 +++++
.../testutils/logging/LoggerAuditingExtension.java | 96 +++++++++++
.../org/apache/flink/util/LogLevelExtension.java | 114 +++++++++++++
.../org/apache/flink/util/TestLoggerExtension.java | 80 +++++++++
.../junit/RetryOnExceptionExtensionTest.java | 83 ++++++++++
.../junit/RetryOnFailureExtensionTest.java | 76 +++++++++
.../streaming/util/StreamCollectorExtension.java | 146 +++++++++++++++++
.../test/util/MiniClusterWithClientExtension.java | 123 ++++++++++++++
22 files changed, 1733 insertions(+), 8 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
index c09046b..9dcaefd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
@@ -39,15 +40,16 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterExtension;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.types.LongValue;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.concurrent.FutureUtils;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
import java.time.Duration;
import java.util.Arrays;
@@ -58,7 +60,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
+@ExtendWith({TestLoggerExtension.class})
+public class TaskCancelAsyncProducerConsumerITCase {
// The Exceptions thrown by the producer/consumer Threads
private static volatile Exception ASYNC_PRODUCER_EXCEPTION;
@@ -68,13 +71,16 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
private static volatile Thread ASYNC_PRODUCER_THREAD;
private static volatile Thread ASYNC_CONSUMER_THREAD;
- @ClassRule
- public static final MiniClusterResource MINI_CLUSTER_RESOURCE =
- new MiniClusterResource(
+ public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getFlinkConfiguration())
.build());
+ @RegisterExtension
+ public static AllCallbackWrapper allCallbackWrapper =
+ new AllCallbackWrapper(MINI_CLUSTER_RESOURCE);
+
private static Configuration getFlinkConfiguration() {
Configuration config = new Configuration();
config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4096"));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterExtension.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterExtension.java
new file mode 100644
index 0000000..a6f7a37
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterExtension.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.core.testutils.CustomExtension;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import java.net.URI;
+
+/**
+ * JUnit 5 counterpart of {@link MiniClusterResource}: a {@link CustomExtension} that manages a
+ * {@link MiniCluster} for tests.
+ *
+ * <p>All work is forwarded to a wrapped {@link MiniClusterResource} that is created from the
+ * configuration handed to the constructor. Wrap an instance in an {@code AllCallbackWrapper} or an
+ * {@code EachCallbackWrapper} to bind it to the class or method lifecycle.
+ */
+public class MiniClusterExtension implements CustomExtension {
+
+    /** The resource every call is delegated to. */
+    private final MiniClusterResource miniClusterResource;
+
+    /**
+     * Creates the extension together with its backing {@link MiniClusterResource}.
+     *
+     * @param miniClusterResourceConfiguration configuration passed on to the wrapped resource
+     */
+    public MiniClusterExtension(
+            final MiniClusterResourceConfiguration miniClusterResourceConfiguration) {
+        miniClusterResource = new MiniClusterResource(miniClusterResourceConfiguration);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Lifecycle
+    // ------------------------------------------------------------------------
+
+    /** Forwards to {@code MiniClusterResource#before()}. */
+    @Override
+    public void before(ExtensionContext context) throws Exception {
+        miniClusterResource.before();
+    }
+
+    /** Forwards to {@code MiniClusterResource#after()}. */
+    @Override
+    public void after(ExtensionContext context) throws Exception {
+        miniClusterResource.after();
+    }
+
+    // ------------------------------------------------------------------------
+    //  Accessors (all delegate to the wrapped resource)
+    // ------------------------------------------------------------------------
+
+    public MiniCluster getMiniCluster() {
+        return miniClusterResource.getMiniCluster();
+    }
+
+    public int getNumberSlots() {
+        return miniClusterResource.getNumberSlots();
+    }
+
+    public UnmodifiableConfiguration getClientConfiguration() {
+        return miniClusterResource.getClientConfiguration();
+    }
+
+    // Name (including its spelling) mirrors the method on MiniClusterResource.
+    public URI getRestAddres() {
+        return miniClusterResource.getRestAddres();
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerExtension.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerExtension.java
new file mode 100644
index 0000000..ab8d8d6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerExtension.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CustomExtension;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A simple {@link org.junit.jupiter.api.extension.Extension} to be used by tests that require a
+ * {@link BlobServer}.
+ *
+ * <p>{@link #before(ExtensionContext)} creates a temporary folder, points {@link
+ * BlobServerOptions#STORAGE_DIRECTORY} at a new sub-folder of it and starts a {@link BlobServer}
+ * using a {@link VoidBlobStore}. {@link #after(ExtensionContext)} closes the server and then
+ * deletes the temporary folder.
+ */
+public class BlobServerExtension implements CustomExtension {
+    private static final Logger LOG = LoggerFactory.getLogger(BlobServerExtension.class);
+    private final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    /** The server under management; stays {@code null} until {@code before} has created it. */
+    private BlobServer blobServer;
+
+    @Override
+    public void before(ExtensionContext context) throws Exception {
+        temporaryFolder.create();
+
+        Configuration config = new Configuration();
+        config.setString(
+                BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+
+        blobServer = new BlobServer(config, new VoidBlobStore());
+        blobServer.start();
+    }
+
+    /**
+     * Shuts the server down and removes the temporary folder.
+     *
+     * <p>The server is closed first so that its shutdown does not run against a storage directory
+     * that has already been deleted. The folder is removed in a {@code finally} block so it is
+     * cleaned up even if closing fails. An {@link IOException} from closing is logged, not
+     * rethrown (best-effort teardown).
+     */
+    @Override
+    public void after(ExtensionContext context) throws Exception {
+        try {
+            // before() may have failed before the server was created; do not mask that failure
+            // with a NullPointerException during teardown.
+            if (blobServer != null) {
+                blobServer.close();
+            }
+        } catch (IOException e) {
+            LOG.error("Exception while shutting down blob server.", e);
+        } finally {
+            temporaryFolder.delete();
+        }
+    }
+
+    /** Returns the port reported by the managed server; only valid after {@code before} ran. */
+    public int getBlobServerPort() {
+        return blobServer.getPort();
+    }
+
+    /** Returns the managed server, or {@code null} if {@code before} has not created it yet. */
+    public BlobServer getBlobServer() {
+        return blobServer;
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperExtension.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperExtension.java
new file mode 100644
index 0000000..a8b3ee1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperExtension.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.flink.core.testutils.CustomExtension;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * {@link org.junit.jupiter.api.extension.Extension} which starts a {@link
+ * org.apache.zookeeper.server.ZooKeeperServer}.
+ *
+ * <p>A new {@link TestingServer} is created in {@link #before(ExtensionContext)} (after disposing
+ * of any previous one) and disposed of again in {@link #after(ExtensionContext)}.
+ */
+public class ZooKeeperExtension implements CustomExtension {
+    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperExtension.class);
+
+    /** The managed test server; {@code null} whenever no server is owned by this extension. */
+    @Nullable private TestingServer zooKeeperServer;
+
+    /**
+     * Returns the connect string of the managed server.
+     *
+     * @throws IllegalStateException if no server is currently managed
+     */
+    public String getConnectString() {
+        verifyIsRunning();
+        return zooKeeperServer.getConnectString();
+    }
+
+    private void verifyIsRunning() {
+        Preconditions.checkState(zooKeeperServer != null);
+    }
+
+    @Override
+    public void before(ExtensionContext context) throws Exception {
+        // Dispose of a server left over from a previous invocation before starting a new one.
+        terminateZooKeeperServer();
+        zooKeeperServer = new TestingServer(true);
+    }
+
+    /**
+     * Disposes of the managed server, if any, and forgets the reference.
+     *
+     * <p>Uses {@code close()} rather than {@code stop()}: the extension never reuses a server
+     * after teardown, and {@code close()} additionally removes the server's temporary data
+     * directory instead of leaving it behind on disk.
+     */
+    private void terminateZooKeeperServer() throws IOException {
+        if (zooKeeperServer != null) {
+            try {
+                zooKeeperServer.close();
+            } finally {
+                // Drop the reference even if closing failed so a later before() starts clean.
+                zooKeeperServer = null;
+            }
+        }
+    }
+
+    @Override
+    public void after(ExtensionContext context) throws Exception {
+        try {
+            terminateZooKeeperServer();
+        } catch (IOException e) {
+            // Best-effort teardown: a failure here must not fail the test.
+            LOG.warn("Could not properly terminate the {}.", getClass().getSimpleName(), e);
+        }
+    }
+
+    /**
+     * Restarts the managed server.
+     *
+     * @throws NullPointerException if no server is currently managed
+     */
+    public void restart() throws Exception {
+        Preconditions.checkNotNull(zooKeeperServer);
+        zooKeeperServer.restart();
+    }
+
+    /**
+     * Stops the managed server but keeps it (and its data) so that {@link #restart()} can bring it
+     * back.
+     *
+     * @throws NullPointerException if no server is currently managed
+     */
+    public void stop() throws IOException {
+        Preconditions.checkNotNull(zooKeeperServer);
+        zooKeeperServer.stop();
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/AllCallbackWrapper.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/AllCallbackWrapper.java
new file mode 100644
index 0000000..df4f466
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/AllCallbackWrapper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+/**
+ * Binds a {@link CustomExtension} to the class-level lifecycle: its {@code before} is invoked from
+ * {@link BeforeAllCallback#beforeAll(ExtensionContext)} and its {@code after} from {@link
+ * AfterAllCallback#afterAll(ExtensionContext)}.
+ *
+ * @param <C> type of the wrapped extension
+ */
+public class AllCallbackWrapper<C extends CustomExtension>
+        implements BeforeAllCallback, AfterAllCallback {
+
+    /** The extension whose hooks are driven by this wrapper. */
+    private final C customExtension;
+
+    /**
+     * @param customExtension extension to run once before and once after all tests of a class
+     */
+    public AllCallbackWrapper(C customExtension) {
+        this.customExtension = customExtension;
+    }
+
+    /** Delegates to {@link CustomExtension#before(ExtensionContext)}. */
+    @Override
+    public void beforeAll(ExtensionContext context) throws Exception {
+        customExtension.before(context);
+    }
+
+    /** Delegates to {@link CustomExtension#after(ExtensionContext)}. */
+    @Override
+    public void afterAll(ExtensionContext context) throws Exception {
+        customExtension.after(context);
+    }
+
+    /** Returns the wrapped extension so tests can reach its accessors. */
+    public C getCustomExtension() {
+        return customExtension;
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CustomExtension.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CustomExtension.java
new file mode 100644
index 0000000..43e542e
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CustomExtension.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+/**
+ * An extension that is invoked before/after all/each tests, depending on whether it is wrapped in a
+ * {@link EachCallbackWrapper} or {@link AllCallbackWrapper}.
+ *
+ * <p>The {@code before} method is called in {@code beforeEach} or {@code beforeAll}; the {@code
+ * after} method is called in {@code afterEach} or {@code afterAll}. Both default to no-ops, so an
+ * implementation only overrides the hook it needs.
+ *
+ * <p>Usage example (an extension used by a {@code static} wrapper must itself be {@code static}):
+ *
+ * <pre>{@code
+ * public class Test {
+ *     static CustomExtension allCustom = new CustomExtensionImpl2();
+ *     CustomExtension eachCustom = new CustomExtensionImpl1();
+ *
+ *     @RegisterExtension
+ *     static AllCallbackWrapper<CustomExtension> allWrapper = new AllCallbackWrapper<>(allCustom);
+ *
+ *     @RegisterExtension
+ *     EachCallbackWrapper<CustomExtension> eachWrapper = new EachCallbackWrapper<>(eachCustom);
+ * }
+ * }</pre>
+ *
+ * <p>A {@code CustomExtension} instance must not be wrapped in both an {@code AllCallbackWrapper}
+ * and an {@code EachCallbackWrapper} for the same test class.
+ */
+public interface CustomExtension {
+ default void before(ExtensionContext context) throws Exception {} // no-op unless overridden
+
+ default void after(ExtensionContext context) throws Exception {} // no-op unless overridden
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/EachCallbackWrapper.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/EachCallbackWrapper.java
new file mode 100644
index 0000000..66b1984
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/EachCallbackWrapper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+/**
+ * Binds a {@link CustomExtension} to the per-test lifecycle: its {@code before} is invoked from
+ * {@link BeforeEachCallback#beforeEach(ExtensionContext)} and its {@code after} from {@link
+ * AfterEachCallback#afterEach(ExtensionContext)}.
+ *
+ * @param <C> type of the wrapped extension
+ */
+public class EachCallbackWrapper<C extends CustomExtension>
+        implements BeforeEachCallback, AfterEachCallback {
+
+    /** The extension whose hooks are driven by this wrapper. */
+    private final C customExtension;
+
+    /**
+     * @param customExtension extension to run before and after every single test
+     */
+    public EachCallbackWrapper(C customExtension) {
+        this.customExtension = customExtension;
+    }
+
+    /** Delegates to {@link CustomExtension#before(ExtensionContext)}. */
+    @Override
+    public void beforeEach(ExtensionContext context) throws Exception {
+        customExtension.before(context);
+    }
+
+    /** Delegates to {@link CustomExtension#after(ExtensionContext)}. */
+    @Override
+    public void afterEach(ExtensionContext context) throws Exception {
+        customExtension.after(context);
+    }
+
+    /** Returns the wrapped extension so tests can reach its accessors. */
+    public C getCustomExtension() {
+        return customExtension;
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/executor/TestExecutorExtension.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/executor/TestExecutorExtension.java
new file mode 100644
index 0000000..269420c
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/executor/TestExecutorExtension.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.executor;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+
+/**
+ * Extension that owns an {@link ExecutorService} for the duration of a test class: the service is
+ * obtained from the supplied factory in {@code beforeAll} and {@code shutdown()} is called on it
+ * in {@code afterAll}.
+ *
+ * @param <T> concrete executor service type produced by the factory
+ */
+public class TestExecutorExtension<T extends ExecutorService>
+        implements BeforeAllCallback, AfterAllCallback {
+
+    /** Creates the service; invoked once per {@code beforeAll}. */
+    private final Supplier<T> serviceFactory;
+
+    /** The service currently managed; {@code null} until {@code beforeAll} has run. */
+    private T executorService;
+
+    /**
+     * @param serviceFactory factory used to create the executor service before the tests run
+     */
+    public TestExecutorExtension(Supplier<T> serviceFactory) {
+        this.serviceFactory = serviceFactory;
+    }
+
+    /**
+     * Returns the managed service. Its life cycle is handled by this extension, so callers are not
+     * expected to shut it down themselves.
+     */
+    public T getExecutor() {
+        return executorService;
+    }
+
+    @Override
+    public void beforeAll(ExtensionContext context) throws Exception {
+        executorService = serviceFactory.get();
+    }
+
+    @Override
+    public void afterAll(ExtensionContext context) throws Exception {
+        if (executorService == null) {
+            // beforeAll never produced a service; nothing to release.
+            return;
+        }
+        executorService.shutdown();
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/SharedObjectsExtension.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/SharedObjectsExtension.java
new file mode 100644
index 0000000..c3fac18
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/SharedObjectsExtension.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This extension allows objects to be used both in the main test case as well as in UDFs by using
+ * serializable {@link SharedReference}s. Usage:
+ *
+ * <pre><code>
+ * {@literal @RegisterExtension}
+ * public final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
+ *
+ * {@literal @Test}
+ * public void test() throws Exception {
+ * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ * {@literal SharedReference<Queue<Long>> listRef = sharedObjects.add(new ConcurrentLinkedQueue<>());}
+ * int n = 10000;
+ * env.setParallelism(100);
+ * env.fromSequence(0, n).map(i -> listRef.get().add(i));
+ * env.execute();
+ * assertEquals(n + 1, listRef.get().size());
+ * assertEquals(
+ * LongStream.rangeClosed(0, n).boxed().collect(Collectors.toList()),
+ * listRef.get().stream().sorted().collect(Collectors.toList()));
+ * }
+ * </code></pre>
+ *
+ * <p>The main idea is that shared objects are bound to the scope of a test case instead of a class.
+ * That allows us to:
+ *
+ * <ul>
+ * <li>Avoid all kinds of static fields in test classes that only exist since all fields in UDFs
+ * need to be serializable.
+ * <li>Hopefully make it easier to reason about the test setup
+ * <li>Facilitate to share more test building blocks across test classes.
+ * <li>Fully allow tests to be rerun/run in parallel without worrying about static fields
+ * </ul>
+ *
+ * <p>Note that since the shared objects are accessed through multiple threads, they need to be
+ * thread-safe or accessed in a thread-safe manner.
+ */
+@NotThreadSafe
+public class SharedObjectsExtension implements BeforeEachCallback, AfterEachCallback {
+ /**
+ * Instance-cache, keyed by {@link #id}, used to make a SharedObjectsExtension accessible for
+ * multiple threads. An instance is present only between {@code beforeEach} and {@code
+ * afterEach}.
+ */
+ private static final Map<Integer, SharedObjectsExtension> INSTANCES = new ConcurrentHashMap<>();
+
+ private static final AtomicInteger LAST_ID = new AtomicInteger();
+ /**
+ * Identifier of the SharedObjectsExtension used to retrieve the original instance during
+ * deserialization.
+ */
+ private final int id;
+ /** All registered objects for the current test case. The objects are purged upon completion. */
+ private final transient Map<SharedReference<?>, Object> objects = new ConcurrentHashMap<>();
+
+ private SharedObjectsExtension(int id) {
+ this.id = id;
+ }
+
+ /**
+ * Creates a new instance with a fresh id. Usually that should be done inside a JUnit test class
+ * as an instance-field annotated with {@link
+ * org.junit.jupiter.api.extension.RegisterExtension}.
+ */
+ public static SharedObjectsExtension create() {
+ return new SharedObjectsExtension(LAST_ID.getAndIncrement());
+ }
+
+ private static SharedObjectsExtension get(int sharedObjectsId) {
+ SharedObjectsExtension sharedObjects = INSTANCES.get(sharedObjectsId);
+ if (sharedObjects == null) { // not registered: beforeEach has not run or afterEach already did
+ throw new IllegalStateException("Object was accessed after the test was completed");
+ }
+ return sharedObjects;
+ }
+
+ /**
+ * Adds a new object to this {@code SharedObjectsExtension}. Although not necessary, it is
+ * recommended to only access the object through the returned {@link SharedReference}.
+ */
+ public <T> SharedReference<T> add(T object) {
+ SharedReference<T> tag = new SharedObjectsExtension.DefaultTag<>(id, objects.size()); // tag = (extension id, number of objects registered so far)
+ objects.put(tag, object);
+ return tag;
+ }
+
+ @Override
+ public void beforeEach(ExtensionContext context) throws Exception {
+ INSTANCES.put(id, this); // makes this instance resolvable through get(int) while the test runs
+ }
+
+ @Override
+ public void afterEach(ExtensionContext context) throws Exception {
+ objects.clear();
+ INSTANCES.remove(id); // from here on get(int) throws IllegalStateException for this id
+ }
+
+ @SuppressWarnings("unchecked")
+ <T> T get(SharedReference<T> tag) {
+ T object = (T) objects.get(tag);
+ if (object == null) { // unknown tag, or the objects were already purged in afterEach
+ throw new IllegalStateException("Object was accessed after the test was completed");
+ }
+ return object;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SharedObjectsExtension that = (SharedObjectsExtension) o;
+ return id == that.id; // identity is defined by the id alone
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id);
+ }
+
+ private static class DefaultTag<T> implements SharedReference<T> {
+ private final int sharedObjectsId; // id of the owning SharedObjectsExtension
+ private final int objectId; // index assigned by add(...)
+
+ public DefaultTag(int sharedObjectsId, int objectId) {
+ this.sharedObjectsId = sharedObjectsId;
+ this.objectId = objectId;
+ }
+
+ @Override
+ public T get() {
+ return SharedObjectsExtension.get(sharedObjectsId).get(this); // resolve owner through the static registry, then look up this tag
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SharedObjectsExtension.DefaultTag<?> that = (SharedObjectsExtension.DefaultTag<?>) o;
+ return sharedObjectsId == that.sharedObjectsId && objectId == that.objectId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(sharedObjectsId, objectId);
+ }
+ }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/RetryExtension.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/RetryExtension.java
new file mode 100644
index 0000000..f1a5c16
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/RetryExtension.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit.extensions.retry;
+
+import org.apache.flink.testutils.junit.RetryOnException;
+import org.apache.flink.testutils.junit.RetryOnFailure;
+import org.apache.flink.testutils.junit.extensions.retry.strategy.RetryOnExceptionStrategy;
+import org.apache.flink.testutils.junit.extensions.retry.strategy.RetryOnFailureStrategy;
+import org.apache.flink.testutils.junit.extensions.retry.strategy.RetryStrategy;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+ /**
+ * An extension that lets a failed test be retried.
+ *
+ * <p>It provides the invocation contexts for test templates that are annotated (on the method or,
+ * as a fallback, on the class) with either {@link RetryOnFailure} or {@link RetryOnException}: one
+ * invocation context per allowed attempt. The matching {@link RetryStrategy} is put into the
+ * extension store, and every attempt gets a {@link RetryTestExecutionExtension} attached.
+ */
+ public class RetryExtension implements TestTemplateInvocationContextProvider, AfterAllCallback {
+ static final ExtensionContext.Namespace RETRY_NAMESPACE =
+ ExtensionContext.Namespace.create("retryLog");
+ static final String RETRY_KEY = "testRetry";
+
+ /** Supports every test template that carries at least one of the two retry annotations. */
+ @Override
+ public boolean supportsTestTemplate(ExtensionContext context) {
+ RetryOnFailure retryOnFailure = getRetryAnnotation(context, RetryOnFailure.class);
+ RetryOnException retryOnException = getRetryAnnotation(context, RetryOnException.class);
+ return retryOnException != null || retryOnFailure != null;
+ }
+
+ /**
+ * Stores the retry strategy of the test method and creates one invocation context per attempt,
+ * i.e. {@code times + 1} contexts.
+ *
+ * @throws IllegalArgumentException if both or none of the retry annotations are present
+ */
+ @Override
+ public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts(
+ ExtensionContext context) {
+ RetryOnFailure retryOnFailure = getRetryAnnotation(context, RetryOnFailure.class);
+ RetryOnException retryOnException = getRetryAnnotation(context, RetryOnException.class);
+
+ // sanity check that we don't use both annotations
+ if (retryOnFailure != null && retryOnException != null) {
+ throw new IllegalArgumentException(
+ "You cannot combine the RetryOnFailure and RetryOnException annotations.");
+ }
+
+ // The store only hands out Object; the map is created right here, so the cast is safe.
+ @SuppressWarnings("unchecked")
+ Map<String, RetryStrategy> testLog =
+ (Map<String, RetryStrategy>)
+ context.getStore(RETRY_NAMESPACE)
+ .getOrComputeIfAbsent(RETRY_KEY, key -> new HashMap<>());
+ // total number of attempts = the first run + the configured number of retries
+ int totalTimes;
+ if (retryOnException != null) {
+ totalTimes = retryOnException.times() + 1;
+ testLog.put(
+ getTestMethodKey(context),
+ new RetryOnExceptionStrategy(totalTimes, retryOnException.exception()));
+ } else if (retryOnFailure != null) {
+ totalTimes = retryOnFailure.times() + 1;
+ testLog.put(getTestMethodKey(context), new RetryOnFailureStrategy(totalTimes));
+ } else {
+ throw new IllegalArgumentException("Unsupported retry strategy.");
+ }
+
+ return IntStream.rangeClosed(1, totalTimes).mapToObj(i -> new RetryContext(i, totalTimes));
+ }
+
+ /** Removes the retry strategies from the store of the given context. */
+ @Override
+ public void afterAll(ExtensionContext context) throws Exception {
+ context.getStore(RETRY_NAMESPACE).remove(RETRY_KEY);
+ }
+
+ /**
+ * Builds the key under which the strategy of a test method is stored: the canonical name of the
+ * test class, followed by {@code #} and the name of the test method.
+ */
+ static String getTestMethodKey(ExtensionContext context) {
+ return context.getRequiredTestClass().getCanonicalName()
+ + "#"
+ + context.getRequiredTestMethod().getName();
+ }
+
+ /**
+ * Looks up the given annotation on the test method and falls back to the test class.
+ *
+ * @return the annotation, or {@code null} if neither the method nor the class is annotated
+ */
+ private <T extends Annotation> T getRetryAnnotation(
+ ExtensionContext context, Class<T> annotationClass) {
+ Method testMethod = context.getRequiredTestMethod();
+ T annotation = testMethod.getAnnotation(annotationClass);
+
+ if (annotation == null) {
+ // if nothing is specified on the test method, fall back to annotations on the class
+ annotation = context.getRequiredTestClass().getAnnotation(annotationClass);
+ }
+ return annotation;
+ }
+
+ /** Invocation context of a single attempt; {@code retryIndex} starts at 1. */
+ class RetryContext implements TestTemplateInvocationContext {
+ final int retryIndex;
+ final int totalTimes;
+
+ RetryContext(int retryIndex, int totalTimes) {
+ this.totalTimes = totalTimes;
+ this.retryIndex = retryIndex;
+ }
+
+ @Override
+ public String getDisplayName(int invocationIndex) {
+ return String.format("Attempt [%d/%d]", retryIndex, totalTimes);
+ }
+
+ /** Attaches the extension that decides whether this attempt is executed at all. */
+ @Override
+ public List<Extension> getAdditionalExtensions() {
+ return Arrays.asList(new RetryTestExecutionExtension(retryIndex, totalTimes));
+ }
+ }
+ }
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/RetryTestExecutionExtension.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/RetryTestExecutionExtension.java
new file mode 100644
index 0000000..8126c74
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/RetryTestExecutionExtension.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit.extensions.retry;
+
+import org.apache.flink.testutils.junit.extensions.retry.strategy.RetryStrategy;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.ConditionEvaluationResult;
+import org.junit.jupiter.api.extension.ExecutionCondition;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestExecutionExceptionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+import static org.apache.flink.testutils.junit.extensions.retry.RetryExtension.RETRY_KEY;
+import static org.apache.flink.testutils.junit.extensions.retry.RetryExtension.RETRY_NAMESPACE;
+import static org.apache.flink.testutils.junit.extensions.retry.RetryExtension.getTestMethodKey;
+
+ /**
+ * Extension that is attached to a single attempt of a retry test. It disables the attempt when the
+ * retry strategy reports that no further attempt is needed, hands test exceptions over to the
+ * strategy, and stops the following attempts once an attempt finished without an exception.
+ */
+ public class RetryTestExecutionExtension
+ implements ExecutionCondition, TestExecutionExceptionHandler, AfterEachCallback {
+ private static final Logger LOG = LoggerFactory.getLogger(RetryTestExecutionExtension.class);
+ private final int retryIndex;
+ private final int totalTimes;
+
+ /**
+ * @param retryIndex index of the attempt this extension belongs to
+ * @param totalTimes total number of attempts of the test
+ */
+ public RetryTestExecutionExtension(int retryIndex, int totalTimes) {
+ this.retryIndex = retryIndex;
+ this.totalTimes = totalTimes;
+ }
+
+ /** Enables the attempt only while the strategy still asks for another attempt. */
+ @Override
+ public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) {
+ RetryStrategy retryStrategy = getRetryStrategyInStore(context);
+ String method = getTestMethodKey(context);
+ if (!retryStrategy.hasNextAttempt()) {
+ return ConditionEvaluationResult.disabled(method + " has already passed or failed.");
+ }
+ return ConditionEvaluationResult.enabled(
+ String.format("Test %s[%d/%d]", method, retryIndex, totalTimes));
+ }
+
+ /** Lets the retry strategy decide what is thrown for an exception raised by the test. */
+ @Override
+ public void handleTestExecutionException(ExtensionContext context, Throwable throwable)
+ throws Throwable {
+ RetryStrategy retryStrategy = getRetryStrategyInStore(context);
+ String method = getTestMethodKey(context);
+ retryStrategy.handleException(method, retryIndex, throwable);
+ }
+
+ /** Stops the following attempts if this attempt ended without an execution exception. */
+ @Override
+ public void afterEach(ExtensionContext context) throws Exception {
+ Throwable exception = context.getExecutionException().orElse(null);
+ if (exception == null) {
+ RetryStrategy retryStrategy = getRetryStrategyInStore(context);
+ String method = getTestMethodKey(context);
+ retryStrategy.stopFollowingAttempts();
+ // parameterized logging: the message is only built when TRACE is enabled
+ LOG.trace(
+ "Retry test {}[{}/{}] passed, stop retrying.", method, retryIndex, totalTimes);
+ }
+ }
+
+ /** Reads the strategy that {@link RetryExtension} stored for the current test method. */
+ private RetryStrategy getRetryStrategyInStore(ExtensionContext context) {
+ // The store only hands out Object; RetryExtension puts a Map<String, RetryStrategy> there.
+ @SuppressWarnings("unchecked")
+ Map<String, RetryStrategy> retryStrategies =
+ (Map<String, RetryStrategy>) context.getStore(RETRY_NAMESPACE).get(RETRY_KEY);
+ String method = getTestMethodKey(context);
+ return retryStrategies.get(method);
+ }
+ }
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/strategy/AbstractRetryStrategy.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/strategy/AbstractRetryStrategy.java
new file mode 100644
index 0000000..685d9cf
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/strategy/AbstractRetryStrategy.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit.extensions.retry.strategy;
+
+ /** Base class for retry strategies; it keeps the attempt limit and the "continue" flag. */
+ public abstract class AbstractRetryStrategy implements RetryStrategy {
+ protected final int totalTimes;
+ protected boolean hasNextAttempt;
+
+ /**
+ * @param totalTimes value kept in {@link #totalTimes} for the subclasses
+ * @param hasNextAttempt initial result of {@link #hasNextAttempt()}
+ */
+ public AbstractRetryStrategy(int totalTimes, boolean hasNextAttempt) {
+ this.hasNextAttempt = hasNextAttempt;
+ this.totalTimes = totalTimes;
+ }
+
+ @Override
+ public boolean hasNextAttempt() {
+ return this.hasNextAttempt;
+ }
+
+ @Override
+ public void stopFollowingAttempts() {
+ hasNextAttempt = false;
+ }
+ }
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/strategy/RetryOnExceptionStrategy.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/strategy/RetryOnExceptionStrategy.java
new file mode 100644
index 0000000..68dc1ec
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/strategy/RetryOnExceptionStrategy.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit.extensions.retry.strategy;
+
+import org.opentest4j.TestAbortedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+ /**
+ * Retry strategy that retries a test a fixed number of times, but only as long as it fails with a
+ * given kind of exception. Any other exception, or the given exception on the last attempt, fails
+ * the test.
+ */
+ public class RetryOnExceptionStrategy extends AbstractRetryStrategy {
+ private static final Logger LOG = LoggerFactory.getLogger(RetryOnExceptionStrategy.class);
+
+ private final Class<? extends Throwable> repeatableException;
+
+ /**
+ * @param retryTimes total number of attempts; it is compared with the 1-based attempt index
+ * @param repeatableException exception type (including subclasses) that triggers a retry
+ */
+ public RetryOnExceptionStrategy(
+ int retryTimes, Class<? extends Throwable> repeatableException) {
+ super(retryTimes, true);
+ this.repeatableException = repeatableException;
+ }
+
+ /**
+ * Aborts the attempt (so that the next one runs) for a repeatable exception while attempts are
+ * left; otherwise stops retrying and rethrows the original throwable to fail the test.
+ */
+ @Override
+ public void handleException(String testName, int attemptIndex, Throwable throwable)
+ throws Throwable {
+ final boolean repeatable = repeatableException.isAssignableFrom(throwable.getClass());
+ if (repeatable && attemptIndex < totalTimes) {
+ // continue retrying when get some repeatable exceptions
+ String retryMsg =
+ String.format(
+ "Retry test %s[%d/%d] failed with repeatable exception, continue retrying.",
+ testName, attemptIndex, totalTimes);
+ LOG.warn(retryMsg, throwable);
+ throw new TestAbortedException(retryMsg);
+ }
+
+ // Nothing is left to retry: either the exception is unrepeatable or this was the last
+ // attempt. Rethrow the original throwable so the test is reported as failed, not aborted.
+ stopFollowingAttempts();
+ if (repeatable) {
+ LOG.error(
+ String.format(
+ "Retry test %s[%d/%d] failed with repeatable exception at the last attempt, stop retrying.",
+ testName, attemptIndex, totalTimes),
+ throwable);
+ } else {
+ LOG.error(
+ String.format(
+ "Retry test %s[%d/%d] failed with unrepeatable exception, stop retrying.",
+ testName, attemptIndex, totalTimes),
+ throwable);
+ }
+ throw throwable;
+ }
+ }
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/strategy/RetryOnFailureStrategy.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/strategy/RetryOnFailureStrategy.java
new file mode 100644
index 0000000..4d2e44c
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/strategy/RetryOnFailureStrategy.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit.extensions.retry.strategy;
+
+import org.opentest4j.TestAbortedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+ /** Retry strategy that retries a failed test until the given number of attempts is used up. */
+ public class RetryOnFailureStrategy extends AbstractRetryStrategy {
+ private static final Logger LOG = LoggerFactory.getLogger(RetryOnFailureStrategy.class);
+
+ public RetryOnFailureStrategy(int retryTimes) {
+ super(retryTimes, true);
+ }
+
+ /**
+ * Aborts the attempt while further attempts are left, so that the next one is executed; on the
+ * last attempt the original throwable is rethrown.
+ */
+ @Override
+ public void handleException(String testName, int attemptIndex, Throwable throwable)
+ throws Throwable {
+ final boolean attemptsLeft = attemptIndex < totalTimes;
+ if (attemptsLeft) {
+ // not the last attempt yet: abort this one and keep retrying
+ final String message =
+ String.format(
+ "Retry test %s[%d/%d] failed, continue retrying.",
+ testName, attemptIndex, totalTimes);
+ LOG.error(message, throwable);
+ throw new TestAbortedException(message);
+ } else {
+ // the total number of attempts is reached: let the test fail
+ LOG.error("Test Failed at the last retry.", throwable);
+ throw throwable;
+ }
+ }
+ }
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/strategy/RetryStrategy.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/strategy/RetryStrategy.java
new file mode 100644
index 0000000..f6f9adc
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/retry/strategy/RetryStrategy.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit.extensions.retry.strategy;
+
+ /** Strategy that decides how the attempts of a retry test are executed. */
+ public interface RetryStrategy {
+ /** Returns whether the next attempt should be executed. */
+ boolean hasNextAttempt();
+
+ /** Stops the following attempts, e.g. because the test succeeded or finally failed. */
+ void stopFollowingAttempts();
+
+ /**
+ * Handles an exception that was thrown by a test attempt.
+ *
+ * @param testName the test name
+ * @param attemptIndex index of the test attempt, starting from 1
+ * @param throwable the throwable that the test case threw
+ * @throws Throwable the throwable that determines the result of the attempt
+ */
+ void handleException(String testName, int attemptIndex, Throwable throwable) throws Throwable;
+ }
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/logging/LoggerAuditingExtension.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/logging/LoggerAuditingExtension.java
new file mode 100644
index 0000000..8394e924
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/logging/LoggerAuditingExtension.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.logging;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.config.AppenderRef;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Utility for auditing logged messages.(Junit5 extension)
+ *
+ * <p>Implementation note: Make sure to not expose log4j dependencies in the interface of this class
+ * to ease updates in logging infrastructure.
+ */
+public class LoggerAuditingExtension implements BeforeEachCallback, AfterEachCallback {
+ private static final LoggerContext LOGGER_CONTEXT =
+ (LoggerContext) LogManager.getContext(false);
+
+ private final String loggerName;
+ private final org.slf4j.event.Level level;
+
+ private ConcurrentLinkedQueue<String> loggingEvents;
+
+ public LoggerAuditingExtension(Class<?> clazz, org.slf4j.event.Level level) {
+ this.loggerName = clazz.getCanonicalName();
+ this.level = level;
+ }
+
+ public List<String> getMessages() {
+ return new ArrayList<>(loggingEvents);
+ }
+
+ @Override
+ public void beforeEach(ExtensionContext context) throws Exception {
+ loggingEvents = new ConcurrentLinkedQueue<>();
+
+ Appender testAppender =
+ new AbstractAppender("test-appender", null, null, false) {
+ @Override
+ public void append(LogEvent event) {
+ loggingEvents.add(event.getMessage().getFormattedMessage());
+ }
+ };
+ testAppender.start();
+
+ AppenderRef appenderRef = AppenderRef.createAppenderRef(testAppender.getName(), null, null);
+ LoggerConfig logger =
+ LoggerConfig.createLogger(
+ false,
+ Level.getLevel(level.name()),
+ "test",
+ null,
+ new AppenderRef[] {appenderRef},
+ null,
+ LOGGER_CONTEXT.getConfiguration(),
+ null);
+ logger.addAppender(testAppender, null, null);
+
+ LOGGER_CONTEXT.getConfiguration().addLogger(loggerName, logger);
+ LOGGER_CONTEXT.updateLoggers();
+ }
+
+ @Override
+ public void afterEach(ExtensionContext context) throws Exception {
+ LOGGER_CONTEXT.getConfiguration().removeLogger(loggerName);
+ LOGGER_CONTEXT.updateLoggers();
+ loggingEvents = null;
+ }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/LogLevelExtension.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/LogLevelExtension.java
new file mode 100644
index 0000000..d00278d
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/LogLevelExtension.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.slf4j.Log4jLogger;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A extension that sets the log level for specific class/package loggers for a test. Logging
+ * configuration will only be extended when logging is enabled at all (so root logger is not OFF).
+ */
+public class LogLevelExtension implements BeforeAllCallback, AfterAllCallback {
+ public static final boolean LOGGING_ENABLED =
+ LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME).isErrorEnabled();
+ private Map<String, Level> testLevels = new HashMap<>();
+ private List<Runnable> resetActions = new ArrayList<>();
+ private static final Map<Level, org.apache.logging.log4j.Level> SLF_TO_LOG4J = new HashMap<>();
+ private LoggerContext log4jContext;
+
+ static {
+ SLF_TO_LOG4J.put(Level.ERROR, org.apache.logging.log4j.Level.ERROR);
+ SLF_TO_LOG4J.put(Level.WARN, org.apache.logging.log4j.Level.WARN);
+ SLF_TO_LOG4J.put(Level.INFO, org.apache.logging.log4j.Level.INFO);
+ SLF_TO_LOG4J.put(Level.DEBUG, org.apache.logging.log4j.Level.DEBUG);
+ SLF_TO_LOG4J.put(Level.TRACE, org.apache.logging.log4j.Level.TRACE);
+ }
+
+ @Override
+ public void beforeAll(ExtensionContext context) throws Exception {
+ if (!LOGGING_ENABLED) {
+ return;
+ }
+
+ for (Map.Entry<String, Level> levelEntry : testLevels.entrySet()) {
+ final Logger logger = LoggerFactory.getLogger(levelEntry.getKey());
+ if (logger instanceof Log4jLogger) {
+ setLog4jLevel(levelEntry.getKey(), levelEntry.getValue());
+ } else {
+ throw new UnsupportedOperationException("Cannot change log level of " + logger);
+ }
+ }
+
+ if (log4jContext != null) {
+ log4jContext.updateLoggers();
+ }
+ }
+
+ private void setLog4jLevel(String logger, Level level) {
+ if (log4jContext == null) {
+ log4jContext = (LoggerContext) LogManager.getContext(false);
+ }
+ final Configuration conf = log4jContext.getConfiguration();
+ LoggerConfig loggerConfig = conf.getLoggers().get(logger);
+ if (loggerConfig != null) {
+ final org.apache.logging.log4j.Level oldLevel = loggerConfig.getLevel();
+ loggerConfig.setLevel(SLF_TO_LOG4J.get(level));
+ resetActions.add(() -> loggerConfig.setLevel(oldLevel));
+ } else {
+ conf.addLogger(logger, new LoggerConfig(logger, SLF_TO_LOG4J.get(level), true));
+ resetActions.add(() -> conf.removeLogger(logger));
+ }
+ }
+
+ @Override
+ public void afterAll(ExtensionContext context) throws Exception {
+ resetActions.forEach(Runnable::run);
+
+ if (log4jContext != null) {
+ log4jContext.updateLoggers();
+ }
+ }
+
+ public LogLevelExtension set(Class<?> clazz, Level level) {
+ return set(clazz.getName(), level);
+ }
+
+ public LogLevelExtension set(Package logPackage, Level level) {
+ return set(logPackage.getName(), level);
+ }
+
+ public LogLevelExtension set(String classOrPackageName, Level level) {
+ testLevels.put(classOrPackageName, level);
+ return this;
+ }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/TestLoggerExtension.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/TestLoggerExtension.java
new file mode 100644
index 0000000..ba5c776
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/TestLoggerExtension.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+ /** A JUnit-5-style test logger that logs the start and the outcome of every test. */
+ public class TestLoggerExtension implements TestWatcher, BeforeEachCallback {
+ private static final Logger LOG = LoggerFactory.getLogger(TestLoggerExtension.class);
+
+ /** Separator lines of the log banners; each one starts with a line break. */
+ private static final String THICK_RULE =
+ "\n================================================================================";
+
+ private static final String THIN_RULE =
+ "\n--------------------------------------------------------------------------------";
+
+ /** Logs that the test is about to run. */
+ @Override
+ public void beforeEach(ExtensionContext context) {
+ final String className = context.getRequiredTestClass().getCanonicalName();
+ final String methodName = context.getRequiredTestMethod().getName();
+ LOG.info(THICK_RULE + "\nTest {}.{} is running." + THIN_RULE, className, methodName);
+ }
+
+ /** Logs that the test finished successfully. */
+ @Override
+ public void testSuccessful(ExtensionContext context) {
+ final String className = context.getRequiredTestClass().getCanonicalName();
+ final String methodName = context.getRequiredTestMethod().getName();
+ LOG.info(THIN_RULE + "\nTest {}.{} successfully run." + THICK_RULE, className, methodName);
+ }
+
+ /** Logs that the test failed, including the stack trace of the cause. */
+ @Override
+ public void testFailed(ExtensionContext context, Throwable cause) {
+ final String className = context.getRequiredTestClass().getCanonicalName();
+ final String methodName = context.getRequiredTestMethod().getName();
+ LOG.error(
+ THIN_RULE + "\nTest {}.{} failed with:\n{}" + THICK_RULE,
+ className,
+ methodName,
+ exceptionToString(cause));
+ }
+
+ /** Renders the stack trace of the throwable, with fallbacks for null and rendering errors. */
+ private static String exceptionToString(Throwable t) {
+ if (t == null) {
+ return "(null)";
+ }
+
+ try {
+ final StringWriter buffer = new StringWriter();
+ try (PrintWriter printer = new PrintWriter(buffer)) {
+ t.printStackTrace(printer);
+ }
+ return buffer.toString();
+ } catch (Throwable ignored) {
+ return t.getClass().getName() + " (error while printing stack trace)";
+ }
+ }
+ }
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/testutils/junit/RetryOnExceptionExtensionTest.java b/flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/testutils/junit/RetryOnExceptionExtensionTest.java
new file mode 100644
index 0000000..2065e3e
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/testutils/junit/RetryOnExceptionExtensionTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit;
+
+import org.apache.flink.testutils.junit.extensions.retry.RetryExtension;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for the RetryOnException annotation.
+ *
+ * <p>Each test method counts its own invocations in a static counter; the counters are checked
+ * once all test methods have finished.
+ */
+@ExtendWith(RetryExtension.class)
+public class RetryOnExceptionExtensionTest {
+
+    private static final int NUMBER_OF_RUNS = 3;
+
+    /** Invocations of {@link #testSuccessfulTest()}. */
+    private static int successfulTestRuns = 0;
+
+    /** Invocations of {@link #testMatchingException()}. */
+    private static int matchingExceptionRuns = 0;
+
+    /** Invocations of {@link #testSubclassException()}. */
+    private static int subclassExceptionRuns = 0;
+
+    /** Invocations of {@link #testPassAfterOneFailure()}. */
+    private static int passAfterOneFailureRuns = 0;
+
+    /** Checks that every test method was invoked exactly as often as expected. */
+    @AfterAll
+    public static void verify() {
+        // NUMBER_OF_RUNS failing attempts followed by one passing attempt.
+        assertEquals(NUMBER_OF_RUNS + 1, matchingExceptionRuns);
+        assertEquals(NUMBER_OF_RUNS + 1, subclassExceptionRuns);
+        // A test that passes right away is run exactly once.
+        assertEquals(1, successfulTestRuns);
+        // One failing attempt followed by one passing attempt.
+        assertEquals(2, passAfterOneFailureRuns);
+    }
+
+    /** Never throws. */
+    @TestTemplate
+    @RetryOnException(times = NUMBER_OF_RUNS, exception = IllegalArgumentException.class)
+    public void testSuccessfulTest() {
+        successfulTestRuns += 1;
+    }
+
+    /** Throws exactly the configured exception type on the first NUMBER_OF_RUNS invocations. */
+    @TestTemplate
+    @RetryOnException(times = NUMBER_OF_RUNS, exception = IllegalArgumentException.class)
+    public void testMatchingException() {
+        if (++matchingExceptionRuns <= NUMBER_OF_RUNS) {
+            throw new IllegalArgumentException();
+        }
+    }
+
+    /** Throws a subclass of the configured exception type on the first NUMBER_OF_RUNS invocations. */
+    @TestTemplate
+    @RetryOnException(times = NUMBER_OF_RUNS, exception = RuntimeException.class)
+    public void testSubclassException() {
+        if (++subclassExceptionRuns <= NUMBER_OF_RUNS) {
+            throw new IllegalArgumentException();
+        }
+    }
+
+    /** Throws the configured exception type on the first invocation only. */
+    @TestTemplate
+    @RetryOnException(times = NUMBER_OF_RUNS, exception = IllegalArgumentException.class)
+    public void testPassAfterOneFailure() {
+        final boolean isFirstRun = ++passAfterOneFailureRuns == 1;
+        if (isFirstRun) {
+            throw new IllegalArgumentException();
+        }
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/testutils/junit/RetryOnFailureExtensionTest.java b/flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/testutils/junit/RetryOnFailureExtensionTest.java
new file mode 100644
index 0000000..13675d4
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/testutils/junit/RetryOnFailureExtensionTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit;
+
+import org.apache.flink.testutils.junit.extensions.retry.RetryExtension;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for the RetryOnFailure annotation.
+ *
+ * <p>Every test method keeps its own counters, so the final verification does not depend on the
+ * order in which the test methods are executed.
+ */
+@ExtendWith(RetryExtension.class)
+public class RetryOnFailureExtensionTest {
+
+    private static final int NUMBER_OF_RUNS = 5;
+
+    /** Failed invocations of {@link #testRetryOnFailure()}. */
+    private static int failedRunsOfRetryOnFailure;
+
+    /** Successful invocations of {@link #testRetryOnFailure()}. */
+    private static int successfulRunsOfRetryOnFailure;
+
+    /** Failed invocations of {@link #testRetryOnceOnFailure()}. */
+    private static int failedRunsOfRetryOnceOnFailure;
+
+    /** Successful invocations of {@link #testRetryOnceOnFailure()}. */
+    private static int successfulRunsOfRetryOnceOnFailure;
+
+    /** Invocations of {@link #testNotRetryOnSuccess()}. */
+    private static int runsOfNotRetryOnSuccess;
+
+    /** Checks that every test method failed and succeeded exactly as often as expected. */
+    @AfterAll
+    public static void verify() throws Exception {
+        // NUMBER_OF_RUNS failing attempts followed by a single passing attempt.
+        assertEquals(NUMBER_OF_RUNS, failedRunsOfRetryOnFailure);
+        assertEquals(1, successfulRunsOfRetryOnFailure);
+
+        // One failing attempt followed by a single passing attempt.
+        assertEquals(1, failedRunsOfRetryOnceOnFailure);
+        assertEquals(1, successfulRunsOfRetryOnceOnFailure);
+
+        // A test that passes right away must not be repeated.
+        assertEquals(1, runsOfNotRetryOnSuccess);
+    }
+
+    @TestTemplate
+    @RetryOnFailure(times = NUMBER_OF_RUNS)
+    public void testRetryOnFailure() throws Exception {
+        // All but the (expected) last run should fail
+        if (failedRunsOfRetryOnFailure < NUMBER_OF_RUNS) {
+            failedRunsOfRetryOnFailure++;
+            throw new RuntimeException("Expected test exception");
+        } else {
+            successfulRunsOfRetryOnFailure++;
+        }
+    }
+
+    @TestTemplate
+    @RetryOnFailure(times = NUMBER_OF_RUNS)
+    public void testRetryOnceOnFailure() throws Exception {
+        // Only the very first run fails
+        if (failedRunsOfRetryOnceOnFailure == 0) {
+            failedRunsOfRetryOnceOnFailure++;
+            throw new RuntimeException("Expected test exception");
+        } else {
+            successfulRunsOfRetryOnceOnFailure++;
+        }
+    }
+
+    @TestTemplate
+    @RetryOnFailure(times = NUMBER_OF_RUNS)
+    public void testNotRetryOnSuccess() throws Exception {
+        runsOfNotRetryOnSuccess++;
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollectorExtension.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollectorExtension.java
new file mode 100644
index 0000000..d7be3bf
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollectorExtension.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A simple utility for collecting all the elements in a {@link DataStream}.
+ *
+ * <pre>{@code
+ * public class DataStreamTest {
+ *
+ *     {@literal @}RegisterExtension
+ *     public StreamCollectorExtension collector = new StreamCollectorExtension();
+ *
+ *     {@literal @}Test
+ *     public void test() throws Exception {
+ *         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ *         DataStream<Integer> stream = env.fromElements(1, 2, 3);
+ *
+ *         CompletableFuture<Collection<Integer>> results = collector.collect(stream);
+ *         assertThat(results.get(), hasItems(1, 2, 3));
+ *     }
+ * }
+ * }</pre>
+ *
+ * <p><b>Note:</b> The stream collector assumes: 1) The stream is bounded. 2) All elements will fit
+ * in memory. 3) All tasks run within the same JVM.
+ */
+public class StreamCollectorExtension implements BeforeEachCallback, AfterEachCallback {
+    /** Source of the ids under which latches and result queues are registered. */
+    private static final AtomicLong counter = new AtomicLong();
+
+    /** Latches per collection id; counted down once by every closing sink instance. */
+    private static final Map<Long, CountDownLatch> latches = new ConcurrentHashMap<>();
+
+    /** Result queues per collection id; the sink instances look them up statically by id. */
+    private static final Map<Long, Queue<?>> resultQueues = new ConcurrentHashMap<>();
+
+    /** Ids handed out since the last {@link #beforeEach(ExtensionContext)} call. */
+    private List<Long> ids;
+
+    /** Starts a fresh list of collection ids. */
+    @Override
+    public void beforeEach(ExtensionContext context) throws Exception {
+        ids = new ArrayList<>();
+    }
+
+    /**
+     * Attaches a collecting sink to the given stream.
+     *
+     * @param stream the stream whose elements should be collected
+     * @return A future that contains all the elements of the DataStream which completes when all
+     *     elements have been processed.
+     */
+    public <IN> CompletableFuture<Collection<IN>> collect(DataStream<IN> stream) {
+        final long id = counter.getAndIncrement();
+        ids.add(id);
+
+        int parallelism = stream.getParallelism();
+        if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
+            parallelism = stream.getExecutionEnvironment().getParallelism();
+        }
+
+        // One count per parallel sink instance; each instance counts down when it is closed.
+        CountDownLatch latch = new CountDownLatch(parallelism);
+        latches.put(id, latch);
+
+        Queue<IN> results = new ConcurrentLinkedDeque<>();
+        resultQueues.put(id, results);
+
+        stream.addSink(new StreamCollectorExtension.CollectingSink<>(id));
+
+        return CompletableFuture.runAsync(
+                        () -> {
+                            try {
+                                latch.await();
+                            } catch (InterruptedException e) {
+                                // Restore the interrupt flag and keep the cause for diagnosis.
+                                Thread.currentThread().interrupt();
+                                throw new RuntimeException("Failed to collect results", e);
+                            }
+                        })
+                .thenApply(ignore -> results);
+    }
+
+    /** Removes the latches and result queues registered since the last {@code beforeEach}. */
+    @Override
+    public void afterEach(ExtensionContext context) throws Exception {
+        for (Long id : ids) {
+            latches.remove(id);
+            resultQueues.remove(id);
+        }
+    }
+
+    /** Sink that adds every element to the result queue registered under its id. */
+    private static class CollectingSink<IN> extends RichSinkFunction<IN> {
+
+        private final long id;
+
+        private transient CountDownLatch latch;
+
+        private transient Queue<IN> results;
+
+        private CollectingSink(long id) {
+            this.id = id;
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public void open(Configuration parameters) throws Exception {
+            latch = StreamCollectorExtension.latches.get(id);
+            // The queue registered under this id was created for element type IN in collect().
+            results = (Queue<IN>) StreamCollectorExtension.resultQueues.get(id);
+        }
+
+        @Override
+        public void invoke(IN value, Context context) throws Exception {
+            results.add(value);
+        }
+
+        @Override
+        public void close() throws Exception {
+            latch.countDown();
+        }
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientExtension.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientExtension.java
new file mode 100644
index 0000000..28650f7
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientExtension.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.runtime.testutils.MiniClusterExtension;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Starts a Flink mini cluster as a resource and registers the respective ExecutionEnvironment and
+ * StreamExecutionEnvironment.
+ *
+ * <p>In addition to the mini cluster, a {@link ClusterClient} and a {@link RestClusterClient} are
+ * created in {@link #before(ExtensionContext)} and closed in {@link #after(ExtensionContext)}.
+ */
+public class MiniClusterWithClientExtension extends MiniClusterExtension {
+    private static final Logger LOG = LoggerFactory.getLogger(MiniClusterWithClientExtension.class);
+
+    private ClusterClient<?> clusterClient;
+    private RestClusterClient<MiniClusterClient.MiniClusterId> restClusterClient;
+
+    private TestEnvironment executionEnvironment;
+
+    public MiniClusterWithClientExtension(
+            final MiniClusterResourceConfiguration miniClusterResourceConfiguration) {
+        super(miniClusterResourceConfiguration);
+    }
+
+    /**
+     * Returns the client created in {@link #before(ExtensionContext)}; {@code null} before that
+     * call and after {@link #after(ExtensionContext)}.
+     */
+    public ClusterClient<?> getClusterClient() {
+        return clusterClient;
+    }
+
+    /**
+     * Returns a {@link RestClusterClient} that can be used to communicate with this mini cluster.
+     * Only use this if the client returned via {@link #getClusterClient()} does not fulfill your
+     * needs.
+     */
+    public RestClusterClient<?> getRestClusterClient() throws Exception {
+        return restClusterClient;
+    }
+
+    /** Returns the test environment that was set as context in {@link #before(ExtensionContext)}. */
+    public TestEnvironment getTestEnvironment() {
+        return executionEnvironment;
+    }
+
+    /**
+     * Starts the mini cluster, creates both clients and sets the batch and streaming test
+     * environments as context.
+     */
+    @Override
+    public void before(ExtensionContext context) throws Exception {
+        super.before(context);
+
+        clusterClient = createMiniClusterClient();
+        restClusterClient = createRestClusterClient();
+
+        executionEnvironment = new TestEnvironment(getMiniCluster(), getNumberSlots(), false);
+        executionEnvironment.setAsContext();
+        TestStreamEnvironment.setAsContext(getMiniCluster(), getNumberSlots());
+    }
+
+    /**
+     * Unsets the test environments, closes both clients and shuts down the mini cluster.
+     *
+     * <p>Failures while closing the clients are only logged as a warning. The warning is logged
+     * even if the cluster shutdown itself throws.
+     */
+    @Override
+    public void after(ExtensionContext context) throws Exception {
+        LOG.info("Finalization triggered: Cluster shutdown is going to be initiated.");
+        TestStreamEnvironment.unsetAsContext();
+        TestEnvironment.unsetAsContext();
+
+        Exception exception = null;
+
+        if (clusterClient != null) {
+            try {
+                clusterClient.close();
+            } catch (Exception e) {
+                exception = e;
+            }
+        }
+
+        clusterClient = null;
+
+        if (restClusterClient != null) {
+            try {
+                restClusterClient.close();
+            } catch (Exception e) {
+                exception = ExceptionUtils.firstOrSuppressed(e, exception);
+            }
+        }
+
+        restClusterClient = null;
+
+        try {
+            super.after(context);
+        } finally {
+            // Report client failures even when the cluster shutdown above throws.
+            if (exception != null) {
+                LOG.warn(
+                        "Could not properly shut down the MiniClusterWithClientExtension.",
+                        exception);
+            }
+        }
+    }
+
+    private MiniClusterClient createMiniClusterClient() {
+        return new MiniClusterClient(getClientConfiguration(), getMiniCluster());
+    }
+
+    private RestClusterClient<MiniClusterClient.MiniClusterId> createRestClusterClient()
+            throws Exception {
+        return new RestClusterClient<>(
+                getClientConfiguration(), MiniClusterClient.MiniClusterId.INSTANCE);
+    }
+}