You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ta...@apache.org on 2022/07/11 11:46:24 UTC

[flink] branch master updated: [FLINK-25315][tests] Introduce extensions and utils to help the Junit5 migration

This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bd3a0258b7 [FLINK-25315][tests] Introduce extensions and utils to help the Junit5 migration
3bd3a0258b7 is described below

commit 3bd3a0258b74d0d59a4161ae85ae0e81adb8b5b9
Author: yuchengxin <ca...@qq.com>
AuthorDate: Wed Dec 15 14:05:05 2021 +0800

    [FLINK-25315][tests] Introduce extensions and utils to help the Junit5 migration
    
    Co-authored-by: Hang Ruan <ru...@hotmail.com>
    
    This closes #20145.
---
 .../program/PackagedProgramUtilsPipelineTest.java  |  35 +--
 .../table/runtime/hashtable/LongHashTableTest.java |  58 ++---
 .../junit/extensions/parameterized/Parameter.java  |  36 +++
 .../parameterized/ParameterizedTestExtension.java  | 251 +++++++++++++++++++++
 .../junit/extensions/parameterized/Parameters.java |  34 +++
 .../flink/testutils/junit/utils/TempDirUtils.java  | 100 ++++++++
 6 files changed, 472 insertions(+), 42 deletions(-)

diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsPipelineTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsPipelineTest.java
index 8e356b202e2..ecc0792c1ae 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsPipelineTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsPipelineTest.java
@@ -28,12 +28,14 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.testutils.ClassLoaderUtils;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.net.URL;
@@ -51,14 +53,14 @@ import static org.junit.Assert.assertThat;
  * Tests for {@link PackagedProgramUtils} methods that should be executed for {@link
  * StreamExecutionEnvironment} and {@link Environment}.
  */
