Posted to commits@flink.apache.org by ma...@apache.org on 2022/11/04 16:27:57 UTC

[flink] branch master updated: [FLINK-28449][tests][JUnit5 migration] flink-parquet (#20230)

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new db3244a6ac2 [FLINK-28449][tests][JUnit5 migration] flink-parquet (#20230)
db3244a6ac2 is described below

commit db3244a6ac2b06cebae9be26d7bcac506cc69093
Author: Ryan Skraba <rs...@apache.org>
AuthorDate: Fri Nov 4 11:27:41 2022 -0500

    [FLINK-28449][tests][JUnit5 migration] flink-parquet (#20230)
---
 .../formats/compress/CompressionFactoryITCase.java |   2 -
 .../architecture/TestCodeArchitectureTest.java     |   2 +-
 .../parquet/ParquetColumnarRowInputFormatTest.java |  96 ++++++++++----------
 .../formats/parquet/ParquetFileSystemITCase.java   |   8 +-
 .../parquet/ParquetFormatStatisticsReportTest.java |   4 +-
 .../parquet/avro/AvroParquetFileReadITCase.java    |  39 +++++---
 .../avro/AvroParquetStreamingFileSinkITCase.java   |  28 ++----
 .../ParquetProtoStreamingFileSinkITCase.java       |  18 ++--
 .../parquet/row/ParquetRowDataWriterTest.java      |  39 ++++----
 .../utils/SerializableConfigurationTest.java       |   6 +-
 .../vector/ParquetColumnarRowSplitReaderTest.java  | 100 ++++++++++-----------
 .../org.junit.jupiter.api.extension.Extension      |  16 ++++
 12 files changed, 183 insertions(+), 175 deletions(-)
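
For readers skimming the diff: the changes below follow the standard JUnit 4 -> JUnit Jupiter mapping. A minimal sketch of the recurring pattern (class and method names are illustrative, not taken from this commit):

    // JUnit 4 (before): public visibility, rule-managed temporary folder
    import java.io.File;

    import org.junit.ClassRule;
    import org.junit.Test;
    import org.junit.rules.TemporaryFolder;

    public class ExampleTest {
        @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

        @Test
        public void testSomething() throws Exception {
            File folder = TEMPORARY_FOLDER.newFolder();
        }
    }

    // JUnit Jupiter (after): package-private visibility, injected @TempDir
    import java.io.File;

    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.io.TempDir;

    class ExampleTest {
        @Test
        void testSomething(@TempDir File folder) throws Exception {
            // the framework creates and cleans up the directory
        }
    }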

diff --git a/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java
index 66465b70949..c2d08112f67 100644
--- a/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java
+++ b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -60,7 +59,6 @@ class CompressionFactoryITCase {
     private final List<String> testData = Arrays.asList("line1", "line2", "line3");
 
     @Test
-    @Timeout(20)
     void testWriteCompressedFile(@TempDir java.nio.file.Path tmpDir) throws Exception {
         final File folder = tmpDir.toFile();
         final Path testPath = Path.fromLocalFile(folder);
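
Note that the per-test @Timeout(20) is dropped here rather than migrated. If a bound were still wanted, Jupiter allows one at class level (or globally via the junit.jupiter.execution.timeout.default configuration parameter); a sketch, with an illustrative class name:

    import java.util.concurrent.TimeUnit;

    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.Timeout;

    // A class-level @Timeout applies to every test method in the class.
    @Timeout(value = 20, unit = TimeUnit.SECONDS)
    class TimeoutSketch {
        @Test
        void testWriteCompressedFile() throws Exception {
            // ...
        }
    }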
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
index 2e94cb407aa..128b218500d 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
@@ -33,7 +33,7 @@ import com.tngtech.archunit.junit.ArchTests;
             ImportOptions.ExcludeScalaImportOption.class,
             ImportOptions.ExcludeShadedImportOption.class
         })
-public class TestCodeArchitectureTest {
+class TestCodeArchitectureTest {
 
     @ArchTest
     public static final ArchTests COMMON_TESTS = ArchTests.in(TestCodeArchitectureTestBase.class);
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java
index e35aff57eb7..fa654052901 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java
@@ -54,11 +54,9 @@ import org.apache.flink.table.utils.DateTimeUtils;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.apache.hadoop.conf.Configuration;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.File;
 import java.io.IOException;
@@ -84,8 +82,7 @@ import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionP
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link ParquetColumnarRowInputFormat}. */
-@RunWith(Parameterized.class)
-public class ParquetColumnarRowInputFormatTest {
+class ParquetColumnarRowInputFormatTest {
 
     private static final LocalDateTime BASE_TIME = LocalDateTime.now();
     private static final org.apache.flink.configuration.Configuration EMPTY_CONF =
@@ -129,21 +126,15 @@ public class ParquetColumnarRowInputFormatTest {
                     new MapType(new IntType(), new BooleanType()),
                     RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType()));
 
-    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+    @TempDir private File folder;
 
-    private final int rowGroupSize;
-
-    @Parameterized.Parameters(name = "rowGroupSize-{0}")
     public static Collection<Integer> parameters() {
         return Arrays.asList(10, 1000);
     }
 
-    public ParquetColumnarRowInputFormatTest(int rowGroupSize) {
-        this.rowGroupSize = rowGroupSize;
-    }
-
-    @Test
-    public void testTypesReadWithSplits() throws IOException {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testTypesReadWithSplits(int rowGroupSize) throws IOException {
         int number = 10000;
         List<Integer> values = new ArrayList<>(number);
         Random random = new Random();
@@ -152,11 +143,12 @@ public class ParquetColumnarRowInputFormatTest {
             values.add(v % 10 == 0 ? null : v);
         }
 
-        innerTestTypes(values);
+        innerTestTypes(folder, values, rowGroupSize);
     }
 
-    @Test
-    public void testDictionary() throws IOException {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testDictionary(int rowGroupSize) throws IOException {
         int number = 10000;
         List<Integer> values = new ArrayList<>(number);
         Random random = new Random();
@@ -170,11 +162,12 @@ public class ParquetColumnarRowInputFormatTest {
             values.add(v == 0 ? null : v);
         }
 
-        innerTestTypes(values);
+        innerTestTypes(folder, values, rowGroupSize);
     }
 
-    @Test
-    public void testPartialDictionary() throws IOException {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testPartialDictionary(int rowGroupSize) throws IOException {
         // prepare parquet file
         int number = 10000;
         List<Integer> values = new ArrayList<>(number);
@@ -189,11 +182,12 @@ public class ParquetColumnarRowInputFormatTest {
             values.add(v == 0 ? null : v);
         }
 
-        innerTestTypes(values);
+        innerTestTypes(folder, values, rowGroupSize);
     }
 
-    @Test
-    public void testContinuousRepetition() throws IOException {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testContinuousRepetition(int rowGroupSize) throws IOException {
         int number = 10000;
         List<Integer> values = new ArrayList<>(number);
         Random random = new Random();
@@ -204,11 +198,12 @@ public class ParquetColumnarRowInputFormatTest {
             }
         }
 
-        innerTestTypes(values);
+        innerTestTypes(folder, values, rowGroupSize);
     }
 
-    @Test
-    public void testLargeValue() throws IOException {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testLargeValue(int rowGroupSize) throws IOException {
         int number = 10000;
         List<Integer> values = new ArrayList<>(number);
         Random random = new Random();
@@ -217,11 +212,12 @@ public class ParquetColumnarRowInputFormatTest {
             values.add(v % 10 == 0 ? null : v);
         }
 
-        innerTestTypes(values);
+        innerTestTypes(folder, values, rowGroupSize);
     }
 
-    @Test
-    public void testProjection() throws IOException {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testProjection(int rowGroupSize) throws IOException {
         int number = 1000;
         List<RowData> records = new ArrayList<>(number);
         for (int i = 0; i < number; i++) {
@@ -229,7 +225,7 @@ public class ParquetColumnarRowInputFormatTest {
             records.add(newRow(v));
         }
 
-        Path testPath = createTempParquetFile(TEMPORARY_FOLDER.newFolder(), records, rowGroupSize);
+        Path testPath = createTempParquetFile(folder, records, rowGroupSize);
         // test reader
         LogicalType[] fieldTypes =
                 new LogicalType[] {new DoubleType(), new TinyIntType(), new IntType()};
@@ -256,8 +252,9 @@ public class ParquetColumnarRowInputFormatTest {
                 });
     }
 
-    @Test
-    public void testProjectionReadUnknownField() throws IOException {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testProjectionReadUnknownField(int rowGroupSize) throws IOException {
         int number = 1000;
         List<RowData> records = new ArrayList<>(number);
         for (int i = 0; i < number; i++) {
@@ -265,7 +262,7 @@ public class ParquetColumnarRowInputFormatTest {
             records.add(newRow(v));
         }
 
-        Path testPath = createTempParquetFile(TEMPORARY_FOLDER.newFolder(), records, rowGroupSize);
+        Path testPath = createTempParquetFile(folder, records, rowGroupSize);
 
         // test reader
         LogicalType[] fieldTypes =
@@ -297,8 +294,9 @@ public class ParquetColumnarRowInputFormatTest {
                 });
     }
 
-    @Test
-    public void testPartitionValues() throws IOException {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testPartitionValues(int rowGroupSize) throws IOException {
         // prepare parquet file
         int number = 1000;
         List<RowData> records = new ArrayList<>(number);
@@ -307,8 +305,6 @@ public class ParquetColumnarRowInputFormatTest {
             records.add(newRow(v));
         }
 
-        File root = TEMPORARY_FOLDER.newFolder();
-
         List<String> partitionKeys =
                 Arrays.asList(
                         "f33", "f34", "f35", "f36", "f37", "f38", "f39", "f40", "f41", "f42", "f43",
@@ -332,7 +328,7 @@ public class ParquetColumnarRowInputFormatTest {
         partSpec.put("f45", "f45");
 
         String partPath = generatePartitionPath(partSpec);
-        Path testPath = createTempParquetFile(new File(root, partPath), records, rowGroupSize);
+        Path testPath = createTempParquetFile(new File(folder, partPath), records, rowGroupSize);
 
         innerTestPartitionValues(testPath, partitionKeys, false);
 
@@ -343,14 +339,15 @@ public class ParquetColumnarRowInputFormatTest {
         }
 
         partPath = generatePartitionPath(partSpec);
-        testPath = createTempParquetFile(new File(root, partPath), records, rowGroupSize);
+        testPath = createTempParquetFile(new File(folder, partPath), records, rowGroupSize);
 
         innerTestPartitionValues(testPath, partitionKeys, true);
     }
 
-    private void innerTestTypes(List<Integer> records) throws IOException {
+    private void innerTestTypes(File folder, List<Integer> records, int rowGroupSize)
+            throws IOException {
         List<RowData> rows = records.stream().map(this::newRow).collect(Collectors.toList());
-        Path testPath = createTempParquetFile(TEMPORARY_FOLDER.newFolder(), rows, rowGroupSize);
+        Path testPath = createTempParquetFile(folder, rows, rowGroupSize);
 
         // test reading and splitting
         long fileLen = testPath.getFileSystem().getFileStatus(testPath).getLen();
@@ -475,7 +472,7 @@ public class ParquetColumnarRowInputFormatTest {
                         assertThat(row.isNullAt(31)).isTrue();
                         assertThat(row.isNullAt(32)).isTrue();
                     } else {
-                        assertThat(row.getString(0).toString()).isEqualTo("" + v);
+                        assertThat(row.getString(0)).hasToString("" + v);
                         assertThat(row.getBoolean(1)).isEqualTo(v % 2 == 0);
                         assertThat(row.getByte(2)).isEqualTo(v.byteValue());
                         assertThat(row.getShort(3)).isEqualTo(v.shortValue());
@@ -516,7 +513,7 @@ public class ParquetColumnarRowInputFormatTest {
                                 .isEqualTo(BigDecimal.valueOf(v));
                         assertThat(row.getDecimal(14, 20, 0).toBigDecimal())
                                 .isEqualTo(BigDecimal.valueOf(v));
-                        assertThat(row.getArray(15).getString(0).toString()).isEqualTo("" + v);
+                        assertThat(row.getArray(15).getString(0)).hasToString("" + v);
                         assertThat(row.getArray(16).getBoolean(0)).isEqualTo(v % 2 == 0);
                         assertThat(row.getArray(17).getByte(0)).isEqualTo(v.byteValue());
                         assertThat(row.getArray(18).getShort(0)).isEqualTo(v.shortValue());
@@ -539,10 +536,9 @@ public class ParquetColumnarRowInputFormatTest {
                         assertThat(row.getArray(29).getDecimal(0, 20, 0))
                                 .isEqualTo(
                                         DecimalData.fromBigDecimal(BigDecimal.valueOf(v), 20, 0));
-                        assertThat(row.getMap(30).valueArray().getString(0).toString())
-                                .isEqualTo("" + v);
+                        assertThat(row.getMap(30).valueArray().getString(0)).hasToString("" + v);
                         assertThat(row.getMap(31).valueArray().getBoolean(0)).isEqualTo(v % 2 == 0);
-                        assertThat(row.getRow(32, 2).getString(0).toString()).isEqualTo("" + v);
+                        assertThat(row.getRow(32, 2).getString(0)).hasToString("" + v);
                         assertThat(row.getRow(32, 2).getInt(1)).isEqualTo(v.intValue());
                     }
                     cnt.incrementAndGet();
@@ -746,7 +742,7 @@ public class ParquetColumnarRowInputFormatTest {
                                 .isEqualTo(DecimalData.fromBigDecimal(new BigDecimal(25), 15, 0));
                         assertThat(row.getDecimal(14, 20, 0))
                                 .isEqualTo(DecimalData.fromBigDecimal(new BigDecimal(26), 20, 0));
-                        assertThat(row.getString(15).toString()).isEqualTo("f45");
+                        assertThat(row.getString(15)).hasToString("f45");
                     }
                     cnt.incrementAndGet();
                 });
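
The bulk of this file is the @RunWith(Parameterized.class) -> @ParameterizedTest conversion: the constructor-injected rowGroupSize field becomes a plain method parameter fed from a static factory. A minimal sketch of the pattern (test body elided):

    import java.util.Arrays;
    import java.util.Collection;

    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.MethodSource;

    class RowGroupSizeSketch {
        static Collection<Integer> parameters() {
            return Arrays.asList(10, 1000);
        }

        @ParameterizedTest
        @MethodSource("parameters")
        void testTypesReadWithSplits(int rowGroupSize) {
            // runs once per value: rowGroupSize = 10, then 1000
        }
    }

One behavioural difference worth noting: the old @ClassRule TemporaryFolder was shared by all tests in the class, while the new instance-level @TempDir field yields a fresh directory per test invocation.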
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetFileSystemITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetFileSystemITCase.java
index a10f57415d0..f6d9e6509a6 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetFileSystemITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetFileSystemITCase.java
@@ -101,11 +101,11 @@ public class ParquetFileSystemITCase extends BatchFileSystemITCaseBase {
             ParquetMetadata footer =
                     readFooter(new Configuration(), path, range(0, Long.MAX_VALUE));
             if (configure) {
-                assertThat(footer.getBlocks().get(0).getColumns().get(0).getCodec().toString())
-                        .isEqualTo("GZIP");
+                assertThat(footer.getBlocks().get(0).getColumns().get(0).getCodec())
+                        .hasToString("GZIP");
             } else {
-                assertThat(footer.getBlocks().get(0).getColumns().get(0).getCodec().toString())
-                        .isEqualTo("SNAPPY");
+                assertThat(footer.getBlocks().get(0).getColumns().get(0).getCodec())
+                        .hasToString("SNAPPY");
             }
         } catch (IOException e) {
             throw new RuntimeException(e);
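
A small assertion cleanup that recurs throughout the commit: comparing x.toString() with isEqualTo is replaced by AssertJ's hasToString, which asserts on the object itself and produces a clearer failure message. A sketch (the codec stand-in is illustrative):

    import static org.assertj.core.api.Assertions.assertThat;

    import org.junit.jupiter.api.Test;

    class ToStringAssertionSketch {
        @Test
        void hasToStringReadsBetter() {
            Object codec = "GZIP"; // stand-in for the Parquet codec enum
            // before: assertThat(codec.toString()).isEqualTo("GZIP");
            assertThat(codec).hasToString("GZIP");
        }
    }

In the same spirit, isNotNull() ahead of hasSize(n) is dropped in several files below, since hasSize already fails on a null actual.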
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetFormatStatisticsReportTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetFormatStatisticsReportTest.java
index 7e06b79367a..29ebd14c445 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetFormatStatisticsReportTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetFormatStatisticsReportTest.java
@@ -69,7 +69,7 @@ public class ParquetFormatStatisticsReportTest extends StatisticsReportTestBase
         // insert data and get statistics.
         DataType dataType = tEnv.from("sourceTable").getResolvedSchema().toPhysicalRowDataType();
         tEnv.fromValues(dataType, getData()).executeInsert("sourceTable").await();
-        assertThat(folder.listFiles()).isNotNull().hasSize(1);
+        assertThat(folder.listFiles()).hasSize(1);
         File[] files = folder.listFiles();
         assert files != null;
         TableStats tableStats =
@@ -84,7 +84,7 @@ public class ParquetFormatStatisticsReportTest extends StatisticsReportTestBase
         DataType dataType = tEnv.from("sourceTable").getResolvedSchema().toPhysicalRowDataType();
         tEnv.fromValues(dataType, getData()).executeInsert("sourceTable").await();
         tEnv.fromValues(dataType, getData()).executeInsert("sourceTable").await();
-        assertThat(folder.listFiles()).isNotNull().hasSize(2);
+        assertThat(folder.listFiles()).hasSize(2);
         File[] files = folder.listFiles();
         List<Path> paths = new ArrayList<>();
         assert files != null;
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileReadITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileReadITCase.java
index b436eb59c0b..abd79903469 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileReadITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileReadITCase.java
@@ -24,16 +24,19 @@ import org.apache.flink.connector.file.src.FileSource;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.parquet.ParquetWriterFactory;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.util.CloseableIterator;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -47,18 +50,28 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** ITCase for {@link AvroParquetRecordFormat}. */
-public class AvroParquetFileReadITCase extends AbstractTestBase {
+class AvroParquetFileReadITCase {
 
     private static final int PARALLELISM = 4;
     private static final String USER_PARQUET_FILE_1 = "user1.parquet";
     private static final String USER_PARQUET_FILE_2 = "user2.parquet";
     private static final String USER_PARQUET_FILE_3 = "user3.parquet";
 
+    @TempDir static java.nio.file.Path temporaryFolder;
+
     private static Schema schema;
     private static final List<GenericRecord> userRecords = new ArrayList<>(3);
 
-    @BeforeClass
-    public static void setup() throws IOException {
+    @RegisterExtension
+    public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .build());
+
+    @BeforeAll
+    static void setup() throws IOException {
         // Generic records
         schema =
                 new Schema.Parser()
@@ -79,14 +92,14 @@ public class AvroParquetFileReadITCase extends AbstractTestBase {
 
         createParquetFile(
                 AvroParquetWriters.forGenericRecord(schema),
-                Path.fromLocalFile(TEMPORARY_FOLDER.newFile(USER_PARQUET_FILE_1)),
+                Path.fromLocalFile(temporaryFolder.resolve(USER_PARQUET_FILE_1).toFile()),
                 userRecords.toArray(new GenericRecord[0]));
 
         GenericRecord user = createUser("Max", 4, "blue");
         userRecords.add(user);
         createParquetFile(
                 AvroParquetWriters.forGenericRecord(schema),
-                Path.fromLocalFile(TEMPORARY_FOLDER.newFile(USER_PARQUET_FILE_2)),
+                Path.fromLocalFile(temporaryFolder.resolve(USER_PARQUET_FILE_2).toFile()),
                 user);
 
         user = createUser("Alex", 5, "White");
@@ -96,17 +109,17 @@ public class AvroParquetFileReadITCase extends AbstractTestBase {
         userRecords.add(user1);
         createParquetFile(
                 AvroParquetWriters.forGenericRecord(schema),
-                Path.fromLocalFile(TEMPORARY_FOLDER.newFile(USER_PARQUET_FILE_3)),
+                Path.fromLocalFile(temporaryFolder.resolve(USER_PARQUET_FILE_3).toFile()),
                 user,
                 user1);
     }
 
     @Test
-    public void testReadAvroRecord() throws Exception {
+    void testReadAvroRecord() throws Exception {
         final FileSource<GenericRecord> source =
                 FileSource.forRecordStreamFormat(
                                 AvroParquetReaders.forGenericRecord(schema),
-                                Path.fromLocalFile(TEMPORARY_FOLDER.getRoot()))
+                                Path.fromLocalFile(temporaryFolder.toFile()))
                         .monitorContinuously(Duration.ofMillis(5))
                         .build();
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -128,12 +141,12 @@ public class AvroParquetFileReadITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testReadAvroReflectRecord() throws Exception {
+    void testReadAvroReflectRecord() throws Exception {
         final FileSource<AvroParquetRecordFormatTest.User> source =
                 FileSource.forRecordStreamFormat(
                                 AvroParquetReaders.forReflectRecord(
                                         AvroParquetRecordFormatTest.User.class),
-                                Path.fromLocalFile(TEMPORARY_FOLDER.getRoot()))
+                                Path.fromLocalFile(temporaryFolder.toFile()))
                         .monitorContinuously(Duration.ofMillis(5))
                         .build();
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
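
With AbstractTestBase gone, the Flink mini-cluster is supplied by a JUnit 5 extension. This file registers it programmatically because it needs a specific slot count; the sink ITCases later in this commit use the declarative form, since the defaults suffice there. A sketch of the two styles (configuration values illustrative):

    import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
    import org.apache.flink.test.junit5.MiniClusterExtension;

    import org.junit.jupiter.api.extension.ExtendWith;
    import org.junit.jupiter.api.extension.RegisterExtension;

    // Programmatic: explicit task manager / slot configuration.
    class ConfiguredClusterITCase {
        @RegisterExtension
        static final MiniClusterExtension MINI_CLUSTER =
                new MiniClusterExtension(
                        new MiniClusterResourceConfiguration.Builder()
                                .setNumberTaskManagers(1)
                                .setNumberSlotsPerTaskManager(4)
                                .build());
    }

    // Declarative: default mini-cluster configuration.
    @ExtendWith(MiniClusterExtension.class)
    class DefaultClusterITCase {}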
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetStreamingFileSinkITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetStreamingFileSinkITCase.java
index 4bc0820c134..91de5fbc4fe 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetStreamingFileSinkITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetStreamingFileSinkITCase.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner;
 import org.apache.flink.streaming.util.FiniteTestSource;
-import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -39,9 +39,9 @@ import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.io.InputFile;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.IOException;
@@ -59,15 +59,11 @@ import static org.assertj.core.api.Assertions.assertThat;
  * Simple integration test case for writing bulk encoded files with the {@link StreamingFileSink}
  * with Parquet.
  */
-@SuppressWarnings("serial")
-public class AvroParquetStreamingFileSinkITCase extends AbstractTestBase {
-
-    @Rule public final Timeout timeoutPerTest = Timeout.seconds(20);
+@ExtendWith(MiniClusterExtension.class)
+class AvroParquetStreamingFileSinkITCase {
 
     @Test
-    public void testWriteParquetAvroSpecific() throws Exception {
-
-        final File folder = TEMPORARY_FOLDER.newFolder();
+    void testWriteParquetAvroSpecific(@TempDir File folder) throws Exception {
 
         final List<Address> data =
                 Arrays.asList(
@@ -95,9 +91,7 @@ public class AvroParquetStreamingFileSinkITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testWriteParquetAvroGeneric() throws Exception {
-
-        final File folder = TEMPORARY_FOLDER.newFolder();
+    void testWriteParquetAvroGeneric(@TempDir File folder) throws Exception {
 
         final Schema schema = Address.getClassSchema();
 
@@ -128,9 +122,7 @@ public class AvroParquetStreamingFileSinkITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testWriteParquetAvroReflect() throws Exception {
-
-        final File folder = TEMPORARY_FOLDER.newFolder();
+    void testWriteParquetAvroReflect(@TempDir File folder) throws Exception {
 
         final List<Datum> data =
                 Arrays.asList(new Datum("a", 1), new Datum("b", 2), new Datum("c", 3));
@@ -159,11 +151,9 @@ public class AvroParquetStreamingFileSinkITCase extends AbstractTestBase {
     private static <T> void validateResults(File folder, GenericData dataModel, List<T> expected)
             throws Exception {
         File[] buckets = folder.listFiles();
-        assertThat(buckets).isNotNull();
         assertThat(buckets).hasSize(1);
 
         File[] partFiles = buckets[0].listFiles();
-        assertThat(partFiles).isNotNull();
         assertThat(partFiles).hasSize(2);
 
         for (File partFile : partFiles) {
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoStreamingFileSinkITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoStreamingFileSinkITCase.java
index 013f11c8384..3b70aa158c3 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoStreamingFileSinkITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoStreamingFileSinkITCase.java
@@ -25,15 +25,15 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner;
 import org.apache.flink.streaming.util.FiniteTestSource;
-import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 
 import com.google.protobuf.Message;
 import com.google.protobuf.MessageOrBuilder;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.proto.ProtoParquetReader;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.IOException;
@@ -49,13 +49,11 @@ import static org.assertj.core.api.Assertions.assertThat;
  * Simple integration test case for writing bulk encoded files with the {@link StreamingFileSink}
  * with Parquet.
  */
-public class ParquetProtoStreamingFileSinkITCase extends AbstractTestBase {
-
-    @Rule public final Timeout timeoutPerTest = Timeout.seconds(20);
+@ExtendWith(MiniClusterExtension.class)
+class ParquetProtoStreamingFileSinkITCase {
 
     @Test
-    public void testParquetProtoWriters() throws Exception {
-        File folder = TEMPORARY_FOLDER.newFolder();
+    void testParquetProtoWriters(@TempDir File folder) throws Exception {
 
         List<SimpleProtoRecord> data =
                 Arrays.asList(
@@ -88,11 +86,9 @@ public class ParquetProtoStreamingFileSinkITCase extends AbstractTestBase {
     private static <T extends MessageOrBuilder> void validateResults(File folder, List<T> expected)
             throws Exception {
         File[] buckets = folder.listFiles();
-        assertThat(buckets).isNotNull();
         assertThat(buckets).hasSize(1);
 
         File[] partFiles = buckets[0].listFiles();
-        assertThat(partFiles).isNotNull();
         assertThat(partFiles).hasSize(2);
 
         for (File partFile : partFiles) {
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
index 902b41249ce..38edb278ad8 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
@@ -53,9 +53,8 @@ import org.apache.parquet.hadoop.ParquetOutputFormat;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.io.InputFile;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.IOException;
@@ -72,9 +71,7 @@ import java.util.stream.IntStream;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link ParquetRowDataBuilder} and {@link ParquetRowDataWriter}. */
-public class ParquetRowDataWriterTest {
-
-    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+class ParquetRowDataWriterTest {
 
     private static final RowType ROW_TYPE =
             RowType.of(
@@ -112,26 +109,27 @@ public class ParquetRowDataWriterTest {
                     TypeConversions.fromLogicalToDataType(ROW_TYPE));
 
     @Test
-    public void testTypes() throws Exception {
+    void testTypes(@TempDir java.nio.file.Path folder) throws Exception {
         Configuration conf = new Configuration();
-        innerTest(conf, true);
-        innerTest(conf, false);
-        complexTypeTest(conf, true);
-        complexTypeTest(conf, false);
+        innerTest(folder, conf, true);
+        innerTest(folder, conf, false);
+        complexTypeTest(folder, conf, true);
+        complexTypeTest(folder, conf, false);
     }
 
     @Test
-    public void testCompression() throws Exception {
+    void testCompression(@TempDir java.nio.file.Path folder) throws Exception {
         Configuration conf = new Configuration();
         conf.set(ParquetOutputFormat.COMPRESSION, "GZIP");
-        innerTest(conf, true);
-        innerTest(conf, false);
-        complexTypeTest(conf, true);
-        complexTypeTest(conf, false);
+        innerTest(folder, conf, true);
+        innerTest(folder, conf, false);
+        complexTypeTest(folder, conf, true);
+        complexTypeTest(folder, conf, false);
     }
 
-    private void innerTest(Configuration conf, boolean utcTimestamp) throws IOException {
-        Path path = new Path(TEMPORARY_FOLDER.newFolder().getPath(), UUID.randomUUID().toString());
+    private void innerTest(java.nio.file.Path folder, Configuration conf, boolean utcTimestamp)
+            throws IOException {
+        Path path = new Path(folder.toString(), UUID.randomUUID().toString());
         int number = 1000;
         List<Row> rows = new ArrayList<>(number);
         for (int i = 0; i < number; i++) {
@@ -188,8 +186,9 @@ public class ParquetRowDataWriterTest {
         assertThat(cnt).isEqualTo(number);
     }
 
-    public void complexTypeTest(Configuration conf, boolean utcTimestamp) throws Exception {
-        Path path = new Path(TEMPORARY_FOLDER.newFolder().getPath(), UUID.randomUUID().toString());
+    public void complexTypeTest(java.nio.file.Path folder, Configuration conf, boolean utcTimestamp)
+            throws Exception {
+        Path path = new Path(folder.toString(), UUID.randomUUID().toString());
         int number = 1000;
         List<Row> rows = new ArrayList<>(number);
         Map<String, String> mapData = new HashMap<>();
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/SerializableConfigurationTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/SerializableConfigurationTest.java
index 7f29899baa9..b51a23dd644 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/SerializableConfigurationTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/SerializableConfigurationTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.formats.parquet.utils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.jetbrains.annotations.Nullable;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.net.URL;
@@ -29,10 +29,10 @@ import static org.apache.flink.util.InstantiationUtil.deserializeObject;
 import static org.apache.flink.util.InstantiationUtil.serializeObject;
 
 /** Test for {@link SerializableConfiguration}. */
-public class SerializableConfigurationTest {
+class SerializableConfigurationTest {
 
     @Test
-    public void testResource() throws IOException, ClassNotFoundException {
+    void testResource() throws IOException, ClassNotFoundException {
         ClassLoader cl =
                 new ClassLoader(Thread.currentThread().getContextClassLoader()) {
                     @Nullable
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReaderTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReaderTest.java
index d0e588b0339..1c9a5ed7d4d 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReaderTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReaderTest.java
@@ -52,11 +52,9 @@ import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.table.utils.DateTimeUtils;
 
 import org.apache.hadoop.conf.Configuration;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.File;
 import java.io.IOException;
@@ -76,8 +74,7 @@ import java.util.stream.IntStream;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link ParquetColumnarRowSplitReader}. */
-@RunWith(Parameterized.class)
-public class ParquetColumnarRowSplitReaderTest {
+class ParquetColumnarRowSplitReaderTest {
 
     private static final int FIELD_NUMBER = 33;
     private static final LocalDateTime BASE_TIME = LocalDateTime.now();
@@ -120,21 +117,15 @@ public class ParquetColumnarRowSplitReaderTest {
                     new MapType(new IntType(), new BooleanType()),
                     RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType()));
 
-    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+    @TempDir File tmpDir;
 
-    private final int rowGroupSize;
-
-    @Parameterized.Parameters(name = "rowGroupSize-{0}")
     public static Collection<Integer> parameters() {
         return Arrays.asList(10, 1000);
     }
 
-    public ParquetColumnarRowSplitReaderTest(int rowGroupSize) {
-        this.rowGroupSize = rowGroupSize;
-    }
-
-    @Test
-    public void testNormalTypesReadWithSplits() throws IOException {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testNormalTypesReadWithSplits(int rowGroupSize) throws IOException {
         // prepare parquet file
         int number = 10000;
         List<RowData> records = new ArrayList<>(number);
@@ -151,11 +142,12 @@ public class ParquetColumnarRowSplitReaderTest {
             }
         }
 
-        testNormalTypes(number, records, values);
+        testNormalTypes(number, records, values, rowGroupSize);
     }
 
-    @Test
-    public void testReachEnd() throws Exception {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testReachEnd(int rowGroupSize) throws Exception {
         // prepare parquet file
         int number = 5;
         List<RowData> records = new ArrayList<>(number);
@@ -168,8 +160,7 @@ public class ParquetColumnarRowSplitReaderTest {
                 records.add(newRow(v));
             }
         }
-
-        Path testPath = createTempParquetFile(TEMPORARY_FOLDER.newFolder(), records, rowGroupSize);
+        Path testPath = createTempParquetFile(tmpDir, records, rowGroupSize);
         ParquetColumnarRowSplitReader reader =
                 createReader(
                         testPath, 0, testPath.getFileSystem().getFileStatus(testPath).getLen());
@@ -198,9 +189,10 @@ public class ParquetColumnarRowSplitReaderTest {
         return path;
     }
 
-    private void testNormalTypes(int number, List<RowData> records, List<Integer> values)
+    private void testNormalTypes(
+            int number, List<RowData> records, List<Integer> values, int rowGroupSize)
             throws IOException {
-        Path testPath = createTempParquetFile(TEMPORARY_FOLDER.newFolder(), records, rowGroupSize);
+        Path testPath = createTempParquetFile(tmpDir, records, rowGroupSize);
 
         // test reading and splitting
         long fileLen = testPath.getFileSystem().getFileStatus(testPath).getLen();
@@ -285,7 +277,7 @@ public class ParquetColumnarRowSplitReaderTest {
                 assertThat(row.isNullAt(31)).isTrue();
                 assertThat(row.isNullAt(32)).isTrue();
             } else {
-                assertThat(row.getString(0).toString()).isEqualTo("" + v);
+                assertThat(row.getString(0)).hasToString("" + v);
                 assertThat(row.getBoolean(1)).isEqualTo(v % 2 == 0);
                 assertThat(row.getByte(2)).isEqualTo(v.byteValue());
                 assertThat(row.getShort(3)).isEqualTo(v.shortValue());
@@ -317,7 +309,7 @@ public class ParquetColumnarRowSplitReaderTest {
                         .isEqualTo(DecimalData.fromBigDecimal(BigDecimal.valueOf(v), 15, 0));
                 assertThat(row.getDecimal(14, 20, 0))
                         .isEqualTo(DecimalData.fromBigDecimal(BigDecimal.valueOf(v), 20, 0));
-                assertThat(row.getArray(15).getString(0).toString()).isEqualTo("" + v);
+                assertThat(row.getArray(15).getString(0)).hasToString("" + v);
                 assertThat(row.getArray(16).getBoolean(0)).isEqualTo(v % 2 == 0);
                 assertThat(row.getArray(17).getByte(0)).isEqualTo(v.byteValue());
                 assertThat(row.getArray(18).getShort(0)).isEqualTo(v.shortValue());
@@ -336,9 +328,9 @@ public class ParquetColumnarRowSplitReaderTest {
                         .isEqualTo(DecimalData.fromBigDecimal(BigDecimal.valueOf(v), 15, 0));
                 assertThat(row.getArray(29).getDecimal(0, 20, 0))
                         .isEqualTo(DecimalData.fromBigDecimal(BigDecimal.valueOf(v), 20, 0));
-                assertThat(row.getMap(30).valueArray().getString(0).toString()).isEqualTo("" + v);
+                assertThat(row.getMap(30).valueArray().getString(0)).hasToString("" + v);
                 assertThat(row.getMap(31).valueArray().getBoolean(0)).isEqualTo(v % 2 == 0);
-                assertThat(row.getRow(32, 2).getString(0).toString()).isEqualTo("" + v);
+                assertThat(row.getRow(32, 2).getString(0)).hasToString("" + v);
                 assertThat(row.getRow(32, 2).getInt(1)).isEqualTo(v.intValue());
             }
             i++;
@@ -418,8 +410,9 @@ public class ParquetColumnarRowSplitReaderTest {
         return BASE_TIME.plusNanos(v).plusSeconds(v);
     }
 
-    @Test
-    public void testDictionary() throws IOException {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testDictionary(int rowGroupSize) throws IOException {
         // prepare parquet file
         int number = 10000;
         List<RowData> records = new ArrayList<>(number);
@@ -441,11 +434,12 @@ public class ParquetColumnarRowSplitReaderTest {
             }
         }
 
-        testNormalTypes(number, records, values);
+        testNormalTypes(number, records, values, rowGroupSize);
     }
 
-    @Test
-    public void testPartialDictionary() throws IOException {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testPartialDictionary(int rowGroupSize) throws IOException {
         // prepare parquet file
         int number = 10000;
         List<RowData> records = new ArrayList<>(number);
@@ -467,11 +461,12 @@ public class ParquetColumnarRowSplitReaderTest {
             }
         }
 
-        testNormalTypes(number, records, values);
+        testNormalTypes(number, records, values, rowGroupSize);
     }
 
-    @Test
-    public void testContinuousRepetition() throws IOException {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testContinuousRepetition(int rowGroupSize) throws IOException {
         // prepare parquet file
         int number = 10000;
         List<RowData> records = new ArrayList<>(number);
@@ -490,11 +485,12 @@ public class ParquetColumnarRowSplitReaderTest {
             }
         }
 
-        testNormalTypes(number, records, values);
+        testNormalTypes(number, records, values, rowGroupSize);
     }
 
-    @Test
-    public void testLargeValue() throws IOException {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testLargeValue(int rowGroupSize) throws IOException {
         // prepare parquet file
         int number = 10000;
         List<RowData> records = new ArrayList<>(number);
@@ -511,11 +507,12 @@ public class ParquetColumnarRowSplitReaderTest {
             }
         }
 
-        testNormalTypes(number, records, values);
+        testNormalTypes(number, records, values, rowGroupSize);
     }
 
-    @Test
-    public void testProject() throws IOException {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testProject(int rowGroupSize) throws IOException {
         // prepare parquet file
         int number = 1000;
         List<RowData> records = new ArrayList<>(number);
@@ -523,7 +520,7 @@ public class ParquetColumnarRowSplitReaderTest {
             Integer v = i;
             records.add(newRow(v));
         }
-        Path testPath = createTempParquetFile(TEMPORARY_FOLDER.newFolder(), records, rowGroupSize);
+        Path testPath = createTempParquetFile(tmpDir, records, rowGroupSize);
         RowType rowType = RowType.of(new DoubleType(), new TinyIntType(), new IntType());
         // test reader
         ParquetColumnarRowSplitReader reader =
@@ -549,8 +546,9 @@ public class ParquetColumnarRowSplitReaderTest {
         reader.close();
     }
 
-    @Test
-    public void testPartitionValues() throws IOException {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testPartitionValues(int rowGroupSize) throws IOException {
         // prepare parquet file
         int number = 1000;
         List<RowData> records = new ArrayList<>(number);
@@ -558,7 +556,8 @@ public class ParquetColumnarRowSplitReaderTest {
             Integer v = i;
             records.add(newRow(v));
         }
-        Path testPath = createTempParquetFile(TEMPORARY_FOLDER.newFolder(), records, rowGroupSize);
+
+        Path testPath = createTempParquetFile(tmpDir, records, rowGroupSize);
 
         // test reader
         Map<String, Object> partSpec = new HashMap<>();
@@ -576,17 +575,18 @@ public class ParquetColumnarRowSplitReaderTest {
         partSpec.put("f44", new BigDecimal(44));
         partSpec.put("f45", "f45");
 
-        innerTestPartitionValues(testPath, partSpec, false);
+        innerTestPartitionValues(testPath, partSpec, false, rowGroupSize);
 
         for (String k : new ArrayList<>(partSpec.keySet())) {
             partSpec.put(k, null);
         }
 
-        innerTestPartitionValues(testPath, partSpec, true);
+        innerTestPartitionValues(testPath, partSpec, true, rowGroupSize);
     }
 
     private void innerTestPartitionValues(
-            Path testPath, Map<String, Object> partSpec, boolean nullPartValue) throws IOException {
+            Path testPath, Map<String, Object> partSpec, boolean nullPartValue, int rowGroupSize)
+            throws IOException {
         LogicalType[] fieldTypes =
                 new LogicalType[] {
                     new VarCharType(VarCharType.MAX_LENGTH),
@@ -685,7 +685,7 @@ public class ParquetColumnarRowSplitReaderTest {
                         .isEqualTo(DecimalData.fromBigDecimal(new BigDecimal(43), 15, 0));
                 assertThat(row.getDecimal(14, 20, 0))
                         .isEqualTo(DecimalData.fromBigDecimal(new BigDecimal(44), 20, 0));
-                assertThat(row.getString(15).toString()).isEqualTo("f45");
+                assertThat(row.getString(15)).hasToString("f45");
             }
 
             i++;
diff --git a/flink-formats/flink-parquet/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/flink-formats/flink-parquet/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
new file mode 100644
index 00000000000..28999133c2b
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.util.TestLoggerExtension
\ No newline at end of file
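
The new service file registers TestLoggerExtension through the JDK ServiceLoader mechanism. JUnit Jupiter only honours such entries when extension auto-detection is enabled, typically via a configuration parameter; a sketch (where the flag is set — a properties file vs. a surefire system property — is an assumption, not something this diff shows):

    # junit-platform.properties (assumed location on the test classpath)
    junit.jupiter.extensions.autodetection.enabled = true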