You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/02/28 07:54:52 UTC

[flink] 02/02: [FLINK-26357][avro-parquet] make AvroParquetRecordFormat package private and update javadoc.

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e2463ddace26601c6442dbaa68c198a27c9cfbcf
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Fri Feb 25 12:31:03 2022 +0100

    [FLINK-26357][avro-parquet] make AvroParquetRecordFormat package private and update javadoc.
---
 .../flink/formats/parquet/avro/AvroParquetReaders.java  | 11 ++++++-----
 .../formats/parquet/avro/AvroParquetRecordFormat.java   | 17 ++++++-----------
 .../parquet/avro/AvroParquetRecordFormatTest.java       | 16 +++++++++++-----
 3 files changed, 23 insertions(+), 21 deletions(-)

diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetReaders.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetReaders.java
index cdb3714..67702c8 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetReaders.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetReaders.java
@@ -20,6 +20,7 @@ package org.apache.flink.formats.parquet.avro;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
 import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
 import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
 
@@ -31,8 +32,8 @@ import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificRecordBase;
 
 /**
- * Convenience factory to create {@link AvroParquetRecordFormat} instances for the different Avro
- * types.
+ * A convenience factory to create {@link AvroParquetRecordFormat} instances for the different kinds
+ * of Avro record types.
  */
 @Experimental
 public class AvroParquetReaders {
@@ -46,7 +47,7 @@ public class AvroParquetReaders {
      *
      * @see #forGenericRecord(Schema)
      */
-    public static <T extends SpecificRecordBase> AvroParquetRecordFormat<T> forSpecificRecord(
+    public static <T extends SpecificRecordBase> StreamFormat<T> forSpecificRecord(
             final Class<T> typeClass) {
         return new AvroParquetRecordFormat<>(
                 new AvroTypeInfo<>(typeClass), () -> SpecificData.get());
@@ -65,7 +66,7 @@ public class AvroParquetReaders {
      * @see #forGenericRecord(Schema)
      * @see #forSpecificRecord(Class)
      */
-    public static <T> AvroParquetRecordFormat<T> forReflectRecord(final Class<T> typeClass) {
+    public static <T> StreamFormat<T> forReflectRecord(final Class<T> typeClass) {
         if (SpecificRecordBase.class.isAssignableFrom(typeClass)) {
             throw new IllegalArgumentException(
                     "Please use AvroParquetReaders.forSpecificRecord(Class<T>) for SpecificRecord.");
@@ -95,7 +96,7 @@ public class AvroParquetReaders {
      * needs this schema during 'pre-flight' time when the data flow is set up and wired, which is
      * before there is access to the files.
      */
-    public static AvroParquetRecordFormat<GenericRecord> forGenericRecord(final Schema schema) {
+    public static StreamFormat<GenericRecord> forGenericRecord(final Schema schema) {
         return new AvroParquetRecordFormat<>(
                 new GenericRecordAvroTypeInfo(schema), () -> GenericData.get());
     }
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormat.java
index a413a1b..da75cc6 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormat.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormat.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.formats.parquet.avro;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
@@ -44,17 +43,13 @@ import java.io.IOException;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
- * A reader format that reads individual Avro records from a Parquet stream. This class leverages
- * {@link ParquetReader} underneath. Developer should make sure the parquet files can be worked with
- * provided avro schema and take care of any further compatibility issue.
- *
- * <p>It is recommended to use the factory class {@link AvroParquetReaders} which is capable to
- * create {@link AvroParquetRecordFormat AvroParquetRecordFormats} that will work with {@link
- * GenericRecord GenericRecords}, {@link org.apache.avro.specific.SpecificRecord SpecificRecords},
- * or {@link org.apache.avro.reflect.ReflectData reflect records}
+ * A reader format that reads individual Avro records from a Parquet stream. Instances are
+ * created via the factory class {@link AvroParquetReaders}. This class leverages {@link
+ * ParquetReader} underneath. The Parquet files must be compatible with the provided Avro schema
+ * and with the Avro data model in use, i.e. one of {@link GenericData},
+ * {@link org.apache.avro.specific.SpecificData}, or {@link org.apache.avro.reflect.ReflectData}.
  */
-@PublicEvolving
-public class AvroParquetRecordFormat<E> implements StreamFormat<E> {
+class AvroParquetRecordFormat<E> implements StreamFormat<E> {
 
     private static final long serialVersionUID = 1L;
 
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java
index fd0e60b..38b5396 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java
@@ -212,13 +212,19 @@ class AvroParquetRecordFormatTest {
     @Test
     void getDataModel() {
         assertEquals(
-                AvroParquetReaders.forGenericRecord(schema).getDataModel().getClass(),
+                ((AvroParquetRecordFormat) AvroParquetReaders.forGenericRecord(schema))
+                        .getDataModel()
+                        .getClass(),
                 GenericData.class);
         assertEquals(
-                AvroParquetReaders.forSpecificRecord(Address.class).getDataModel().getClass(),
+                ((AvroParquetRecordFormat) AvroParquetReaders.forSpecificRecord(Address.class))
+                        .getDataModel()
+                        .getClass(),
                 SpecificData.class);
         assertEquals(
-                AvroParquetReaders.forReflectRecord(Datum.class).getDataModel().getClass(),
+                ((AvroParquetRecordFormat) AvroParquetReaders.forReflectRecord(Datum.class))
+                        .getDataModel()
+                        .getClass(),
                 ReflectData.class);
     }
 
@@ -227,7 +233,7 @@ class AvroParquetRecordFormatTest {
     // ------------------------------------------------------------------------
 
     private <T> StreamFormat.Reader<T> createReader(
-            AvroParquetRecordFormat<T> format,
+            StreamFormat<T> format,
             Configuration config,
             Path filePath,
             long splitOffset,
@@ -250,7 +256,7 @@ class AvroParquetRecordFormatTest {
     }
 
     private <T> StreamFormat.Reader<T> restoreReader(
-            AvroParquetRecordFormat<T> format,
+            StreamFormat<T> format,
             Configuration config,
             Path filePath,
             long restoredOffset,