You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/11/04 16:27:57 UTC
[flink] branch master updated: [FLINK-28449][tests][JUnit5 migration] flink-parquet (#20230)
This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new db3244a6ac2 [FLINK-28449][tests][JUnit5 migration] flink-parquet (#20230)
db3244a6ac2 is described below
commit db3244a6ac2b06cebae9be26d7bcac506cc69093
Author: Ryan Skraba <rs...@apache.org>
AuthorDate: Fri Nov 4 11:27:41 2022 -0500
[FLINK-28449][tests][JUnit5 migration] flink-parquet (#20230)
---
.../formats/compress/CompressionFactoryITCase.java | 2 -
.../architecture/TestCodeArchitectureTest.java | 2 +-
.../parquet/ParquetColumnarRowInputFormatTest.java | 96 ++++++++++----------
.../formats/parquet/ParquetFileSystemITCase.java | 8 +-
.../parquet/ParquetFormatStatisticsReportTest.java | 4 +-
.../parquet/avro/AvroParquetFileReadITCase.java | 39 +++++---
.../avro/AvroParquetStreamingFileSinkITCase.java | 28 ++----
.../ParquetProtoStreamingFileSinkITCase.java | 18 ++--
.../parquet/row/ParquetRowDataWriterTest.java | 39 ++++----
.../utils/SerializableConfigurationTest.java | 6 +-
.../vector/ParquetColumnarRowSplitReaderTest.java | 100 ++++++++++-----------
.../org.junit.jupiter.api.extension.Extension | 16 ++++
12 files changed, 183 insertions(+), 175 deletions(-)
diff --git a/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java
index 66465b70949..c2d08112f67 100644
--- a/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java
+++ b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
@@ -60,7 +59,6 @@ class CompressionFactoryITCase {
private final List<String> testData = Arrays.asList("line1", "line2", "line3");
@Test
- @Timeout(20)
void testWriteCompressedFile(@TempDir java.nio.file.Path tmpDir) throws Exception {
final File folder = tmpDir.toFile();
final Path testPath = Path.fromLocalFile(folder);
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
index 2e94cb407aa..128b218500d 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
@@ -33,7 +33,7 @@ import com.tngtech.archunit.junit.ArchTests;
ImportOptions.ExcludeScalaImportOption.class,
ImportOptions.ExcludeShadedImportOption.class
})
-public class TestCodeArchitectureTest {
+class TestCodeArchitectureTest {
@ArchTest
public static final ArchTests COMMON_TESTS = ArchTests.in(TestCodeArchitectureTestBase.class);
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java
index e35aff57eb7..fa654052901 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java
@@ -54,11 +54,9 @@ import org.apache.flink.table.utils.DateTimeUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.conf.Configuration;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.io.File;
import java.io.IOException;
@@ -84,8 +82,7 @@ import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionP
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link ParquetColumnarRowInputFormat}. */
-@RunWith(Parameterized.class)
-public class ParquetColumnarRowInputFormatTest {
+class ParquetColumnarRowInputFormatTest {
private static final LocalDateTime BASE_TIME = LocalDateTime.now();
private static final org.apache.flink.configuration.Configuration EMPTY_CONF =
@@ -129,21 +126,15 @@ public class ParquetColumnarRowInputFormatTest {
new MapType(new IntType(), new BooleanType()),
RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType()));
- @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+ @TempDir private File folder;
- private final int rowGroupSize;
-
- @Parameterized.Parameters(name = "rowGroupSize-{0}")
public static Collection<Integer> parameters() {
return Arrays.asList(10, 1000);
}
- public ParquetColumnarRowInputFormatTest(int rowGroupSize) {
- this.rowGroupSize = rowGroupSize;
- }
-
- @Test
- public void testTypesReadWithSplits() throws IOException {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testTypesReadWithSplits(int rowGroupSize) throws IOException {
int number = 10000;
List<Integer> values = new ArrayList<>(number);
Random random = new Random();
@@ -152,11 +143,12 @@ public class ParquetColumnarRowInputFormatTest {
values.add(v % 10 == 0 ? null : v);
}
- innerTestTypes(values);
+ innerTestTypes(folder, values, rowGroupSize);
}
- @Test
- public void testDictionary() throws IOException {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testDictionary(int rowGroupSize) throws IOException {
int number = 10000;
List<Integer> values = new ArrayList<>(number);
Random random = new Random();
@@ -170,11 +162,12 @@ public class ParquetColumnarRowInputFormatTest {
values.add(v == 0 ? null : v);
}
- innerTestTypes(values);
+ innerTestTypes(folder, values, rowGroupSize);
}
- @Test
- public void testPartialDictionary() throws IOException {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testPartialDictionary(int rowGroupSize) throws IOException {
// prepare parquet file
int number = 10000;
List<Integer> values = new ArrayList<>(number);
@@ -189,11 +182,12 @@ public class ParquetColumnarRowInputFormatTest {
values.add(v == 0 ? null : v);
}
- innerTestTypes(values);
+ innerTestTypes(folder, values, rowGroupSize);
}
- @Test
- public void testContinuousRepetition() throws IOException {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testContinuousRepetition(int rowGroupSize) throws IOException {
int number = 10000;
List<Integer> values = new ArrayList<>(number);
Random random = new Random();
@@ -204,11 +198,12 @@ public class ParquetColumnarRowInputFormatTest {
}
}
- innerTestTypes(values);
+ innerTestTypes(folder, values, rowGroupSize);
}
- @Test
- public void testLargeValue() throws IOException {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testLargeValue(int rowGroupSize) throws IOException {
int number = 10000;
List<Integer> values = new ArrayList<>(number);
Random random = new Random();
@@ -217,11 +212,12 @@ public class ParquetColumnarRowInputFormatTest {
values.add(v % 10 == 0 ? null : v);
}
- innerTestTypes(values);
+ innerTestTypes(folder, values, rowGroupSize);
}
- @Test
- public void testProjection() throws IOException {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testProjection(int rowGroupSize) throws IOException {
int number = 1000;
List<RowData> records = new ArrayList<>(number);
for (int i = 0; i < number; i++) {
@@ -229,7 +225,7 @@ public class ParquetColumnarRowInputFormatTest {
records.add(newRow(v));
}
- Path testPath = createTempParquetFile(TEMPORARY_FOLDER.newFolder(), records, rowGroupSize);
+ Path testPath = createTempParquetFile(folder, records, rowGroupSize);
// test reader
LogicalType[] fieldTypes =
new LogicalType[] {new DoubleType(), new TinyIntType(), new IntType()};
@@ -256,8 +252,9 @@ public class ParquetColumnarRowInputFormatTest {
});
}
- @Test
- public void testProjectionReadUnknownField() throws IOException {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testProjectionReadUnknownField(int rowGroupSize) throws IOException {
int number = 1000;
List<RowData> records = new ArrayList<>(number);
for (int i = 0; i < number; i++) {
@@ -265,7 +262,7 @@ public class ParquetColumnarRowInputFormatTest {
records.add(newRow(v));
}
- Path testPath = createTempParquetFile(TEMPORARY_FOLDER.newFolder(), records, rowGroupSize);
+ Path testPath = createTempParquetFile(folder, records, rowGroupSize);
// test reader
LogicalType[] fieldTypes =
@@ -297,8 +294,9 @@ public class ParquetColumnarRowInputFormatTest {
});
}
- @Test
- public void testPartitionValues() throws IOException {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testPartitionValues(int rowGroupSize) throws IOException {
// prepare parquet file
int number = 1000;
List<RowData> records = new ArrayList<>(number);
@@ -307,8 +305,6 @@ public class ParquetColumnarRowInputFormatTest {
records.add(newRow(v));
}
- File root = TEMPORARY_FOLDER.newFolder();
-
List<String> partitionKeys =
Arrays.asList(
"f33", "f34", "f35", "f36", "f37", "f38", "f39", "f40", "f41", "f42", "f43",
@@ -332,7 +328,7 @@ public class ParquetColumnarRowInputFormatTest {
partSpec.put("f45", "f45");
String partPath = generatePartitionPath(partSpec);
- Path testPath = createTempParquetFile(new File(root, partPath), records, rowGroupSize);
+ Path testPath = createTempParquetFile(new File(folder, partPath), records, rowGroupSize);
innerTestPartitionValues(testPath, partitionKeys, false);
@@ -343,14 +339,15 @@ public class ParquetColumnarRowInputFormatTest {
}
partPath = generatePartitionPath(partSpec);
- testPath = createTempParquetFile(new File(root, partPath), records, rowGroupSize);
+ testPath = createTempParquetFile(new File(folder, partPath), records, rowGroupSize);
innerTestPartitionValues(testPath, partitionKeys, true);
}
- private void innerTestTypes(List<Integer> records) throws IOException {
+ private void innerTestTypes(File folder, List<Integer> records, int rowGroupSize)
+ throws IOException {
List<RowData> rows = records.stream().map(this::newRow).collect(Collectors.toList());
- Path testPath = createTempParquetFile(TEMPORARY_FOLDER.newFolder(), rows, rowGroupSize);
+ Path testPath = createTempParquetFile(folder, rows, rowGroupSize);
// test reading and splitting
long fileLen = testPath.getFileSystem().getFileStatus(testPath).getLen();
@@ -475,7 +472,7 @@ public class ParquetColumnarRowInputFormatTest {
assertThat(row.isNullAt(31)).isTrue();
assertThat(row.isNullAt(32)).isTrue();
} else {
- assertThat(row.getString(0).toString()).isEqualTo("" + v);
+ assertThat(row.getString(0)).hasToString("" + v);
assertThat(row.getBoolean(1)).isEqualTo(v % 2 == 0);
assertThat(row.getByte(2)).isEqualTo(v.byteValue());
assertThat(row.getShort(3)).isEqualTo(v.shortValue());
@@ -516,7 +513,7 @@ public class ParquetColumnarRowInputFormatTest {
.isEqualTo(BigDecimal.valueOf(v));
assertThat(row.getDecimal(14, 20, 0).toBigDecimal())
.isEqualTo(BigDecimal.valueOf(v));
- assertThat(row.getArray(15).getString(0).toString()).isEqualTo("" + v);
+ assertThat(row.getArray(15).getString(0)).hasToString("" + v);
assertThat(row.getArray(16).getBoolean(0)).isEqualTo(v % 2 == 0);
assertThat(row.getArray(17).getByte(0)).isEqualTo(v.byteValue());
assertThat(row.getArray(18).getShort(0)).isEqualTo(v.shortValue());
@@ -539,10 +536,9 @@ public class ParquetColumnarRowInputFormatTest {
assertThat(row.getArray(29).getDecimal(0, 20, 0))
.isEqualTo(
DecimalData.fromBigDecimal(BigDecimal.valueOf(v), 20, 0));
- assertThat(row.getMap(30).valueArray().getString(0).toString())
- .isEqualTo("" + v);
+ assertThat(row.getMap(30).valueArray().getString(0)).hasToString("" + v);
assertThat(row.getMap(31).valueArray().getBoolean(0)).isEqualTo(v % 2 == 0);
- assertThat(row.getRow(32, 2).getString(0).toString()).isEqualTo("" + v);
+ assertThat(row.getRow(32, 2).getString(0)).hasToString("" + v);
assertThat(row.getRow(32, 2).getInt(1)).isEqualTo(v.intValue());
}
cnt.incrementAndGet();
@@ -746,7 +742,7 @@ public class ParquetColumnarRowInputFormatTest {
.isEqualTo(DecimalData.fromBigDecimal(new BigDecimal(25), 15, 0));
assertThat(row.getDecimal(14, 20, 0))
.isEqualTo(DecimalData.fromBigDecimal(new BigDecimal(26), 20, 0));
- assertThat(row.getString(15).toString()).isEqualTo("f45");
+ assertThat(row.getString(15)).hasToString("f45");
}
cnt.incrementAndGet();
});
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetFileSystemITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetFileSystemITCase.java
index a10f57415d0..f6d9e6509a6 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetFileSystemITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetFileSystemITCase.java
@@ -101,11 +101,11 @@ public class ParquetFileSystemITCase extends BatchFileSystemITCaseBase {
ParquetMetadata footer =
readFooter(new Configuration(), path, range(0, Long.MAX_VALUE));
if (configure) {
- assertThat(footer.getBlocks().get(0).getColumns().get(0).getCodec().toString())
- .isEqualTo("GZIP");
+ assertThat(footer.getBlocks().get(0).getColumns().get(0).getCodec())
+ .hasToString("GZIP");
} else {
- assertThat(footer.getBlocks().get(0).getColumns().get(0).getCodec().toString())
- .isEqualTo("SNAPPY");
+ assertThat(footer.getBlocks().get(0).getColumns().get(0).getCodec())
+ .hasToString("SNAPPY");
}
} catch (IOException e) {
throw new RuntimeException(e);
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetFormatStatisticsReportTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetFormatStatisticsReportTest.java
index 7e06b79367a..29ebd14c445 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetFormatStatisticsReportTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetFormatStatisticsReportTest.java
@@ -69,7 +69,7 @@ public class ParquetFormatStatisticsReportTest extends StatisticsReportTestBase
// insert data and get statistics.
DataType dataType = tEnv.from("sourceTable").getResolvedSchema().toPhysicalRowDataType();
tEnv.fromValues(dataType, getData()).executeInsert("sourceTable").await();
- assertThat(folder.listFiles()).isNotNull().hasSize(1);
+ assertThat(folder.listFiles()).hasSize(1);
File[] files = folder.listFiles();
assert files != null;
TableStats tableStats =
@@ -84,7 +84,7 @@ public class ParquetFormatStatisticsReportTest extends StatisticsReportTestBase
DataType dataType = tEnv.from("sourceTable").getResolvedSchema().toPhysicalRowDataType();
tEnv.fromValues(dataType, getData()).executeInsert("sourceTable").await();
tEnv.fromValues(dataType, getData()).executeInsert("sourceTable").await();
- assertThat(folder.listFiles()).isNotNull().hasSize(2);
+ assertThat(folder.listFiles()).hasSize(2);
File[] files = folder.listFiles();
List<Path> paths = new ArrayList<>();
assert files != null;
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileReadITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileReadITCase.java
index b436eb59c0b..abd79903469 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileReadITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileReadITCase.java
@@ -24,16 +24,19 @@ import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.CloseableIterator;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.time.Duration;
@@ -47,18 +50,28 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.assertj.core.api.Assertions.assertThat;
/** ITCase for {@link AvroParquetRecordFormat}. */
-public class AvroParquetFileReadITCase extends AbstractTestBase {
+class AvroParquetFileReadITCase {
private static final int PARALLELISM = 4;
private static final String USER_PARQUET_FILE_1 = "user1.parquet";
private static final String USER_PARQUET_FILE_2 = "user2.parquet";
private static final String USER_PARQUET_FILE_3 = "user3.parquet";
+ @TempDir static java.nio.file.Path temporaryFolder;
+
private static Schema schema;
private static final List<GenericRecord> userRecords = new ArrayList<>(3);
- @BeforeClass
- public static void setup() throws IOException {
+ @RegisterExtension
+ public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .build());
+
+ @BeforeAll
+ static void setup() throws IOException {
// Generic records
schema =
new Schema.Parser()
@@ -79,14 +92,14 @@ public class AvroParquetFileReadITCase extends AbstractTestBase {
createParquetFile(
AvroParquetWriters.forGenericRecord(schema),
- Path.fromLocalFile(TEMPORARY_FOLDER.newFile(USER_PARQUET_FILE_1)),
+ Path.fromLocalFile(temporaryFolder.resolve(USER_PARQUET_FILE_1).toFile()),
userRecords.toArray(new GenericRecord[0]));
GenericRecord user = createUser("Max", 4, "blue");
userRecords.add(user);
createParquetFile(
AvroParquetWriters.forGenericRecord(schema),
- Path.fromLocalFile(TEMPORARY_FOLDER.newFile(USER_PARQUET_FILE_2)),
+ Path.fromLocalFile(temporaryFolder.resolve(USER_PARQUET_FILE_2).toFile()),
user);
user = createUser("Alex", 5, "White");
@@ -96,17 +109,17 @@ public class AvroParquetFileReadITCase extends AbstractTestBase {
userRecords.add(user1);
createParquetFile(
AvroParquetWriters.forGenericRecord(schema),
- Path.fromLocalFile(TEMPORARY_FOLDER.newFile(USER_PARQUET_FILE_3)),
+ Path.fromLocalFile(temporaryFolder.resolve(USER_PARQUET_FILE_3).toFile()),
user,
user1);
}
@Test
- public void testReadAvroRecord() throws Exception {
+ void testReadAvroRecord() throws Exception {
final FileSource<GenericRecord> source =
FileSource.forRecordStreamFormat(
AvroParquetReaders.forGenericRecord(schema),
- Path.fromLocalFile(TEMPORARY_FOLDER.getRoot()))
+ Path.fromLocalFile(temporaryFolder.toFile()))
.monitorContinuously(Duration.ofMillis(5))
.build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -128,12 +141,12 @@ public class AvroParquetFileReadITCase extends AbstractTestBase {
}
@Test
- public void testReadAvroReflectRecord() throws Exception {
+ void testReadAvroReflectRecord() throws Exception {
final FileSource<AvroParquetRecordFormatTest.User> source =
FileSource.forRecordStreamFormat(
AvroParquetReaders.forReflectRecord(
AvroParquetRecordFormatTest.User.class),
- Path.fromLocalFile(TEMPORARY_FOLDER.getRoot()))
+ Path.fromLocalFile(temporaryFolder.toFile()))
.monitorContinuously(Duration.ofMillis(5))
.build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetStreamingFileSinkITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetStreamingFileSinkITCase.java
index 4bc0820c134..91de5fbc4fe 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetStreamingFileSinkITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetStreamingFileSinkITCase.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner;
import org.apache.flink.streaming.util.FiniteTestSource;
-import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
@@ -39,9 +39,9 @@ import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
@@ -59,15 +59,11 @@ import static org.assertj.core.api.Assertions.assertThat;
* Simple integration test case for writing bulk encoded files with the {@link StreamingFileSink}
* with Parquet.
*/
-@SuppressWarnings("serial")
-public class AvroParquetStreamingFileSinkITCase extends AbstractTestBase {
-
- @Rule public final Timeout timeoutPerTest = Timeout.seconds(20);
+@ExtendWith(MiniClusterExtension.class)
+class AvroParquetStreamingFileSinkITCase {
@Test
- public void testWriteParquetAvroSpecific() throws Exception {
-
- final File folder = TEMPORARY_FOLDER.newFolder();
+ void testWriteParquetAvroSpecific(@TempDir File folder) throws Exception {
final List<Address> data =
Arrays.asList(
@@ -95,9 +91,7 @@ public class AvroParquetStreamingFileSinkITCase extends AbstractTestBase {
}
@Test
- public void testWriteParquetAvroGeneric() throws Exception {
-
- final File folder = TEMPORARY_FOLDER.newFolder();
+ void testWriteParquetAvroGeneric(@TempDir File folder) throws Exception {
final Schema schema = Address.getClassSchema();
@@ -128,9 +122,7 @@ public class AvroParquetStreamingFileSinkITCase extends AbstractTestBase {
}
@Test
- public void testWriteParquetAvroReflect() throws Exception {
-
- final File folder = TEMPORARY_FOLDER.newFolder();
+ void testWriteParquetAvroReflect(@TempDir File folder) throws Exception {
final List<Datum> data =
Arrays.asList(new Datum("a", 1), new Datum("b", 2), new Datum("c", 3));
@@ -159,11 +151,9 @@ public class AvroParquetStreamingFileSinkITCase extends AbstractTestBase {
private static <T> void validateResults(File folder, GenericData dataModel, List<T> expected)
throws Exception {
File[] buckets = folder.listFiles();
- assertThat(buckets).isNotNull();
assertThat(buckets).hasSize(1);
File[] partFiles = buckets[0].listFiles();
- assertThat(partFiles).isNotNull();
assertThat(partFiles).hasSize(2);
for (File partFile : partFiles) {
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoStreamingFileSinkITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoStreamingFileSinkITCase.java
index 013f11c8384..3b70aa158c3 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoStreamingFileSinkITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoStreamingFileSinkITCase.java
@@ -25,15 +25,15 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner;
import org.apache.flink.streaming.util.FiniteTestSource;
-import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.junit5.MiniClusterExtension;
import com.google.protobuf.Message;
import com.google.protobuf.MessageOrBuilder;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.proto.ProtoParquetReader;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
@@ -49,13 +49,11 @@ import static org.assertj.core.api.Assertions.assertThat;
* Simple integration test case for writing bulk encoded files with the {@link StreamingFileSink}
* with Parquet.
*/
-public class ParquetProtoStreamingFileSinkITCase extends AbstractTestBase {
-
- @Rule public final Timeout timeoutPerTest = Timeout.seconds(20);
+@ExtendWith(MiniClusterExtension.class)
+class ParquetProtoStreamingFileSinkITCase {
@Test
- public void testParquetProtoWriters() throws Exception {
- File folder = TEMPORARY_FOLDER.newFolder();
+ void testParquetProtoWriters(@TempDir File folder) throws Exception {
List<SimpleProtoRecord> data =
Arrays.asList(
@@ -88,11 +86,9 @@ public class ParquetProtoStreamingFileSinkITCase extends AbstractTestBase {
private static <T extends MessageOrBuilder> void validateResults(File folder, List<T> expected)
throws Exception {
File[] buckets = folder.listFiles();
- assertThat(buckets).isNotNull();
assertThat(buckets).hasSize(1);
File[] partFiles = buckets[0].listFiles();
- assertThat(partFiles).isNotNull();
assertThat(partFiles).hasSize(2);
for (File partFile : partFiles) {
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
index 902b41249ce..38edb278ad8 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
@@ -53,9 +53,8 @@ import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
@@ -72,9 +71,7 @@ import java.util.stream.IntStream;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link ParquetRowDataBuilder} and {@link ParquetRowDataWriter}. */
-public class ParquetRowDataWriterTest {
-
- @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+class ParquetRowDataWriterTest {
private static final RowType ROW_TYPE =
RowType.of(
@@ -112,26 +109,27 @@ public class ParquetRowDataWriterTest {
TypeConversions.fromLogicalToDataType(ROW_TYPE));
@Test
- public void testTypes() throws Exception {
+ void testTypes(@TempDir java.nio.file.Path folder) throws Exception {
Configuration conf = new Configuration();
- innerTest(conf, true);
- innerTest(conf, false);
- complexTypeTest(conf, true);
- complexTypeTest(conf, false);
+ innerTest(folder, conf, true);
+ innerTest(folder, conf, false);
+ complexTypeTest(folder, conf, true);
+ complexTypeTest(folder, conf, false);
}
@Test
- public void testCompression() throws Exception {
+ void testCompression(@TempDir java.nio.file.Path folder) throws Exception {
Configuration conf = new Configuration();
conf.set(ParquetOutputFormat.COMPRESSION, "GZIP");
- innerTest(conf, true);
- innerTest(conf, false);
- complexTypeTest(conf, true);
- complexTypeTest(conf, false);
+ innerTest(folder, conf, true);
+ innerTest(folder, conf, false);
+ complexTypeTest(folder, conf, true);
+ complexTypeTest(folder, conf, false);
}
- private void innerTest(Configuration conf, boolean utcTimestamp) throws IOException {
- Path path = new Path(TEMPORARY_FOLDER.newFolder().getPath(), UUID.randomUUID().toString());
+ private void innerTest(java.nio.file.Path folder, Configuration conf, boolean utcTimestamp)
+ throws IOException {
+ Path path = new Path(folder.toString(), UUID.randomUUID().toString());
int number = 1000;
List<Row> rows = new ArrayList<>(number);
for (int i = 0; i < number; i++) {
@@ -188,8 +186,9 @@ public class ParquetRowDataWriterTest {
assertThat(cnt).isEqualTo(number);
}
- public void complexTypeTest(Configuration conf, boolean utcTimestamp) throws Exception {
- Path path = new Path(TEMPORARY_FOLDER.newFolder().getPath(), UUID.randomUUID().toString());
+ public void complexTypeTest(java.nio.file.Path folder, Configuration conf, boolean utcTimestamp)
+ throws Exception {
+ Path path = new Path(folder.toString(), UUID.randomUUID().toString());
int number = 1000;
List<Row> rows = new ArrayList<>(number);
Map<String, String> mapData = new HashMap<>();
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/SerializableConfigurationTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/SerializableConfigurationTest.java
index 7f29899baa9..b51a23dd644 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/SerializableConfigurationTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/SerializableConfigurationTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.formats.parquet.utils;
import org.apache.hadoop.conf.Configuration;
import org.jetbrains.annotations.Nullable;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.URL;
@@ -29,10 +29,10 @@ import static org.apache.flink.util.InstantiationUtil.deserializeObject;
import static org.apache.flink.util.InstantiationUtil.serializeObject;
/** Test for {@link SerializableConfiguration}. */
-public class SerializableConfigurationTest {
+class SerializableConfigurationTest {
@Test
- public void testResource() throws IOException, ClassNotFoundException {
+ void testResource() throws IOException, ClassNotFoundException {
ClassLoader cl =
new ClassLoader(Thread.currentThread().getContextClassLoader()) {
@Nullable
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReaderTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReaderTest.java
index d0e588b0339..1c9a5ed7d4d 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReaderTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReaderTest.java
@@ -52,11 +52,9 @@ import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.table.utils.DateTimeUtils;
import org.apache.hadoop.conf.Configuration;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.io.File;
import java.io.IOException;
@@ -76,8 +74,7 @@ import java.util.stream.IntStream;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link ParquetColumnarRowSplitReader}. */
-@RunWith(Parameterized.class)
-public class ParquetColumnarRowSplitReaderTest {
+class ParquetColumnarRowSplitReaderTest {
private static final int FIELD_NUMBER = 33;
private static final LocalDateTime BASE_TIME = LocalDateTime.now();
@@ -120,21 +117,15 @@ public class ParquetColumnarRowSplitReaderTest {
new MapType(new IntType(), new BooleanType()),
RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType()));
- @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+ @TempDir File tmpDir;
- private final int rowGroupSize;
-
- @Parameterized.Parameters(name = "rowGroupSize-{0}")
public static Collection<Integer> parameters() {
return Arrays.asList(10, 1000);
}
- public ParquetColumnarRowSplitReaderTest(int rowGroupSize) {
- this.rowGroupSize = rowGroupSize;
- }
-
- @Test
- public void testNormalTypesReadWithSplits() throws IOException {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testNormalTypesReadWithSplits(int rowGroupSize) throws IOException {
// prepare parquet file
int number = 10000;
List<RowData> records = new ArrayList<>(number);
@@ -151,11 +142,12 @@ public class ParquetColumnarRowSplitReaderTest {
}
}
- testNormalTypes(number, records, values);
+ testNormalTypes(number, records, values, rowGroupSize);
}
- @Test
- public void testReachEnd() throws Exception {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testReachEnd(int rowGroupSize) throws Exception {
// prepare parquet file
int number = 5;
List<RowData> records = new ArrayList<>(number);
@@ -168,8 +160,7 @@ public class ParquetColumnarRowSplitReaderTest {
records.add(newRow(v));
}
}
-
- Path testPath = createTempParquetFile(TEMPORARY_FOLDER.newFolder(), records, rowGroupSize);
+ Path testPath = createTempParquetFile(tmpDir, records, rowGroupSize);
ParquetColumnarRowSplitReader reader =
createReader(
testPath, 0, testPath.getFileSystem().getFileStatus(testPath).getLen());
@@ -198,9 +189,10 @@ public class ParquetColumnarRowSplitReaderTest {
return path;
}
- private void testNormalTypes(int number, List<RowData> records, List<Integer> values)
+ private void testNormalTypes(
+ int number, List<RowData> records, List<Integer> values, int rowGroupSize)
throws IOException {
- Path testPath = createTempParquetFile(TEMPORARY_FOLDER.newFolder(), records, rowGroupSize);
+ Path testPath = createTempParquetFile(tmpDir, records, rowGroupSize);
// test reading and splitting
long fileLen = testPath.getFileSystem().getFileStatus(testPath).getLen();
@@ -285,7 +277,7 @@ public class ParquetColumnarRowSplitReaderTest {
assertThat(row.isNullAt(31)).isTrue();
assertThat(row.isNullAt(32)).isTrue();
} else {
- assertThat(row.getString(0).toString()).isEqualTo("" + v);
+ assertThat(row.getString(0)).hasToString("" + v);
assertThat(row.getBoolean(1)).isEqualTo(v % 2 == 0);
assertThat(row.getByte(2)).isEqualTo(v.byteValue());
assertThat(row.getShort(3)).isEqualTo(v.shortValue());
@@ -317,7 +309,7 @@ public class ParquetColumnarRowSplitReaderTest {
.isEqualTo(DecimalData.fromBigDecimal(BigDecimal.valueOf(v), 15, 0));
assertThat(row.getDecimal(14, 20, 0))
.isEqualTo(DecimalData.fromBigDecimal(BigDecimal.valueOf(v), 20, 0));
- assertThat(row.getArray(15).getString(0).toString()).isEqualTo("" + v);
+ assertThat(row.getArray(15).getString(0)).hasToString("" + v);
assertThat(row.getArray(16).getBoolean(0)).isEqualTo(v % 2 == 0);
assertThat(row.getArray(17).getByte(0)).isEqualTo(v.byteValue());
assertThat(row.getArray(18).getShort(0)).isEqualTo(v.shortValue());
@@ -336,9 +328,9 @@ public class ParquetColumnarRowSplitReaderTest {
.isEqualTo(DecimalData.fromBigDecimal(BigDecimal.valueOf(v), 15, 0));
assertThat(row.getArray(29).getDecimal(0, 20, 0))
.isEqualTo(DecimalData.fromBigDecimal(BigDecimal.valueOf(v), 20, 0));
- assertThat(row.getMap(30).valueArray().getString(0).toString()).isEqualTo("" + v);
+ assertThat(row.getMap(30).valueArray().getString(0)).hasToString("" + v);
assertThat(row.getMap(31).valueArray().getBoolean(0)).isEqualTo(v % 2 == 0);
- assertThat(row.getRow(32, 2).getString(0).toString()).isEqualTo("" + v);
+ assertThat(row.getRow(32, 2).getString(0)).hasToString("" + v);
assertThat(row.getRow(32, 2).getInt(1)).isEqualTo(v.intValue());
}
i++;
@@ -418,8 +410,9 @@ public class ParquetColumnarRowSplitReaderTest {
return BASE_TIME.plusNanos(v).plusSeconds(v);
}
- @Test
- public void testDictionary() throws IOException {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testDictionary(int rowGroupSize) throws IOException {
// prepare parquet file
int number = 10000;
List<RowData> records = new ArrayList<>(number);
@@ -441,11 +434,12 @@ public class ParquetColumnarRowSplitReaderTest {
}
}
- testNormalTypes(number, records, values);
+ testNormalTypes(number, records, values, rowGroupSize);
}
- @Test
- public void testPartialDictionary() throws IOException {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testPartialDictionary(int rowGroupSize) throws IOException {
// prepare parquet file
int number = 10000;
List<RowData> records = new ArrayList<>(number);
@@ -467,11 +461,12 @@ public class ParquetColumnarRowSplitReaderTest {
}
}
- testNormalTypes(number, records, values);
+ testNormalTypes(number, records, values, rowGroupSize);
}
- @Test
- public void testContinuousRepetition() throws IOException {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testContinuousRepetition(int rowGroupSize) throws IOException {
// prepare parquet file
int number = 10000;
List<RowData> records = new ArrayList<>(number);
@@ -490,11 +485,12 @@ public class ParquetColumnarRowSplitReaderTest {
}
}
- testNormalTypes(number, records, values);
+ testNormalTypes(number, records, values, rowGroupSize);
}
- @Test
- public void testLargeValue() throws IOException {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testLargeValue(int rowGroupSize) throws IOException {
// prepare parquet file
int number = 10000;
List<RowData> records = new ArrayList<>(number);
@@ -511,11 +507,12 @@ public class ParquetColumnarRowSplitReaderTest {
}
}
- testNormalTypes(number, records, values);
+ testNormalTypes(number, records, values, rowGroupSize);
}
- @Test
- public void testProject() throws IOException {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testProject(int rowGroupSize) throws IOException {
// prepare parquet file
int number = 1000;
List<RowData> records = new ArrayList<>(number);
@@ -523,7 +520,7 @@ public class ParquetColumnarRowSplitReaderTest {
Integer v = i;
records.add(newRow(v));
}
- Path testPath = createTempParquetFile(TEMPORARY_FOLDER.newFolder(), records, rowGroupSize);
+ Path testPath = createTempParquetFile(tmpDir, records, rowGroupSize);
RowType rowType = RowType.of(new DoubleType(), new TinyIntType(), new IntType());
// test reader
ParquetColumnarRowSplitReader reader =
@@ -549,8 +546,9 @@ public class ParquetColumnarRowSplitReaderTest {
reader.close();
}
- @Test
- public void testPartitionValues() throws IOException {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testPartitionValues(int rowGroupSize) throws IOException {
// prepare parquet file
int number = 1000;
List<RowData> records = new ArrayList<>(number);
@@ -558,7 +556,8 @@ public class ParquetColumnarRowSplitReaderTest {
Integer v = i;
records.add(newRow(v));
}
- Path testPath = createTempParquetFile(TEMPORARY_FOLDER.newFolder(), records, rowGroupSize);
+
+ Path testPath = createTempParquetFile(tmpDir, records, rowGroupSize);
// test reader
Map<String, Object> partSpec = new HashMap<>();
@@ -576,17 +575,18 @@ public class ParquetColumnarRowSplitReaderTest {
partSpec.put("f44", new BigDecimal(44));
partSpec.put("f45", "f45");
- innerTestPartitionValues(testPath, partSpec, false);
+ innerTestPartitionValues(testPath, partSpec, false, rowGroupSize);
for (String k : new ArrayList<>(partSpec.keySet())) {
partSpec.put(k, null);
}
- innerTestPartitionValues(testPath, partSpec, true);
+ innerTestPartitionValues(testPath, partSpec, true, rowGroupSize);
}
private void innerTestPartitionValues(
- Path testPath, Map<String, Object> partSpec, boolean nullPartValue) throws IOException {
+ Path testPath, Map<String, Object> partSpec, boolean nullPartValue, int rowGroupSize)
+ throws IOException {
LogicalType[] fieldTypes =
new LogicalType[] {
new VarCharType(VarCharType.MAX_LENGTH),
@@ -685,7 +685,7 @@ public class ParquetColumnarRowSplitReaderTest {
.isEqualTo(DecimalData.fromBigDecimal(new BigDecimal(43), 15, 0));
assertThat(row.getDecimal(14, 20, 0))
.isEqualTo(DecimalData.fromBigDecimal(new BigDecimal(44), 20, 0));
- assertThat(row.getString(15).toString()).isEqualTo("f45");
+ assertThat(row.getString(15)).hasToString("f45");
}
i++;
diff --git a/flink-formats/flink-parquet/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/flink-formats/flink-parquet/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
new file mode 100644
index 00000000000..28999133c2b
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.util.TestLoggerExtension
\ No newline at end of file