-@RunWith(Parameterized.class)
+@ExtendWith(ParameterizedTestExtension.class)
 public class PackagedProgramUtilsPipelineTest {
 
-    @Parameterized.Parameter public TestParameter testParameter;
+    @Parameter public TestParameter testParameter;
 
-    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+    @TempDir private java.nio.file.Path temporaryFolder;
 
-    @Parameterized.Parameters
+    @Parameters(name = "testParameter-{0}")
     public static Collection<TestParameter> parameters() {
         return Arrays.asList(
                 TestParameter.of(
@@ -73,8 +75,8 @@ public class PackagedProgramUtilsPipelineTest {
      * This tests whether configuration forwarding from a {@link Configuration} to the environment
      * works.
      */
-    @Test
-    public void testConfigurationForwarding() throws Exception {
+    @TestTemplate
+    void testConfigurationForwarding() throws Exception {
         // we want to test forwarding with this config, ensure that the default is what we expect.
         assertThat(
                 ExecutionEnvironment.getExecutionEnvironment()
@@ -100,8 +102,8 @@ public class PackagedProgramUtilsPipelineTest {
         assertThat(executionConfig.isAutoTypeRegistrationDisabled(), is(true));
     }
 
-    @Test
-    public void testUserClassloaderForConfiguration() throws Exception {
+    @TestTemplate
+    void testUserClassloaderForConfiguration() throws Exception {
         String userSerializerClassName = "UserSerializer";
         List<URL> userUrls = getClassUrls(userSerializerClassName);
 
@@ -137,7 +139,7 @@ public class PackagedProgramUtilsPipelineTest {
     private List<URL> getClassUrls(String className) throws IOException {
         URLClassLoader urlClassLoader =
                 ClassLoaderUtils.compileAndLoadJava(
-                        temporaryFolder.newFolder(),
+                        TempDirUtils.newFolder(temporaryFolder),
                         className + ".java",
                         "import com.esotericsoftware.kryo.Kryo;\n"
                                 + "import com.esotericsoftware.kryo.Serializer;\n"
@@ -178,6 +180,11 @@ public class PackagedProgramUtilsPipelineTest {
                 public ExecutionConfig extractExecutionConfig(Pipeline pipeline) {
                     return executionConfigExtractor.apply(pipeline);
                 }
+
+                @Override
+                public String toString() {
+                    return entryClass.getSimpleName();
+                }
             };
         }
     }
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/LongHashTableTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/LongHashTableTest.java
index a36ccbd7d0f..4ae6506ec50 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/LongHashTableTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/hashtable/LongHashTableTest.java
@@ -33,12 +33,13 @@ import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
 import org.apache.flink.table.runtime.util.RowIterator;
 import org.apache.flink.table.runtime.util.UniformBinaryRowGenerator;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.util.MutableObjectIterator;
 
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -48,10 +49,10 @@ import java.util.List;
 import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.fail;
+import static org.assertj.core.api.Fail.fail;
 
 /** Test for {@link LongHashPartition}. */
-@RunWith(Parameterized.class)
+@ExtendWith(ParameterizedTestExtension.class)
 public class LongHashTableTest {
 
     private static final int PAGE_SIZE = 32 * 1024;
@@ -68,12 +69,12 @@ public class LongHashTableTest {
         this.useCompress = useCompress;
     }
 
-    @Parameterized.Parameters(name = "useCompress-{0}")
+    @Parameters(name = "useCompress-{0}")
     public static List<Boolean> getVarSeg() {
         return Arrays.asList(true, false);
     }
 
-    @Before
+    @BeforeEach
     public void init() {
         TypeInformation[] types = new TypeInformation[] {Types.INT, Types.INT};
         this.buildSideSerializer = new BinaryRowDataSerializer(types.length);
@@ -115,8 +116,8 @@ public class LongHashTableTest {
         }
     }
 
-    @Test
-    public void testInMemory() throws IOException {
+    @TestTemplate
+    void testInMemory() throws IOException {
         final int numKeys = 100000;
         final int buildValsPerKey = 3;
         final int probeValsPerKey = 10;
@@ -132,6 +133,7 @@ public class LongHashTableTest {
         final MyHashTable table = new MyHashTable(500 * PAGE_SIZE);
 
         int numRecordsInJoinResult = join(table, buildInput, probeInput);
+
         assertThat(numRecordsInJoinResult)
                 .as("Wrong number of records in join result.")
                 .isEqualTo(numKeys * buildValsPerKey * probeValsPerKey);
@@ -141,8 +143,8 @@ public class LongHashTableTest {
         table.free();
     }
 
-    @Test
-    public void testSpillingHashJoinOneRecursion() throws IOException {
+    @TestTemplate
+    void testSpillingHashJoinOneRecursion() throws IOException {
         final int numKeys = 100000;
         final int buildValsPerKey = 3;
         final int probeValsPerKey = 10;
@@ -171,8 +173,8 @@ public class LongHashTableTest {
     }
 
     /** Non partition in memory in level 0. */
-    @Test
-    public void testSpillingHashJoinOneRecursionPerformance() throws IOException {
+    @TestTemplate
+    void testSpillingHashJoinOneRecursionPerformance() throws IOException {
         final int numKeys = 1000000;
         final int buildValsPerKey = 3;
         final int probeValsPerKey = 10;
@@ -200,8 +202,8 @@ public class LongHashTableTest {
         table.free();
     }
 
-    @Test
-    public void testSpillingHashJoinOneRecursionValidity() throws IOException {
+    @TestTemplate
+    void testSpillingHashJoinOneRecursionValidity() throws IOException {
         final int numKeys = 1000000;
         final int buildValsPerKey = 3;
         final int probeValsPerKey = 10;
@@ -254,8 +256,8 @@ public class LongHashTableTest {
         table.free();
     }
 
-    @Test
-    public void testSpillingHashJoinWithMassiveCollisions() throws IOException {
+    @TestTemplate
+    void testSpillingHashJoinWithMassiveCollisions() throws IOException {
         // the following two values are known to have a hash-code collision on the initial level.
         // we use them to make sure one partition grows over-proportionally large
         final int repeatedValue1 = 40559;
@@ -339,8 +341,8 @@ public class LongHashTableTest {
         table.free();
     }
 
-    @Test
-    public void testSpillingHashJoinWithTwoRecursions() throws IOException {
+    @TestTemplate
+    void testSpillingHashJoinWithTwoRecursions() throws IOException {
         // the following two values are known to have a hash-code collision on the first recursion
         // level.
         // we use them to make sure one partition grows over-proportionally large
@@ -430,8 +432,8 @@ public class LongHashTableTest {
      * of repeated values (causing bucket collisions) are large enough to make sure that their target partition no longer
      * fits into memory by itself and needs to be repartitioned in the recursion again.
      */
-    @Test
-    public void testFailingHashJoinTooManyRecursions() throws IOException {
+    @TestTemplate
+    void testFailingHashJoinTooManyRecursions() throws IOException {
         // the following two values are known to have a hash-code collision on the first recursion
         // level.
         // we use them to make sure one partition grows over-proportionally large
@@ -489,8 +491,8 @@ public class LongHashTableTest {
         table.free();
     }
 
-    @Test
-    public void testSparseProbeSpilling() throws IOException, MemoryAllocationException {
+    @TestTemplate
+    void testSparseProbeSpilling() throws IOException, MemoryAllocationException {
         final int numBuildKeys = 1000000;
         final int numBuildVals = 1;
         final int numProbeKeys = 20;
@@ -518,8 +520,8 @@ public class LongHashTableTest {
         table.free();
     }
 
-    @Test
-    public void validateSpillingDuringInsertion() throws IOException, MemoryAllocationException {
+    @TestTemplate
+    void validateSpillingDuringInsertion() throws IOException, MemoryAllocationException {
         final int numBuildKeys = 500000;
         final int numBuildVals = 1;
         final int numProbeKeys = 10;
@@ -547,8 +549,8 @@ public class LongHashTableTest {
         table.free();
     }
 
-    @Test
-    public void testBucketsNotFulfillSegment() throws Exception {
+    @TestTemplate
+    void testBucketsNotFulfillSegment() throws Exception {
         final int numKeys = 10000;
         final int buildValsPerKey = 3;
         final int probeValsPerKey = 10;
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/parameterized/Parameter.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/parameterized/Parameter.java
new file mode 100644
index 00000000000..1cfbf69e89b
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/parameterized/Parameter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit.extensions.parameterized;
+
+import org.junit.runners.Parameterized;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * The annotation is used to replace {@link Parameterized.Parameter} for Junit 5 parameterized
+ * tests.
+ */
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Parameter {
+    int value() default 0;
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/parameterized/ParameterizedTestExtension.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/parameterized/ParameterizedTestExtension.java
new file mode 100644
index 00000000000..f32f52041c7
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/parameterized/ParameterizedTestExtension.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit.extensions.parameterized;
+
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.junit.jupiter.api.extension.ParameterResolver;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
+import org.junit.platform.commons.support.AnnotationSupport;
+import org.junit.platform.commons.support.HierarchyTraversalMode;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.text.MessageFormat;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+/**
+ * This extension is used to implement parameterized tests for Junit 5 to replace Parameterized in
+ * Junit4.
+ *
+ * <p>When using this extension, all tests must be annotated with {@link TestTemplate}.
+ */
+public class ParameterizedTestExtension implements TestTemplateInvocationContextProvider {
+
+    private static final ExtensionContext.Namespace NAMESPACE =
+            ExtensionContext.Namespace.create("parameterized");
+    private static final String PARAMETERS_STORE_KEY = "parameters";
+    private static final String PARAMETER_FIELD_STORE_KEY_PREFIX = "parameterField_";
+    private static final String INDEX_TEMPLATE = "{index}";
+
+    @Override
+    public boolean supportsTestTemplate(ExtensionContext context) {
+        return true;
+    }
+
+    @Override
+    public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts(
+            ExtensionContext context) {
+
+        // Search method annotated with @Parameters
+        final List<Method> parameterProviders =
+                AnnotationSupport.findAnnotatedMethods(
+                        context.getRequiredTestClass(),
+                        Parameters.class,
+                        HierarchyTraversalMode.TOP_DOWN);
+        if (parameterProviders.isEmpty()) {
+            throw new IllegalStateException("Cannot find any parameter provider");
+        }
+        if (parameterProviders.size() > 1) {
+            throw new IllegalStateException("Multiple parameter providers are found");
+        }
+
+        Method parameterProvider = parameterProviders.get(0);
+        // Get potential test name
+        String testNameTemplate = parameterProvider.getAnnotation(Parameters.class).name();
+
+        // Get parameter values
+        final Object parameterValues;
+        try {
+            parameterValues = parameterProvider.invoke(null);
+            context.getStore(NAMESPACE).put(PARAMETERS_STORE_KEY, parameterValues);
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to invoke parameter provider", e);
+        }
+        assert parameterValues != null;
+
+        // Parameter values could be Object[][]
+        if (parameterValues instanceof Object[][]) {
+            Object[][] typedParameterValues = (Object[][]) parameterValues;
+            return createContextForParameters(
+                    Arrays.stream(typedParameterValues), testNameTemplate, context);
+        }
+
+        // or a Collection
+        if (parameterValues instanceof Collection) {
+            final Collection<?> typedParameterValues = (Collection<?>) parameterValues;
+            final Stream<Object[]> parameterValueStream =
+                    typedParameterValues.stream()
+                            .map(
+                                    (Function<Object, Object[]>)
+                                            parameterValue -> {
+                                                if (parameterValue instanceof Object[]) {
+                                                    return (Object[]) parameterValue;
+                                                } else {
+                                                    return new Object[] {parameterValue};
+                                                }
+                                            });
+            return createContextForParameters(parameterValueStream, testNameTemplate, context);
+        }
+
+        throw new IllegalStateException(
+                String.format(
+                        "Return type of @Parameters annotated method \"%s\" should be either Object[][] or Collection",
+                        parameterProvider));
+    }
+
+    private static class FieldInjectingInvocationContext implements TestTemplateInvocationContext {
+
+        private final String testNameTemplate;
+        private final Object[] parameterValues;
+
+        public FieldInjectingInvocationContext(String testNameTemplate, Object[] parameterValues) {
+            this.testNameTemplate = testNameTemplate;
+            this.parameterValues = parameterValues;
+        }
+
+        @Override
+        public String getDisplayName(int invocationIndex) {
+            if (INDEX_TEMPLATE.equals(testNameTemplate)) {
+                return TestTemplateInvocationContext.super.getDisplayName(invocationIndex);
+            } else {
+                return MessageFormat.format(testNameTemplate, parameterValues);
+            }
+        }
+
+        @Override
+        public List<Extension> getAdditionalExtensions() {
+            return Collections.singletonList(new FieldInjectingHook(parameterValues));
+        }
+
+        private static class FieldInjectingHook implements BeforeEachCallback {
+
+            private final Object[] parameterValues;
+
+            public FieldInjectingHook(Object[] parameterValues) {
+                this.parameterValues = parameterValues;
+            }
+
+            @Override
+            public void beforeEach(ExtensionContext context) throws Exception {
+                for (int i = 0; i < parameterValues.length; i++) {
+                    getParameterField(i, context)
+                            .set(context.getRequiredTestInstance(), parameterValues[i]);
+                }
+            }
+        }
+    }
+
+    private static class ConstructorParameterResolverInvocationContext
+            implements TestTemplateInvocationContext {
+
+        private final String testNameTemplate;
+        private final Object[] parameterValues;
+
+        public ConstructorParameterResolverInvocationContext(
+                String testNameTemplate, Object[] parameterValues) {
+            this.testNameTemplate = testNameTemplate;
+            this.parameterValues = parameterValues;
+        }
+
+        @Override
+        public String getDisplayName(int invocationIndex) {
+            if (INDEX_TEMPLATE.equals(testNameTemplate)) {
+                return TestTemplateInvocationContext.super.getDisplayName(invocationIndex);
+            } else {
+                return MessageFormat.format(testNameTemplate, parameterValues);
+            }
+        }
+
+        @Override
+        public List<Extension> getAdditionalExtensions() {
+            return Collections.singletonList(new ConstructorParameterResolver(parameterValues));
+        }
+
+        private static class ConstructorParameterResolver implements ParameterResolver {
+
+            private final Object[] parameterValues;
+
+            public ConstructorParameterResolver(Object[] parameterValues) {
+                this.parameterValues = parameterValues;
+            }
+
+            @Override
+            public boolean supportsParameter(
+                    ParameterContext parameterContext, ExtensionContext extensionContext)
+                    throws ParameterResolutionException {
+                return true;
+            }
+
+            @Override
+            public Object resolveParameter(
+                    ParameterContext parameterContext, ExtensionContext extensionContext)
+                    throws ParameterResolutionException {
+                return parameterValues[parameterContext.getIndex()];
+            }
+        }
+    }
+
+    // -------------------------------- Helper functions -------------------------------------------
+
+    private Stream<TestTemplateInvocationContext> createContextForParameters(
+            Stream<Object[]> parameterValueStream,
+            String testNameTemplate,
+            ExtensionContext context) {
+        // Search fields annotated by @Parameter
+        final List<Field> parameterFields =
+                AnnotationSupport.findAnnotatedFields(
+                        context.getRequiredTestClass(), Parameter.class);
+
+        // Use constructor parameter style
+        if (parameterFields.isEmpty()) {
+            return parameterValueStream.map(
+                    parameterValue ->
+                            new ConstructorParameterResolverInvocationContext(
+                                    testNameTemplate, parameterValue));
+        }
+
+        // Use field injection style
+        for (Field parameterField : parameterFields) {
+            final int index = parameterField.getAnnotation(Parameter.class).value();
+            context.getStore(NAMESPACE).put(getParameterFieldStoreKey(index), parameterField);
+        }
+        return parameterValueStream.map(
+                parameterValue ->
+                        new FieldInjectingInvocationContext(testNameTemplate, parameterValue));
+    }
+
+    private static String getParameterFieldStoreKey(int parameterIndex) {
+        return PARAMETER_FIELD_STORE_KEY_PREFIX + parameterIndex;
+    }
+
+    private static Field getParameterField(int parameterIndex, ExtensionContext context) {
+        return (Field) context.getStore(NAMESPACE).get(getParameterFieldStoreKey(parameterIndex));
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/parameterized/Parameters.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/parameterized/Parameters.java
new file mode 100644
index 00000000000..c2ddee14db0
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/extensions/parameterized/Parameters.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit.extensions.parameterized;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * The annotation is used to replace Parameterized.Parameters(Junit4) for Junit 5 parameterized
+ * tests.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface Parameters {
+    String name() default "{index}";
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/utils/TempDirUtils.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/utils/TempDirUtils.java
new file mode 100644
index 00000000000..4410d45048a
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/utils/TempDirUtils.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+/** Utility methods that mirror those of org.junit.rules.TemporaryFolder in JUnit 4. */
+public class TempDirUtils {
+    private static final String TMP_PREFIX = "junit";
+
+    public static File newFolder(Path path) throws IOException {
+        Path tempPath;
+        if (path != null) {
+            tempPath = Files.createTempDirectory(path, TMP_PREFIX);
+        } else {
+            tempPath = Files.createTempDirectory(TMP_PREFIX);
+        }
+        return tempPath.toFile();
+    }
+
+    public static File newFile(Path path) throws IOException {
+        return File.createTempFile(TMP_PREFIX, null, path.toFile());
+    }
+
+    public static File newFolder(Path base, String... paths) throws IOException {
+        if (paths.length == 0) {
+            throw new IllegalArgumentException("must pass at least one path");
+        }
+
+        /*
+         * Unlike JUnit 4's TemporaryFolder, no create() state is verified here; the base
+         * directory is supplied by the caller. Absolute path segments are rejected up front.
+         */
+        File root = base.toFile();
+        for (String path : paths) {
+            if (new File(path).isAbsolute()) {
+                throw new IOException(
+                        String.format("folder path '%s' is not a relative path", path));
+            }
+        }
+
+        File relativePath = null;
+        File file = root;
+        boolean lastMkdirsCallSuccessful = true;
+        for (String path : paths) {
+            relativePath = new File(relativePath, path);
+            file = new File(root, relativePath.getPath());
+
+            lastMkdirsCallSuccessful = file.mkdirs();
+            if (!lastMkdirsCallSuccessful && !file.isDirectory()) {
+                if (file.exists()) {
+                    throw new IOException(
+                            String.format(
+                                    "a file with the path '%s' exists", relativePath.getPath()));
+                } else {
+                    throw new IOException(
+                            String.format(
+                                    "could not create a folder with the path: '%s'",
+                                    relativePath.getPath()));
+                }
+            }
+        }
+        if (!lastMkdirsCallSuccessful) {
+            throw new IOException(
+                    String.format(
+                            "a folder with the path '%s' already exists", relativePath.getPath()));
+        }
+        return file;
+    }
+
+    public static File newFile(Path folder, String fileName) throws IOException {
+        File file = new File(folder.toFile(), fileName);
+        if (!file.createNewFile()) {
+            throw new IOException(
+                    String.format(
+                            "a file with the name '%s' already exists in the test folder",
+                            fileName));
+        }
+        return file;
+    }
+}