You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2022/03/11 07:06:12 UTC

[flink] 01/02: [FLINK-26349][AvroParquet][test] add test for reading reflect records from parquet file created with generic record schema.

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e7d9c59d68644a943fc5d059c4a7a9fa3366da10
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Wed Mar 9 15:08:43 2022 +0100

    [FLINK-26349][AvroParquet][test] add test for reading reflect records from parquet file created with generic record schema.
---
 .../parquet/avro/AvroParquetFileReadITCase.java    | 54 +++++++++++++++++++---
 .../parquet/avro/AvroParquetRecordFormatTest.java  | 53 ++++++++++++++++++++-
 2 files changed, 99 insertions(+), 8 deletions(-)

diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileReadITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileReadITCase.java
index 8ba5a40..1f8e812 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileReadITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileReadITCase.java
@@ -32,16 +32,19 @@ import org.apache.flink.util.CloseableIterator;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
-import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -53,17 +56,18 @@ public class AvroParquetFileReadITCase extends AbstractTestBase {
     private static final String USER_PARQUET_FILE_2 = "user2.parquet";
     private static final String USER_PARQUET_FILE_3 = "user3.parquet";
 
-    private Schema schema;
-    private final List<GenericRecord> userRecords = new ArrayList<>(3);
+    private static Schema schema;
+    private static final List<GenericRecord> userRecords = new ArrayList<>(3);
 
-    @Before
-    public void setup() throws IOException {
+    @BeforeClass
+    public static void setup() throws IOException {
         // Generic records
         schema =
                 new Schema.Parser()
                         .parse(
                                 "{\"type\": \"record\", "
                                         + "\"name\": \"User\", "
+                                        + "\"namespace\": \"org.apache.flink.formats.parquet.avro.AvroParquetRecordFormatTest\", "
                                         + "\"fields\": [\n"
                                         + "        {\"name\": \"name\", \"type\": \"string\" },\n"
                                         + "        {\"name\": \"favoriteNumber\",  \"type\": [\"int\", \"null\"] },\n"
@@ -125,6 +129,36 @@ public class AvroParquetFileReadITCase extends AbstractTestBase {
         }
     }
 
+    @Test
+    public void testReadAvroReflectRecord() throws Exception {
+        final FileSource<AvroParquetRecordFormatTest.User> source =
+                FileSource.forRecordStreamFormat(
+                                AvroParquetReaders.forReflectRecord(
+                                        AvroParquetRecordFormatTest.User.class),
+                                Path.fromLocalFile(TEMPORARY_FOLDER.getRoot()))
+                        .monitorContinuously(Duration.ofMillis(5))
+                        .build();
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+        env.enableCheckpointing(10L);
+        // The source maps each parquet row onto the reflect class User; no watermarks are used.
+        DataStream<AvroParquetRecordFormatTest.User> stream =
+                env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
+        // try-with-resources closes the result iterator once the 6 records are collected.
+        try (CloseableIterator<AvroParquetRecordFormatTest.User> iterator =
+                stream.executeAndCollect("Reading Avro Reflect Records")) {
+            List<AvroParquetRecordFormatTest.User> list = collectRecords(iterator, 6);
+            Collections.sort(
+                    list,
+                    Comparator.comparing(AvroParquetRecordFormatTest.User::getFavoriteNumber));
+            assertEquals(6, list.size());
+            // Position-wise check: relies on userRecords being sorted by favoriteNumber.
+            for (int i = 0; i < 6; i++) {
+                assertUserEquals(list.get(i), userRecords.get(i));
+            }
+        }
+    }
+
     private static <E> List<E> collectRecords(
             final CloseableIterator<E> iterator, final int numElements) {
 
@@ -161,7 +195,15 @@ public class AvroParquetFileReadITCase extends AbstractTestBase {
         writer.finish();
     }
 
-    private GenericRecord createUser(String name, int favoriteNumber, String favoriteColor) {
+    private void assertUserEquals(AvroParquetRecordFormatTest.User user, GenericRecord expected) {
+        assertThat(user).isNotNull();
+        assertThat(user.getName()).isNotNull().isEqualTo(String.valueOf(expected.get("name")));
+        assertThat(user.getFavoriteNumber()).isEqualTo(expected.get("favoriteNumber"));
+        assertThat(String.valueOf(user.getFavoriteColor())) // null-safe on both sides
+                .isEqualTo(String.valueOf(expected.get("favoriteColor")));
+    }
+
+    private static GenericRecord createUser(String name, int favoriteNumber, String favoriteColor) {
         GenericRecord record = new GenericData.Record(schema);
         record.put("name", name);
         record.put("favoriteNumber", favoriteNumber);
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java
index 38b5396..92b1269 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java
@@ -84,6 +84,7 @@ class AvroParquetRecordFormatTest {
                         .parse(
                                 "{\"type\": \"record\", "
                                         + "\"name\": \"User\", "
+                                        + "\"namespace\": \"org.apache.flink.formats.parquet.avro.AvroParquetRecordFormatTest\", "
                                         + "\"fields\": [\n"
                                         + "        {\"name\": \"name\", \"type\": \"string\" },\n"
                                         + "        {\"name\": \"favoriteNumber\",  \"type\": [\"int\", \"null\"] },\n"
@@ -141,6 +142,21 @@ class AvroParquetRecordFormatTest {
     }
 
     @Test
+    void testReflectReadFromGenericRecords() throws IOException {
+        StreamFormat.Reader<User> reader =
+                createReader(
+                        AvroParquetReaders.forReflectRecord(User.class),
+                        new Configuration(),
+                        userPath,
+                        0, // NOTE(review): presumably the start offset of the split — confirm
+                        userPath.getFileSystem().getFileStatus(userPath).getLen());
+        for (GenericRecord record : userRecords) { // one read() per expected record, in order
+            User user = reader.read();
+            assertUserEquals(Objects.requireNonNull(user), record); // fail fast on a null read
+        }
+    }
+
+    @Test
     void testCreateGenericReader() throws IOException {
         StreamFormat.Reader<GenericRecord> reader =
                 createReader(
@@ -309,9 +325,16 @@ class AvroParquetRecordFormatTest {
     }
 
     private void assertUserEquals(GenericRecord user, GenericRecord expected) {
-        assertEquals(user.get("name").toString(), expected.get("name"));
+        assertEquals(user.get("name").toString(), expected.get("name").toString());
         assertEquals(user.get("favoriteNumber"), expected.get("favoriteNumber"));
-        assertEquals(user.get("favoriteColor").toString(), expected.get("favoriteColor"));
+        assertEquals(
+                user.get("favoriteColor").toString(), expected.get("favoriteColor").toString());
+    }
+
+    private void assertUserEquals(User user, GenericRecord expected) { // args: expected, actual
+        assertEquals(expected.get("name").toString(), user.getName());
+        assertEquals(expected.get("favoriteNumber"), user.getFavoriteNumber());
+        assertEquals(expected.get("favoriteColor").toString(), user.getFavoriteColor());
     }
 
     private static List<Address> createAddressList() {
@@ -324,4 +347,30 @@ class AvroParquetRecordFormatTest {
     private static List<Datum> createDatumList() {
         return Arrays.asList(new Datum("a", 1), new Datum("b", 2), new Datum("c", 3));
     }
+
+    static final class User { // not private: AvroParquetFileReadITCase uses it too
+        private String name;
+        private Integer favoriteNumber; // boxed: the schema declares ["int", "null"]
+        private String favoriteColor;
+
+        public User() {} // NOTE(review): presumably needed by Avro reflection — confirm
+
+        public User(String name, Integer favoriteNumber, String favoriteColor) {
+            this.name = name;
+            this.favoriteNumber = favoriteNumber;
+            this.favoriteColor = favoriteColor;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public Integer getFavoriteNumber() {
+            return favoriteNumber;
+        }
+
+        public String getFavoriteColor() {
+            return favoriteColor;
+        }
+    }
 }