You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/02/28 07:54:50 UTC

[flink] branch master updated (431f757a -> e2463dd)

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 431f757a [FLINK-26285] Fixes an inconsistency which was introduced by c3a6b51 as part of changes done for FLINK-19543
     new e73d3406 [FLINK-26357][avro-parquet] add FLINK API annotations
     new e2463dd  [FLINK-26357][avro-parquet] make AvroParquetRecordFormat package private and update javadoc.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/formats/parquet/avro/AvroParquetReaders.java   | 13 ++++++++-----
 .../formats/parquet/avro/AvroParquetRecordFormat.java    | 10 ++++++++--
 .../flink/formats/parquet/avro/AvroParquetWriters.java   |  2 ++
 .../flink/formats/parquet/avro/ParquetAvroWriters.java   |  1 +
 .../parquet/avro/AvroParquetRecordFormatTest.java        | 16 +++++++++++-----
 5 files changed, 30 insertions(+), 12 deletions(-)

[flink] 02/02: [FLINK-26357][avro-parquet] make AvroParquetRecordFormat package private and update javadoc.

Posted by fp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e2463ddace26601c6442dbaa68c198a27c9cfbcf
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Fri Feb 25 12:31:03 2022 +0100

    [FLINK-26357][avro-parquet] make AvroParquetRecordFormat package private and update javadoc.
---
 .../flink/formats/parquet/avro/AvroParquetReaders.java  | 11 ++++++-----
 .../formats/parquet/avro/AvroParquetRecordFormat.java   | 17 ++++++-----------
 .../parquet/avro/AvroParquetRecordFormatTest.java       | 16 +++++++++++-----
 3 files changed, 23 insertions(+), 21 deletions(-)

diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetReaders.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetReaders.java
index cdb3714..67702c8 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetReaders.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetReaders.java
@@ -20,6 +20,7 @@ package org.apache.flink.formats.parquet.avro;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
 import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
 import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
 
