You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2022/03/11 07:06:11 UTC

[flink] branch master updated (155f17d -> 128e191)

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 155f17d  [FLINK-26564][connectors/filesystem] Fix the bug that CompactCoordinatorStateHandler doesn't properly handle the cleanup-in-progress requests.
     new e7d9c59  [FLINK-26349][AvroParquet][test] add test for reading reflect records from parquet file created with generic record schema.
     new 128e191  [FLINK-26349][test] migrate AvroParquetRecordFormatTest to AssertJ

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../parquet/avro/AvroParquetFileReadITCase.java    |  54 ++++++-
 .../parquet/avro/AvroParquetRecordFormatTest.java  | 171 ++++++++++++++-------
 2 files changed, 163 insertions(+), 62 deletions(-)

[flink] 01/02: [FLINK-26349][AvroParquet][test] add test for reading reflect records from parquet file created with generic record schema.

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e7d9c59d68644a943fc5d059c4a7a9fa3366da10
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Wed Mar 9 15:08:43 2022 +0100

    [FLINK-26349][AvroParquet][test] add test for reading reflect records from parquet file created with generic record schema.
---
 .../parquet/avro/AvroParquetFileReadITCase.java    | 54 +++++++++++++++++++---
 .../parquet/avro/AvroParquetRecordFormatTest.java  | 53 ++++++++++++++++++++-
 2 files changed, 99 insertions(+), 8 deletions(-)

diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileReadITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileReadITCase.java
index 8ba5a40..1f8e812 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileReadITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileReadITCase.java
@@ -32,16 +32,19 @@ import org.apache.flink.util.CloseableIterator;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
-import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -53,17 +56,18 @@ public class AvroParquetFileReadITCase extends AbstractTestBase {
     private static final String USER_PARQUET_FILE_2 = "user2.parquet";
     private static final String USER_PARQUET_FILE_3 = "user3.parquet";
 
-    private Schema schema;
-    private final List<GenericRecord> userRecords = new ArrayList<>(3);
+    private static Schema schema;
+    private static final List<GenericRecord> userRecords = new ArrayList<>(3);
 
-    @Before
-    public void setup() throws IOException {
+    @BeforeClass
+    public static void setup() throws IOException {
         // Generic records
         schema =
                 new Schema.Parser()
                         .parse(
                                 "{\"type\": \"record\", "
                                         + "\"name\": \"User\", "
+                                        + "\"namespace\": \"org.apache.flink.formats.parquet.avro.AvroParquetRecordFormatTest\", "
                                         + "\"fields\": [\n"
                                         + "        {\"name\": \"name\", \"type\": \"string\" },\n"
                                         + "        {\"name\": \"favoriteNumber\",  \"type\": [\"int\", \"null\"] },\n"
@@ -125,6 +129,36 @@ public class AvroParquetFileReadITCase extends AbstractTestBase {
         }
     }
 
+    @Test
+    public void testReadAvroReflectRecord() throws Exception {
+        final FileSource<AvroParquetRecordFormatTest.User> source =
+                FileSource.forRecordStreamFormat(
+                                AvroParquetReaders.forReflectRecord(
+                                        AvroParquetRecordFormatTest.User.class),
+                                Path.fromLocalFile(TEMPORARY_FOLDER.getRoot()))
+                        .monitorContinuously(Duration.ofMillis(5))
+                        .build();
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+        env.enableCheckpointing(10L);
+
+        DataStream<AvroParquetRecordFormatTest.User> stream =
+                env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
+
+        try (CloseableIterator<AvroParquetRecordFormatTest.User> iterator =
+                stream.executeAndCollect("Reading Avro Reflect Records")) {
+            List<AvroParquetRecordFormatTest.User> list = collectRecords(iterator, 6);
+            Collections.sort(
+                    list,
+                    Comparator.comparing(AvroParquetRecordFormatTest.User::getFavoriteNumber));
+            assertEquals(list.size(), 6);
+
+            for (int i = 0; i < 6; i++) {
+                assertUserEquals(list.get(i), userRecords.get(i));
+            }
+        }
+    }
+
     private static <E> List<E> collectRecords(
             final CloseableIterator<E> iterator, final int numElements) {
 
@@ -161,7 +195,15 @@ public class AvroParquetFileReadITCase extends AbstractTestBase {
         writer.finish();
     }
 
-    private GenericRecord createUser(String name, int favoriteNumber, String favoriteColor) {
+    private void assertUserEquals(AvroParquetRecordFormatTest.User user, GenericRecord expected) {
+        assertThat(user).isNotNull();
+        assertThat(String.valueOf(user.getName())).isNotNull().isEqualTo(expected.get("name"));
+        assertThat(user.getFavoriteNumber()).isEqualTo(expected.get("favoriteNumber"));
+        assertThat(String.valueOf(user.getFavoriteColor()))
+                .isEqualTo(String.valueOf(expected.get("favoriteColor")));
+    }
+
+    private static GenericRecord createUser(String name, int favoriteNumber, String favoriteColor) {
         GenericRecord record = new GenericData.Record(schema);
         record.put("name", name);
         record.put("favoriteNumber", favoriteNumber);
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java
index 38b5396..92b1269 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java
@@ -84,6 +84,7 @@ class AvroParquetRecordFormatTest {
                         .parse(
                                 "{\"type\": \"record\", "
                                         + "\"name\": \"User\", "
+                                        + "\"namespace\": \"org.apache.flink.formats.parquet.avro.AvroParquetRecordFormatTest\", "
                                         + "\"fields\": [\n"
                                         + "        {\"name\": \"name\", \"type\": \"string\" },\n"
                                         + "        {\"name\": \"favoriteNumber\",  \"type\": [\"int\", \"null\"] },\n"
@@ -141,6 +142,21 @@ class AvroParquetRecordFormatTest {
     }
 
     @Test
+    void testReflectReadFromGenericRecords() throws IOException {
+        StreamFormat.Reader<User> reader =
+                createReader(
+                        AvroParquetReaders.forReflectRecord(User.class),
+                        new Configuration(),
+                        userPath,
+                        0,
+                        userPath.getFileSystem().getFileStatus(userPath).getLen());
+        for (GenericRecord record : userRecords) {
+            User user = reader.read();
+            assertUserEquals(Objects.requireNonNull(user), record);
+        }
+    }
+
+    @Test
     void testCreateGenericReader() throws IOException {
         StreamFormat.Reader<GenericRecord> reader =
                 createReader(
@@ -309,9 +325,16 @@ class AvroParquetRecordFormatTest {
     }
 
     private void assertUserEquals(GenericRecord user, GenericRecord expected) {
-        assertEquals(user.get("name").toString(), expected.get("name"));
+        assertEquals(user.get("name").toString(), expected.get("name").toString());
         assertEquals(user.get("favoriteNumber"), expected.get("favoriteNumber"));
-        assertEquals(user.get("favoriteColor").toString(), expected.get("favoriteColor"));
+        assertEquals(
+                user.get("favoriteColor").toString(), expected.get("favoriteColor").toString());
+    }
+
+    private void assertUserEquals(User user, GenericRecord expected) {
+        assertEquals(user.getName(), expected.get("name").toString());
+        assertEquals(user.getFavoriteNumber(), expected.get("favoriteNumber"));
+        assertEquals(user.getFavoriteColor(), expected.get("favoriteColor").toString());
     }
 
     private static List<Address> createAddressList() {
@@ -324,4 +347,30 @@ class AvroParquetRecordFormatTest {
     private static List<Datum> createDatumList() {
         return Arrays.asList(new Datum("a", 1), new Datum("b", 2), new Datum("c", 3));
     }
+
+    private static final class User {
+        private String name;
+        private Integer favoriteNumber;
+        private String favoriteColor;
+
+        public User() {}
+
+        public User(String name, Integer favoriteNumber, String favoriteColor) {
+            this.name = name;
+            this.favoriteNumber = favoriteNumber;
+            this.favoriteColor = favoriteColor;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public Integer getFavoriteNumber() {
+            return favoriteNumber;
+        }
+
+        public String getFavoriteColor() {
+            return favoriteColor;
+        }
+    }
 }

[flink] 02/02: [FLINK-26349][test] migrate AvroParquetRecordFormatTest to AssertJ

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 128e19156208a306669f1736514f61d85f9ee065
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Wed Mar 9 20:11:53 2022 +0100

    [FLINK-26349][test] migrate AvroParquetRecordFormatTest to AssertJ
---
 .../parquet/avro/AvroParquetRecordFormatTest.java  | 138 +++++++++++----------
 1 file changed, 74 insertions(+), 64 deletions(-)

diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java
index 92b1269..3fac6ca 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java
@@ -45,9 +45,8 @@ import java.util.List;
 import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /**
  * Unit test for {@link AvroParquetRecordFormat} and {@link
@@ -95,6 +94,7 @@ class AvroParquetRecordFormatTest {
         userRecords.add(createUser("Peter", 1, "red"));
         userRecords.add(createUser("Tom", 2, "yellow"));
         userRecords.add(createUser("Jack", 3, "green"));
+        userRecords.add(createUser("Max", null, null));
 
         userPath = new Path(temporaryFolder.resolve(USER_PARQUET_FILE).toUri());
         createParquetFile(AvroParquetWriters.forGenericRecord(schema), userPath, userRecords);
@@ -121,9 +121,9 @@ class AvroParquetRecordFormatTest {
                         addressPath,
                         0,
                         addressPath.getFileSystem().getFileStatus(addressPath).getLen());
-        for (Address address : addressRecords) {
-            Address address1 = Objects.requireNonNull(reader.read());
-            assertEquals(address1, address);
+        for (Address expected : addressRecords) {
+            Address address = Objects.requireNonNull(reader.read());
+            assertThat(address).isEqualTo(expected);
         }
     }
 
@@ -136,8 +136,8 @@ class AvroParquetRecordFormatTest {
                         datumPath,
                         0,
                         datumPath.getFileSystem().getFileStatus(datumPath).getLen());
-        for (Datum datum : datumRecords) {
-            assertEquals(Objects.requireNonNull(reader.read()), datum);
+        for (Datum expected : datumRecords) {
+            assertThat(reader.read()).isNotNull().isEqualTo(expected);
         }
     }
 
@@ -150,9 +150,8 @@ class AvroParquetRecordFormatTest {
                         userPath,
                         0,
                         userPath.getFileSystem().getFileStatus(userPath).getLen());
-        for (GenericRecord record : userRecords) {
-            User user = reader.read();
-            assertUserEquals(Objects.requireNonNull(user), record);
+        for (GenericRecord expected : userRecords) {
+            assertUserEquals(reader.read(), expected);
         }
     }
 
@@ -165,37 +164,37 @@ class AvroParquetRecordFormatTest {
                         userPath,
                         0,
                         userPath.getFileSystem().getFileStatus(userPath).getLen());
-        for (GenericRecord record : userRecords) {
-            assertUserEquals(Objects.requireNonNull(reader.read()), record);
+        for (GenericRecord expected : userRecords) {
+            assertUserEquals(reader.read(), expected);
         }
     }
 
     /** Expect exception since splitting is not supported now. */
     @Test
     void testCreateGenericReaderWithSplitting() {
-        assertThrows(
-                IllegalArgumentException.class,
-                () ->
-                        createReader(
-                                AvroParquetReaders.forGenericRecord(schema),
-                                new Configuration(),
-                                userPath,
-                                5,
-                                5));
+        assertThatThrownBy(
+                        () ->
+                                createReader(
+                                        AvroParquetReaders.forGenericRecord(schema),
+                                        new Configuration(),
+                                        userPath,
+                                        5,
+                                        5))
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
     void testRestoreGenericReaderWithWrongOffset() {
-        assertThrows(
-                IllegalArgumentException.class,
-                () ->
-                        restoreReader(
-                                AvroParquetReaders.forGenericRecord(schema),
-                                new Configuration(),
-                                userPath,
-                                10,
-                                0,
-                                userPath.getFileSystem().getFileStatus(userPath).getLen()));
+        assertThatThrownBy(
+                        () ->
+                                restoreReader(
+                                        AvroParquetReaders.forGenericRecord(schema),
+                                        new Configuration(),
+                                        userPath,
+                                        10,
+                                        0,
+                                        userPath.getFileSystem().getFileStatus(userPath).getLen()))
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
@@ -208,40 +207,40 @@ class AvroParquetRecordFormatTest {
                         CheckpointedPosition.NO_OFFSET,
                         0,
                         userPath.getFileSystem().getFileStatus(userPath).getLen());
-        for (GenericRecord record : userRecords) {
-            assertUserEquals(Objects.requireNonNull(reader.read()), record);
+        for (GenericRecord expected : userRecords) {
+            assertUserEquals(reader.read(), expected);
         }
     }
 
     @Test
     void testSplittable() {
-        assertFalse(AvroParquetReaders.forGenericRecord(schema).isSplittable());
+        assertThat(AvroParquetReaders.forGenericRecord(schema).isSplittable()).isFalse();
     }
 
     @Test
     void getProducedType() {
-        assertEquals(
-                AvroParquetReaders.forGenericRecord(schema).getProducedType().getTypeClass(),
-                GenericRecord.class);
+        assertThat(AvroParquetReaders.forGenericRecord(schema).getProducedType().getTypeClass())
+                .isEqualTo(GenericRecord.class);
     }
 
     @Test
     void getDataModel() {
-        assertEquals(
-                ((AvroParquetRecordFormat) AvroParquetReaders.forGenericRecord(schema))
-                        .getDataModel()
-                        .getClass(),
-                GenericData.class);
-        assertEquals(
-                ((AvroParquetRecordFormat) AvroParquetReaders.forSpecificRecord(Address.class))
-                        .getDataModel()
-                        .getClass(),
-                SpecificData.class);
-        assertEquals(
-                ((AvroParquetRecordFormat) AvroParquetReaders.forReflectRecord(Datum.class))
-                        .getDataModel()
-                        .getClass(),
-                ReflectData.class);
+        assertThat(
+                        ((AvroParquetRecordFormat) AvroParquetReaders.forGenericRecord(schema))
+                                .getDataModel()
+                                .getClass())
+                .isEqualTo(GenericData.class);
+        assertThat(
+                        ((AvroParquetRecordFormat)
+                                        AvroParquetReaders.forSpecificRecord(Address.class))
+                                .getDataModel()
+                                .getClass())
+                .isEqualTo(SpecificData.class);
+        assertThat(
+                        ((AvroParquetRecordFormat) AvroParquetReaders.forReflectRecord(Datum.class))
+                                .getDataModel()
+                                .getClass())
+                .isEqualTo(ReflectData.class);
     }
 
     // ------------------------------------------------------------------------
@@ -316,25 +315,36 @@ class AvroParquetRecordFormatTest {
         writer.finish();
     }
 
-    private static GenericRecord createUser(String name, int favoriteNumber, String favoriteColor) {
+    private static GenericRecord createUser(
+            String name, Integer favoriteNumber, String favoriteColor) {
         GenericRecord record = new GenericData.Record(schema);
         record.put("name", name);
-        record.put("favoriteNumber", favoriteNumber);
-        record.put("favoriteColor", favoriteColor);
+        if (favoriteNumber != null) {
+            record.put("favoriteNumber", favoriteNumber);
+        }
+
+        if (favoriteColor != null) {
+            record.put("favoriteColor", favoriteColor);
+        }
+
         return record;
     }
 
     private void assertUserEquals(GenericRecord user, GenericRecord expected) {
-        assertEquals(user.get("name").toString(), expected.get("name").toString());
-        assertEquals(user.get("favoriteNumber"), expected.get("favoriteNumber"));
-        assertEquals(
-                user.get("favoriteColor").toString(), expected.get("favoriteColor").toString());
+        assertThat(user).isNotNull();
+        assertThat(String.valueOf(user.get("name"))).isEqualTo(expected.get("name"));
+        assertThat(user.get("favoriteNumber")).isEqualTo(expected.get("favoriteNumber"));
+        // TODO use CharSequence.compare(...,...) after migrating to Java 11
+        assertThat(String.valueOf(user.get("favoriteColor")))
+                .isEqualTo(String.valueOf(expected.get("favoriteColor")));
     }
 
     private void assertUserEquals(User user, GenericRecord expected) {
-        assertEquals(user.getName(), expected.get("name").toString());
-        assertEquals(user.getFavoriteNumber(), expected.get("favoriteNumber"));
-        assertEquals(user.getFavoriteColor(), expected.get("favoriteColor").toString());
+        assertThat(user).isNotNull();
+        assertThat(String.valueOf(user.getName())).isNotNull().isEqualTo(expected.get("name"));
+        assertThat(user.getFavoriteNumber()).isEqualTo(expected.get("favoriteNumber"));
+        assertThat(String.valueOf(user.getFavoriteColor()))
+                .isEqualTo(String.valueOf(expected.get("favoriteColor")));
     }
 
     private static List<Address> createAddressList() {
@@ -348,7 +358,7 @@ class AvroParquetRecordFormatTest {
         return Arrays.asList(new Datum("a", 1), new Datum("b", 2), new Datum("c", 3));
     }
 
-    private static final class User {
+    public static final class User {
         private String name;
         private Integer favoriteNumber;
         private String favoriteColor;