You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ta...@apache.org on 2022/07/11 11:46:24 UTC
[flink] branch master updated: [FLINK-25315][tests] Introduce extensions and utils to help the Junit5 migration
This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3bd3a0258b7 [FLINK-25315][tests] Introduce extensions and utils to help the Junit5 migration
3bd3a0258b7 is described below
commit 3bd3a0258b74d0d59a4161ae85ae0e81adb8b5b9
Author: yuchengxin <ca...@qq.com>
AuthorDate: Wed Dec 15 14:05:05 2021 +0800
[FLINK-25315][tests] Introduce extensions and utils to help the Junit5 migration
Co-authored-by: Hang Ruan <ru...@hotmail.com>
This closes #20145.
---
.../program/PackagedProgramUtilsPipelineTest.java | 35 +--
.../table/runtime/hashtable/LongHashTableTest.java | 58 ++---
.../junit/extensions/parameterized/Parameter.java | 36 +++
.../parameterized/ParameterizedTestExtension.java | 251 +++++++++++++++++++++
.../junit/extensions/parameterized/Parameters.java | 34 +++
.../flink/testutils/junit/utils/TempDirUtils.java | 100 ++++++++
6 files changed, 472 insertions(+), 42 deletions(-)
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsPipelineTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsPipelineTest.java
index 8e356b202e2..ecc0792c1ae 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsPipelineTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsPipelineTest.java
@@ -28,12 +28,14 @@ import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.testutils.ClassLoaderUtils;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.net.URL;
@@ -51,14 +53,14 @@ import static org.junit.Assert.assertThat;
* Tests for {@link PackagedProgramUtils} methods that should be executed for {@link
* StreamExecutionEnvironment} and {@link Environment}.
*/
-@RunWith(Parameterized.class)
+@ExtendWith(ParameterizedTestExtension.class)
public class PackagedProgramUtilsPipelineTest {
- @Parameterized.Parameter public TestParameter testParameter;
+ @Parameter public TestParameter testParameter;
- @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+ @TempDir private java.nio.file.Path temporaryFolder;
- @Parameterized.Parameters
+ @Parameters(name = "testParameter-{0}")
public static Collection<TestParameter> parameters() {
return Arrays.asList(
TestParameter.of(
@@ -73,8 +75,8 @@ public class PackagedProgramUtilsPipelineTest {
* This tests whether configuration forwarding from a {@link Configuration} to the environment
* works.
*/
- @Test
- public void testConfigurationForwarding() throws Exception {
+ @TestTemplate
+ void testConfigurationForwarding() throws Exception {
// we want to test forwarding with this config, ensure that the default is what we expect.
assertThat(
ExecutionEnvironment.getExecutionEnvironment()
@@ -100,8 +102,8 @@ public class PackagedProgramUtilsPipelineTest {
assertThat(executionConfig.isAutoTypeRegistrationDisabled(), is(true));
}
- @Test
- public void testUserClassloaderForConfiguration() throws Exception {
+ @TestTemplate
+ void testUserClassloaderForConfiguration() throws Exception {
String userSerializerClassName = "UserSerializer";
List<URL> userUrls = getClassUrls(userSerializerClassName);
@@ -137,7 +139,7 @@ public class PackagedProgramUtilsPipelineTest {
private List<URL> getClassUrls(String className) throws IOException {
URLClassLoader urlClassLoader =
ClassLoaderUtils.compileAndLoadJava(
- temporaryFolder.newFolder(),
+ TempDirUtils.newFolder(temporaryFolder),
className + ".java",
"import com.esotericsoftware.kryo.Kryo;\n"
+ "import com.esotericsoftware.kryo.Serializer;\n"
@@ -178,6 +180,11 @@ public class PackagedProgramUtilsPipelineTest {
public ExecutionConfig extractExecutionConfig(Pipeline pipeline) {
return executionConfigExtractor.apply(pipeline);
}
+
+ @Override
+ public String toString() {
+ return entryClass.getSimpleName();
+ }
};
}
}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/LongHashTableTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/LongHashTableTest.java
index a36ccbd7d0f..4ae6506ec50 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/LongHashTableTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/LongHashTableTest.java
@@ -33,12 +33,13 @@ import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.util.RowIterator;
import org.apache.flink.table.runtime.util.UniformBinaryRowGenerator;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.MutableObjectIterator;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
import java.io.IOException;
import java.util.ArrayList;
@@ -48,10 +49,10 @@ import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.fail;
+import static org.assertj.core.api.Fail.fail;
/** Test for {@link LongHashPartition}. */
-@RunWith(Parameterized.class)
+@ExtendWith(ParameterizedTestExtension.class)
public class LongHashTableTest {
private static final int PAGE_SIZE = 32 * 1024;
@@ -68,12 +69,12 @@ public class LongHashTableTest {
this.useCompress = useCompress;
}
- @Parameterized.Parameters(name = "useCompress-{0}")
+ @Parameters(name = "useCompress-{0}")
public static List<Boolean> getVarSeg() {
return Arrays.asList(true, false);
}
- @Before
+ @BeforeEach
public void init() {
TypeInformation[] types = new TypeInformation[] {Types.INT, Types.INT};
this.buildSideSerializer = new BinaryRowDataSerializer(types.length);
@@ -115,8 +116,8 @@ public class LongHashTableTest {
}
}
- @Test
- public void testInMemory() throws IOException {
+ @TestTemplate
+ void testInMemory() throws IOException {
final int numKeys = 100000;
final int buildValsPerKey = 3;
final int probeValsPerKey = 10;
@@ -132,6 +133,7 @@ public class LongHashTableTest {
final MyHashTable table = new MyHashTable(500 * PAGE_SIZE);
int numRecordsInJoinResult = join(table, buildInput, probeInput);
+
assertThat(numRecordsInJoinResult)
.as("Wrong number of records in join result.")
.isEqualTo(numKeys * buildValsPerKey * probeValsPerKey);
@@ -141,8 +143,8 @@ public class LongHashTableTest {
table.free();
}
- @Test
- public void testSpillingHashJoinOneRecursion() throws IOException {
+ @TestTemplate
+ void testSpillingHashJoinOneRecursion() throws IOException {
final int numKeys = 100000;
final int buildValsPerKey = 3;
final int probeValsPerKey = 10;
@@ -171,8 +173,8 @@ public class LongHashTableTest {
}
/** Non partition in memory in level 0. */
- @Test
- public void testSpillingHashJoinOneRecursionPerformance() throws IOException {
+ @TestTemplate
+ void testSpillingHashJoinOneRecursionPerformance() throws IOException {
final int numKeys = 1000000;
final int buildValsPerKey = 3;
final int probeValsPerKey = 10;
@@ -200,8 +202,8 @@ public class LongHashTableTest {
table.free();
}
- @Test
- public void testSpillingHashJoinOneRecursionValidity() throws IOException {
+ @TestTemplate
+ void testSpillingHashJoinOneRecursionValidity() throws IOException {
final int numKeys = 1000000;
final int buildValsPerKey = 3;
final int probeValsPerKey = 10;
@@ -254,8 +256,8 @@ public class LongHashTableTest {
table.free();
}
- @Test
- public void testSpillingHashJoinWithMassiveCollisions() throws IOException {
+ @TestTemplate
+ void testSpillingHashJoinWithMassiveCollisions() throws IOException {
// the following two values are known to have a hash-code collision on the initial level.
// we use them to make sure one partition grows over-proportionally large
final int repeatedValue1 = 40559;
@@ -339,8 +341,8 @@ public class LongHashTableTest {
table.free();
}
- @Test
- public void testSpillingHashJoinWithTwoRecursions() throws IOException {
+ @TestTemplate
+ void testSpillingHashJoinWithTwoRecursions() throws IOException {
// the following two values are known to have a hash-code collision on the first recursion
// level.
// we use them to make sure one partition grows over-proportionally large
@@ -430,8 +432,8 @@ public class LongHashTableTest {
* of repeated values (causing bucket collisions) are large enough to make sure that their target partition no longer
* fits into memory by itself and needs to be repartitioned in the recursion again.
*/
- @Test
- public void testFailingHashJoinTooManyRecursions() throws IOException {
+ @TestTemplate
+ void testFailingHashJoinTooManyRecursions() throws IOException {
// the following two values are known to have a hash-code collision on the first recursion
// level.
// we use them to make sure one partition grows over-proportionally large
@@ -489,8 +491,8 @@ public class LongHashTableTest {
table.free();
}
- @Test
- public void testSparseProbeSpilling() throws IOException, MemoryAllocationException {
+ @TestTemplate
+ void testSparseProbeSpilling() throws IOException, MemoryAllocationException {
final int numBuildKeys = 1000000;
final int numBuildVals = 1;
final int numProbeKeys = 20;
@@ -518,8 +520,8 @@ public class LongHashTableTest {
table.free();
}
- @Test
- public void validateSpillingDuringInsertion() throws IOException, MemoryAllocationException {
+ @TestTemplate
+ void validateSpillingDuringInsertion() throws IOException, MemoryAllocationException {
final int numBuildKeys = 500000;
final int numBuildVals = 1;
final int numProbeKeys = 10;
@@ -547,8 +549,8 @@ public class LongHashTableTest {
table.free();
}
- @Test
- public void testBucketsNotFulfillSegment() throws Exception {
+ @TestTemplate
+ void testBucketsNotFulfillSegment() throws Exception {
final int numKeys = 10000;
final int buildValsPerKey = 3;
final int probeValsPerKey = 10;
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/parameterized/Parameter.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/parameterized/Parameter.java
new file mode 100644
index 00000000000..1cfbf69e89b
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/parameterized/Parameter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit.extensions.parameterized;
+
+import org.junit.runners.Parameterized;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * The annotation is used to replace {@link Parameterized.Parameter} for Junit 5 parameterized
+ * tests.
+ */
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Parameter {
+ int value() default 0;
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/parameterized/ParameterizedTestExtension.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/parameterized/ParameterizedTestExtension.java
new file mode 100644
index 00000000000..f32f52041c7
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/parameterized/ParameterizedTestExtension.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit.extensions.parameterized;
+
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.junit.jupiter.api.extension.ParameterResolver;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
+import org.junit.platform.commons.support.AnnotationSupport;
+import org.junit.platform.commons.support.HierarchyTraversalMode;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.text.MessageFormat;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+/**
+ * This extension is used to implement parameterized tests for Junit 5 to replace Parameterized in
+ * Junit4.
+ *
+ * <p>When use this extension, all tests must be annotated by {@link TestTemplate}.
+ */
+public class ParameterizedTestExtension implements TestTemplateInvocationContextProvider {
+
+ private static final ExtensionContext.Namespace NAMESPACE =
+ ExtensionContext.Namespace.create("parameterized");
+ private static final String PARAMETERS_STORE_KEY = "parameters";
+ private static final String PARAMETER_FIELD_STORE_KEY_PREFIX = "parameterField_";
+ private static final String INDEX_TEMPLATE = "{index}";
+
+ @Override
+ public boolean supportsTestTemplate(ExtensionContext context) {
+ return true;
+ }
+
+ @Override
+ public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts(
+ ExtensionContext context) {
+
+ // Search method annotated with @Parameters
+ final List<Method> parameterProviders =
+ AnnotationSupport.findAnnotatedMethods(
+ context.getRequiredTestClass(),
+ Parameters.class,
+ HierarchyTraversalMode.TOP_DOWN);
+ if (parameterProviders.isEmpty()) {
+ throw new IllegalStateException("Cannot find any parameter provider");
+ }
+ if (parameterProviders.size() > 1) {
+ throw new IllegalStateException("Multiple parameter providers are found");
+ }
+
+ Method parameterProvider = parameterProviders.get(0);
+ // Get potential test name
+ String testNameTemplate = parameterProvider.getAnnotation(Parameters.class).name();
+
+ // Get parameter values
+ final Object parameterValues;
+ try {
+ parameterValues = parameterProvider.invoke(null);
+ context.getStore(NAMESPACE).put(PARAMETERS_STORE_KEY, parameterValues);
+ } catch (Exception e) {
+ throw new IllegalStateException("Failed to invoke parameter provider", e);
+ }
+ assert parameterValues != null;
+
+ // Parameter values could be Object[][]
+ if (parameterValues instanceof Object[][]) {
+ Object[][] typedParameterValues = (Object[][]) parameterValues;
+ return createContextForParameters(
+ Arrays.stream(typedParameterValues), testNameTemplate, context);
+ }
+
+ // or a Collection
+ if (parameterValues instanceof Collection) {
+ final Collection<?> typedParameterValues = (Collection<?>) parameterValues;
+ final Stream<Object[]> parameterValueStream =
+ typedParameterValues.stream()
+ .map(
+ (Function<Object, Object[]>)
+ parameterValue -> {
+ if (parameterValue instanceof Object[]) {
+ return (Object[]) parameterValue;
+ } else {
+ return new Object[] {parameterValue};
+ }
+ });
+ return createContextForParameters(parameterValueStream, testNameTemplate, context);
+ }
+
+ throw new IllegalStateException(
+ String.format(
+ "Return type of @Parameters annotated method \"%s\" should be either Object[][] or Collection",
+ parameterProvider));
+ }
+
+ private static class FieldInjectingInvocationContext implements TestTemplateInvocationContext {
+
+ private final String testNameTemplate;
+ private final Object[] parameterValues;
+
+ public FieldInjectingInvocationContext(String testNameTemplate, Object[] parameterValues) {
+ this.testNameTemplate = testNameTemplate;
+ this.parameterValues = parameterValues;
+ }
+
+ @Override
+ public String getDisplayName(int invocationIndex) {
+ if (INDEX_TEMPLATE.equals(testNameTemplate)) {
+ return TestTemplateInvocationContext.super.getDisplayName(invocationIndex);
+ } else {
+ return MessageFormat.format(testNameTemplate, parameterValues);
+ }
+ }
+
+ @Override
+ public List<Extension> getAdditionalExtensions() {
+ return Collections.singletonList(new FieldInjectingHook(parameterValues));
+ }
+
+ private static class FieldInjectingHook implements BeforeEachCallback {
+
+ private final Object[] parameterValues;
+
+ public FieldInjectingHook(Object[] parameterValues) {
+ this.parameterValues = parameterValues;
+ }
+
+ @Override
+ public void beforeEach(ExtensionContext context) throws Exception {
+ for (int i = 0; i < parameterValues.length; i++) {
+ getParameterField(i, context)
+ .set(context.getRequiredTestInstance(), parameterValues[i]);
+ }
+ }
+ }
+ }
+
+ private static class ConstructorParameterResolverInvocationContext
+ implements TestTemplateInvocationContext {
+
+ private final String testNameTemplate;
+ private final Object[] parameterValues;
+
+ public ConstructorParameterResolverInvocationContext(
+ String testNameTemplate, Object[] parameterValues) {
+ this.testNameTemplate = testNameTemplate;
+ this.parameterValues = parameterValues;
+ }
+
+ @Override
+ public String getDisplayName(int invocationIndex) {
+ if (INDEX_TEMPLATE.equals(testNameTemplate)) {
+ return TestTemplateInvocationContext.super.getDisplayName(invocationIndex);
+ } else {
+ return MessageFormat.format(testNameTemplate, parameterValues);
+ }
+ }
+
+ @Override
+ public List<Extension> getAdditionalExtensions() {
+ return Collections.singletonList(new ConstructorParameterResolver(parameterValues));
+ }
+
+ private static class ConstructorParameterResolver implements ParameterResolver {
+
+ private final Object[] parameterValues;
+
+ public ConstructorParameterResolver(Object[] parameterValues) {
+ this.parameterValues = parameterValues;
+ }
+
+ @Override
+ public boolean supportsParameter(
+ ParameterContext parameterContext, ExtensionContext extensionContext)
+ throws ParameterResolutionException {
+ return true;
+ }
+
+ @Override
+ public Object resolveParameter(
+ ParameterContext parameterContext, ExtensionContext extensionContext)
+ throws ParameterResolutionException {
+ return parameterValues[parameterContext.getIndex()];
+ }
+ }
+ }
+
+ // -------------------------------- Helper functions -------------------------------------------
+
+ private Stream<TestTemplateInvocationContext> createContextForParameters(
+ Stream<Object[]> parameterValueStream,
+ String testNameTemplate,
+ ExtensionContext context) {
+ // Search fields annotated by @Parameter
+ final List<Field> parameterFields =
+ AnnotationSupport.findAnnotatedFields(
+ context.getRequiredTestClass(), Parameter.class);
+
+ // Use constructor parameter style
+ if (parameterFields.isEmpty()) {
+ return parameterValueStream.map(
+ parameterValue ->
+ new ConstructorParameterResolverInvocationContext(
+ testNameTemplate, parameterValue));
+ }
+
+ // Use field injection style
+ for (Field parameterField : parameterFields) {
+ final int index = parameterField.getAnnotation(Parameter.class).value();
+ context.getStore(NAMESPACE).put(getParameterFieldStoreKey(index), parameterField);
+ }
+ return parameterValueStream.map(
+ parameterValue ->
+ new FieldInjectingInvocationContext(testNameTemplate, parameterValue));
+ }
+
+ private static String getParameterFieldStoreKey(int parameterIndex) {
+ return PARAMETER_FIELD_STORE_KEY_PREFIX + parameterIndex;
+ }
+
+ private static Field getParameterField(int parameterIndex, ExtensionContext context) {
+ return (Field) context.getStore(NAMESPACE).get(getParameterFieldStoreKey(parameterIndex));
+ }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/parameterized/Parameters.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/parameterized/Parameters.java
new file mode 100644
index 00000000000..c2ddee14db0
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/parameterized/Parameters.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit.extensions.parameterized;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * The annotation is used to replace Parameterized.Parameters(Junit4) for Junit 5 parameterized
+ * tests.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface Parameters {
+ String name() default "{index}";
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/utils/TempDirUtils.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/utils/TempDirUtils.java
new file mode 100644
index 00000000000..4410d45048a
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/utils/TempDirUtils.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+/** The utils contains some methods same as org.junit.rules.TemporaryFolder in Junit4. */
+public class TempDirUtils {
+ private static final String TMP_PREFIX = "junit";
+
+ public static File newFolder(Path path) throws IOException {
+ Path tempPath;
+ if (path != null) {
+ tempPath = Files.createTempDirectory(path, TMP_PREFIX);
+ } else {
+ tempPath = Files.createTempDirectory(TMP_PREFIX);
+ }
+ return tempPath.toFile();
+ }
+
+ public static File newFile(Path path) throws IOException {
+ return File.createTempFile(TMP_PREFIX, null, path.toFile());
+ }
+
+ public static File newFolder(Path base, String... paths) throws IOException {
+ if (paths.length == 0) {
+ throw new IllegalArgumentException("must pass at least one path");
+ }
+
+ /*
+ * Before checking if the paths are absolute paths, check if create() was ever called,
+ * and if it wasn't, throw IllegalStateException.
+ */
+ File root = base.toFile();
+ for (String path : paths) {
+ if (new File(path).isAbsolute()) {
+ throw new IOException(
+ String.format("folder path '%s' is not a relative path", path));
+ }
+ }
+
+ File relativePath = null;
+ File file = root;
+ boolean lastMkdirsCallSuccessful = true;
+ for (String path : paths) {
+ relativePath = new File(relativePath, path);
+ file = new File(root, relativePath.getPath());
+
+ lastMkdirsCallSuccessful = file.mkdirs();
+ if (!lastMkdirsCallSuccessful && !file.isDirectory()) {
+ if (file.exists()) {
+ throw new IOException(
+ String.format(
+ "a file with the path '%s' exists", relativePath.getPath()));
+ } else {
+ throw new IOException(
+ String.format(
+ "could not create a folder with the path: '%s'",
+ relativePath.getPath()));
+ }
+ }
+ }
+ if (!lastMkdirsCallSuccessful) {
+ throw new IOException(
+ String.format(
+ "a folder with the path '%s' already exists", relativePath.getPath()));
+ }
+ return file;
+ }
+
+ public static File newFile(Path folder, String fileName) throws IOException {
+ File file = new File(folder.toFile(), fileName);
+ if (!file.createNewFile()) {
+ throw new IOException(
+ String.format(
+ "a file with the name '%s' already exists in the test folder",
+ fileName));
+ }
+ return file;
+ }
+}