You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2022/03/11 07:06:12 UTC
[flink] 01/02: [FLINK-26349][AvroParquet][test] add test for reading reflect records from parquet file created with generic record schema.
This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e7d9c59d68644a943fc5d059c4a7a9fa3366da10
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Wed Mar 9 15:08:43 2022 +0100
[FLINK-26349][AvroParquet][test] add test for reading reflect records from parquet file created with generic record schema.
---
.../parquet/avro/AvroParquetFileReadITCase.java | 54 +++++++++++++++++++---
.../parquet/avro/AvroParquetRecordFormatTest.java | 53 ++++++++++++++++++++-
2 files changed, 99 insertions(+), 8 deletions(-)
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileReadITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileReadITCase.java
index 8ba5a40..1f8e812 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileReadITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileReadITCase.java
@@ -32,16 +32,19 @@ import org.apache.flink.util.CloseableIterator;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
-import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -53,17 +56,18 @@ public class AvroParquetFileReadITCase extends AbstractTestBase {
private static final String USER_PARQUET_FILE_2 = "user2.parquet";
private static final String USER_PARQUET_FILE_3 = "user3.parquet";
- private Schema schema;
- private final List<GenericRecord> userRecords = new ArrayList<>(3);
+ // Shared by all tests in this class: schema is assigned once in the @BeforeClass setup();
+ // userRecords presumably holds the records written to the test files there (verify in setup()).
+ private static Schema schema;
+ private static final List<GenericRecord> userRecords = new ArrayList<>(3);
- @Before
- public void setup() throws IOException {
+ @BeforeClass
+ public static void setup() throws IOException {
// Generic records
schema =
new Schema.Parser()
.parse(
"{\"type\": \"record\", "
+ "\"name\": \"User\", "
+ + "\"namespace\": \"org.apache.flink.formats.parquet.avro.AvroParquetRecordFormatTest\", "
+ "\"fields\": [\n"
+ " {\"name\": \"name\", \"type\": \"string\" },\n"
+ " {\"name\": \"favoriteNumber\", \"type\": [\"int\", \"null\"] },\n"
@@ -125,6 +129,36 @@ public class AvroParquetFileReadITCase extends AbstractTestBase {
}
}
+ /**
+ * Reads the Parquet files written with the generic {@code User} schema back as reflect records
+ * of {@link AvroParquetRecordFormatTest.User} and checks them against the generic records.
+ */
+ @Test
+ public void testReadAvroReflectRecord() throws Exception {
+ final FileSource<AvroParquetRecordFormatTest.User> source =
+ FileSource.forRecordStreamFormat(
+ AvroParquetReaders.forReflectRecord(
+ AvroParquetRecordFormatTest.User.class),
+ Path.fromLocalFile(TEMPORARY_FOLDER.getRoot()))
+ .monitorContinuously(Duration.ofMillis(5))
+ .build();
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+ env.enableCheckpointing(10L);
+
+ DataStream<AvroParquetRecordFormatTest.User> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
+
+ // The directory is monitored continuously, so collect exactly the expected number of records.
+ final int expectedCount = 6;
+ try (CloseableIterator<AvroParquetRecordFormatTest.User> iterator =
+ stream.executeAndCollect("Reading Avro Reflect Records")) {
+ List<AvroParquetRecordFormatTest.User> list = collectRecords(iterator, expectedCount);
+ assertThat(list).hasSize(expectedCount);
+
+ // Records from parallel splits arrive in no fixed order; sort by favoriteNumber so they
+ // line up with userRecords (assumes userRecords is in ascending favoriteNumber order).
+ Collections.sort(
+ list,
+ Comparator.comparing(AvroParquetRecordFormatTest.User::getFavoriteNumber));
+
+ for (int i = 0; i < expectedCount; i++) {
+ assertUserEquals(list.get(i), userRecords.get(i));
+ }
+ }
+ }
+
+
private static <E> List<E> collectRecords(
final CloseableIterator<E> iterator, final int numElements) {
@@ -161,7 +195,15 @@ public class AvroParquetFileReadITCase extends AbstractTestBase {
writer.finish();
}
- private GenericRecord createUser(String name, int favoriteNumber, String favoriteColor) {
+ /**
+ * Asserts that a reflect-read {@link AvroParquetRecordFormatTest.User} carries the same field
+ * values as the expected generic record. Expected string fields go through {@link
+ * String#valueOf(Object)} so any {@code CharSequence} (e.g. Avro {@code Utf8}) compares by content.
+ */
+ private static void assertUserEquals(
+ AvroParquetRecordFormatTest.User user, GenericRecord expected) {
+ assertThat(user).isNotNull();
+ assertThat(user.getName()).isEqualTo(String.valueOf(expected.get("name")));
+ assertThat(user.getFavoriteNumber()).isEqualTo(expected.get("favoriteNumber"));
+ assertThat(user.getFavoriteColor())
+ .isEqualTo(String.valueOf(expected.get("favoriteColor")));
+ }
+
+
+ private static GenericRecord createUser(String name, int favoriteNumber, String favoriteColor) {
GenericRecord record = new GenericData.Record(schema);
record.put("name", name);
record.put("favoriteNumber", favoriteNumber);
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java
index 38b5396..92b1269 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java
@@ -84,6 +84,7 @@ class AvroParquetRecordFormatTest {
.parse(
"{\"type\": \"record\", "
+ "\"name\": \"User\", "
+ + "\"namespace\": \"org.apache.flink.formats.parquet.avro.AvroParquetRecordFormatTest\", "
+ "\"fields\": [\n"
+ " {\"name\": \"name\", \"type\": \"string\" },\n"
+ " {\"name\": \"favoriteNumber\", \"type\": [\"int\", \"null\"] },\n"
@@ -141,6 +142,21 @@ class AvroParquetRecordFormatTest {
}
@Test
+ void testReflectReadFromGenericRecords() throws IOException {
+ StreamFormat.Reader<User> reader =
+ createReader(
+ AvroParquetReaders.forReflectRecord(User.class),
+ new Configuration(),
+ userPath,
+ 0,
+ userPath.getFileSystem().getFileStatus(userPath).getLen());
+ for (GenericRecord record : userRecords) {
+ User user = reader.read();
+ assertUserEquals(Objects.requireNonNull(user), record);
+ }
+ }
+
+ @Test
void testCreateGenericReader() throws IOException {
StreamFormat.Reader<GenericRecord> reader =
createReader(
@@ -309,9 +325,16 @@ class AvroParquetRecordFormatTest {
}
private void assertUserEquals(GenericRecord user, GenericRecord expected) {
- assertEquals(user.get("name").toString(), expected.get("name"));
+ assertEquals(user.get("name").toString(), expected.get("name").toString());
assertEquals(user.get("favoriteNumber"), expected.get("favoriteNumber"));
- assertEquals(user.get("favoriteColor").toString(), expected.get("favoriteColor"));
+ assertEquals(
+ user.get("favoriteColor").toString(), expected.get("favoriteColor").toString());
+ }
+
+ /** Asserts that a reflect-read {@link User} matches the expected generic record field by field. */
+ private void assertUserEquals(User user, GenericRecord expected) {
+ // assertEquals takes (expected, actual); keep that order so failure messages are not inverted.
+ assertEquals(expected.get("name").toString(), user.getName());
+ assertEquals(expected.get("favoriteNumber"), user.getFavoriteNumber());
+ assertEquals(expected.get("favoriteColor").toString(), user.getFavoriteColor());
 }
private static List<Address> createAddressList() {
@@ -324,4 +347,30 @@ class AvroParquetRecordFormatTest {
 private static List<Datum> createDatumList() {
+ // Three fixed fixtures: ("a", 1), ("b", 2), ("c", 3); Arrays.asList returns a fixed-size list.
 return Arrays.asList(new Datum("a", 1), new Datum("b", 2), new Datum("c", 3));
 }
+
+ /**
+ * POJO that Avro reflection maps onto the generic {@code User} record schema (fields {@code
+ * name}, {@code favoriteNumber}, {@code favoriteColor}).
+ *
+ * <p>Package-private rather than private: {@code AvroParquetFileReadITCase}, in the same
+ * package, references it as {@code AvroParquetRecordFormatTest.User}, and a private member
+ * class is not accessible from another top-level class.
+ */
+ static final class User {
+ private String name;
+ // Boxed because the schema declares favoriteNumber as the union ["int", "null"].
+ private Integer favoriteNumber;
+ private String favoriteColor;
+
+ // No-arg constructor for reflective instantiation by Avro's reflect reader.
+ public User() {}
+
+ public User(String name, Integer favoriteNumber, String favoriteColor) {
+ this.name = name;
+ this.favoriteNumber = favoriteNumber;
+ this.favoriteColor = favoriteColor;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public Integer getFavoriteNumber() {
+ return favoriteNumber;
+ }
+
+ public String getFavoriteColor() {
+ return favoriteColor;
+ }
+ }
}