@@ -31,8 +32,8 @@ import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificRecordBase;
 
 /**
- * Convenience factory to create {@link AvroParquetRecordFormat} instances for the different Avro
- * types.
+ * A convenience factory to create {@link AvroParquetRecordFormat} instances for the different kinds
+ * of Avro record types.
  */
 @Experimental
 public class AvroParquetReaders {
@@ -46,7 +47,7 @@ public class AvroParquetReaders {
      *
      * @see #forGenericRecord(Schema)
      */
-    public static <T extends SpecificRecordBase> AvroParquetRecordFormat<T> forSpecificRecord(
+    public static <T extends SpecificRecordBase> StreamFormat<T> forSpecificRecord(
             final Class<T> typeClass) {
         return new AvroParquetRecordFormat<>(
                 new AvroTypeInfo<>(typeClass), () -> SpecificData.get());
@@ -65,7 +66,7 @@ public class AvroParquetReaders {
      * @see #forGenericRecord(Schema)
      * @see #forSpecificRecord(Class)
      */
-    public static <T> AvroParquetRecordFormat<T> forReflectRecord(final Class<T> typeClass) {
+    public static <T> StreamFormat<T> forReflectRecord(final Class<T> typeClass) {
         if (SpecificRecordBase.class.isAssignableFrom(typeClass)) {
             throw new IllegalArgumentException(
                     "Please use AvroParquetReaders.forSpecificRecord(Class<T>) for SpecificRecord.");
@@ -95,7 +96,7 @@ public class AvroParquetReaders {
      * needs this schema during 'pre-flight' time when the data flow is set up and wired, which is
      * before there is access to the files.
      */
-    public static AvroParquetRecordFormat<GenericRecord> forGenericRecord(final Schema schema) {
+    public static StreamFormat<GenericRecord> forGenericRecord(final Schema schema) {
         return new AvroParquetRecordFormat<>(
                 new GenericRecordAvroTypeInfo(schema), () -> GenericData.get());
     }
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormat.java
index a413a1b..da75cc6 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormat.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormat.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.formats.parquet.avro;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
@@ -44,17 +43,13 @@ import java.io.IOException;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
- * A reader format that reads individual Avro records from a Parquet stream. This class leverages
- * {@link ParquetReader} underneath. Developer should make sure the parquet files can be worked with
- * provided avro schema and take care of any further compatibility issue.
- *
- * <p>It is recommended to use the factory class {@link AvroParquetReaders} which is capable to
- * create {@link AvroParquetRecordFormat AvroParquetRecordFormats} that will work with {@link
- * GenericRecord GenericRecords}, {@link org.apache.avro.specific.SpecificRecord SpecificRecords},
- * or {@link org.apache.avro.reflect.ReflectData reflect records}
+ * A reader format that reads individual Avro records from a Parquet stream. Please refer to the
+ * factory class {@link AvroParquetReaders} for how to create new {@link AvroParquetRecordFormat}
+ * instances. This class leverages {@link ParquetReader} underneath. The Parquet files must be
+ * compatible with the provided Avro schema and Avro data model, i.e. {@link GenericData},
+ * {@link org.apache.avro.specific.SpecificData}, or {@link org.apache.avro.reflect.ReflectData}.
  */
-@PublicEvolving
-public class AvroParquetRecordFormat<E> implements StreamFormat<E> {
+class AvroParquetRecordFormat<E> implements StreamFormat<E> {
 
     private static final long serialVersionUID = 1L;
 
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java
index fd0e60b..38b5396 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java
@@ -212,13 +212,19 @@ class AvroParquetRecordFormatTest {
     @Test
     void getDataModel() {
         assertEquals(
-                AvroParquetReaders.forGenericRecord(schema).getDataModel().getClass(),
+                ((AvroParquetRecordFormat) AvroParquetReaders.forGenericRecord(schema))
+                        .getDataModel()
+                        .getClass(),
                 GenericData.class);
         assertEquals(
-                AvroParquetReaders.forSpecificRecord(Address.class).getDataModel().getClass(),
+                ((AvroParquetRecordFormat) AvroParquetReaders.forSpecificRecord(Address.class))
+                        .getDataModel()
+                        .getClass(),
                 SpecificData.class);
         assertEquals(
-                AvroParquetReaders.forReflectRecord(Datum.class).getDataModel().getClass(),
+                ((AvroParquetRecordFormat) AvroParquetReaders.forReflectRecord(Datum.class))
+                        .getDataModel()
+                        .getClass(),
                 ReflectData.class);
     }
 
@@ -227,7 +233,7 @@ class AvroParquetRecordFormatTest {
     // ------------------------------------------------------------------------
 
     private <T> StreamFormat.Reader<T> createReader(
-            AvroParquetRecordFormat<T> format,
+            StreamFormat<T> format,
             Configuration config,
             Path filePath,
             long splitOffset,
@@ -250,7 +256,7 @@ class AvroParquetRecordFormatTest {
     }
 
     private <T> StreamFormat.Reader<T> restoreReader(
-            AvroParquetRecordFormat<T> format,
+            StreamFormat<T> format,
             Configuration config,
             Path filePath,
             long restoredOffset,

[flink] 01/02: [FLINK-26357][avro-parquet] add FLINK API annotations

Posted by fp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e73d340668c6264ed847ac7d6134efb4094f90d3
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Thu Feb 24 17:11:16 2022 +0100

    [FLINK-26357][avro-parquet] add FLINK API annotations
---
 .../flink/formats/parquet/avro/AvroParquetReaders.java      |  4 +++-
 .../flink/formats/parquet/avro/AvroParquetRecordFormat.java | 13 ++++++++++++-
 .../flink/formats/parquet/avro/AvroParquetWriters.java      |  2 ++
 .../flink/formats/parquet/avro/ParquetAvroWriters.java      |  1 +
 4 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetReaders.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetReaders.java
index edbc807..cdb3714 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetReaders.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetReaders.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.formats.parquet.avro;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
 import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
@@ -30,9 +31,10 @@ import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificRecordBase;
 
 /**
- * Convenience builder to create {@link AvroParquetRecordFormat} instances for the different Avro
+ * Convenience factory to create {@link AvroParquetRecordFormat} instances for the different Avro
  * types.
  */
+@Experimental
 public class AvroParquetReaders {
 
     /**
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormat.java
index d83c245..a413a1b 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormat.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.formats.parquet.avro;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
@@ -42,7 +43,17 @@ import java.io.IOException;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
-/** A reader format that reads individual Avro records from a Parquet stream. */
+/**
+ * A reader format that reads individual Avro records from a Parquet stream. This class leverages
+ * {@link ParquetReader} underneath. Developer should make sure the parquet files can be worked with
+ * provided avro schema and take care of any further compatibility issue.
+ *
+ * <p>It is recommended to use the factory class {@link AvroParquetReaders} which is capable to
+ * create {@link AvroParquetRecordFormat AvroParquetRecordFormats} that will work with {@link
+ * GenericRecord GenericRecords}, {@link org.apache.avro.specific.SpecificRecord SpecificRecords},
+ * or {@link org.apache.avro.reflect.ReflectData reflect records}
+ */
+@PublicEvolving
 public class AvroParquetRecordFormat<E> implements StreamFormat<E> {
 
     private static final long serialVersionUID = 1L;
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetWriters.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetWriters.java
index 6f6fe8e..28b47fc 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetWriters.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetWriters.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.formats.parquet.avro;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.formats.parquet.ParquetBuilder;
 import org.apache.flink.formats.parquet.ParquetWriterFactory;
 
@@ -37,6 +38,7 @@ import java.io.IOException;
  * Convenience builder to create {@link ParquetWriterFactory} instances for the different Avro
  * types.
  */
+@Experimental
 public class AvroParquetWriters {
     /**
      * Creates a ParquetWriterFactory for an Avro specific type. The Parquet writers will use the
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/ParquetAvroWriters.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/ParquetAvroWriters.java
index 7a0f4e7..92a4892 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/ParquetAvroWriters.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/ParquetAvroWriters.java
@@ -30,6 +30,7 @@ import org.apache.avro.specific.SpecificRecordBase;
  *
  * @deprecated use {@link AvroParquetWriters} instead.
  */
+@Deprecated
 public class ParquetAvroWriters {
 
     /**