You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/02/28 07:54:52 UTC
[flink] 02/02: [FLINK-26357][avro-parquet] make AvroParquetRecordFormat package private and update javadoc.
This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e2463ddace26601c6442dbaa68c198a27c9cfbcf
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Fri Feb 25 12:31:03 2022 +0100
[FLINK-26357][avro-parquet] make AvroParquetRecordFormat package private and update javadoc.
---
.../flink/formats/parquet/avro/AvroParquetReaders.java | 11 ++++++-----
.../formats/parquet/avro/AvroParquetRecordFormat.java | 17 ++++++-----------
.../parquet/avro/AvroParquetRecordFormatTest.java | 16 +++++++++++-----
3 files changed, 23 insertions(+), 21 deletions(-)
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetReaders.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetReaders.java
index cdb3714..67702c8 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetReaders.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetReaders.java
@@ -20,6 +20,7 @@ package org.apache.flink.formats.parquet.avro;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
@@ -31,8 +32,8 @@ import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificRecordBase;
/**
- * Convenience factory to create {@link AvroParquetRecordFormat} instances for the different Avro
- * types.
+ * A convenience factory to create {@link AvroParquetRecordFormat} instances for the different kinds
+ * of Avro record types.
*/
@Experimental
public class AvroParquetReaders {
@@ -46,7 +47,7 @@ public class AvroParquetReaders {
*
* @see #forGenericRecord(Schema)
*/
- public static <T extends SpecificRecordBase> AvroParquetRecordFormat<T> forSpecificRecord(
+ public static <T extends SpecificRecordBase> StreamFormat<T> forSpecificRecord(
final Class<T> typeClass) {
return new AvroParquetRecordFormat<>(
new AvroTypeInfo<>(typeClass), () -> SpecificData.get());
@@ -65,7 +66,7 @@ public class AvroParquetReaders {
* @see #forGenericRecord(Schema)
* @see #forSpecificRecord(Class)
*/
- public static <T> AvroParquetRecordFormat<T> forReflectRecord(final Class<T> typeClass) {
+ public static <T> StreamFormat<T> forReflectRecord(final Class<T> typeClass) {
if (SpecificRecordBase.class.isAssignableFrom(typeClass)) {
throw new IllegalArgumentException(
"Please use AvroParquetReaders.forSpecificRecord(Class<T>) for SpecificRecord.");
@@ -95,7 +96,7 @@ public class AvroParquetReaders {
* needs this schema during 'pre-flight' time when the data flow is set up and wired, which is
* before there is access to the files.
*/
- public static AvroParquetRecordFormat<GenericRecord> forGenericRecord(final Schema schema) {
+ public static StreamFormat<GenericRecord> forGenericRecord(final Schema schema) {
return new AvroParquetRecordFormat<>(
new GenericRecordAvroTypeInfo(schema), () -> GenericData.get());
}
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormat.java
index a413a1b..da75cc6 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormat.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormat.java
@@ -18,7 +18,6 @@
package org.apache.flink.formats.parquet.avro;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
@@ -44,17 +43,13 @@ import java.io.IOException;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
- * A reader format that reads individual Avro records from a Parquet stream. This class leverages
- * {@link ParquetReader} underneath. Developer should make sure the parquet files can be worked with
- * provided avro schema and take care of any further compatibility issue.
- *
- * <p>It is recommended to use the factory class {@link AvroParquetReaders} which is capable to
- * create {@link AvroParquetRecordFormat AvroParquetRecordFormats} that will work with {@link
- * GenericRecord GenericRecords}, {@link org.apache.avro.specific.SpecificRecord SpecificRecords},
- * or {@link org.apache.avro.reflect.ReflectData reflect records}
+ * A reader format that reads individual Avro records from a Parquet stream. Please refer to the
+ * factory class {@link AvroParquetReaders} for how to create new {@link AvroParquetRecordFormat}
+ * instances. This class leverages {@link ParquetReader} underneath. The Parquet files must be
+ * compatible with the provided Avro schema and the Avro data model in use, i.e. {@link GenericData},
+ * {@link org.apache.avro.specific.SpecificData}, or {@link org.apache.avro.reflect.ReflectData}.
*/
-@PublicEvolving
-public class AvroParquetRecordFormat<E> implements StreamFormat<E> {
+class AvroParquetRecordFormat<E> implements StreamFormat<E> {
private static final long serialVersionUID = 1L;
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java
index fd0e60b..38b5396 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetRecordFormatTest.java
@@ -212,13 +212,19 @@ class AvroParquetRecordFormatTest {
@Test
void getDataModel() {
assertEquals(
- AvroParquetReaders.forGenericRecord(schema).getDataModel().getClass(),
+ ((AvroParquetRecordFormat) AvroParquetReaders.forGenericRecord(schema))
+ .getDataModel()
+ .getClass(),
GenericData.class);
assertEquals(
- AvroParquetReaders.forSpecificRecord(Address.class).getDataModel().getClass(),
+ ((AvroParquetRecordFormat) AvroParquetReaders.forSpecificRecord(Address.class))
+ .getDataModel()
+ .getClass(),
SpecificData.class);
assertEquals(
- AvroParquetReaders.forReflectRecord(Datum.class).getDataModel().getClass(),
+ ((AvroParquetRecordFormat) AvroParquetReaders.forReflectRecord(Datum.class))
+ .getDataModel()
+ .getClass(),
ReflectData.class);
}
@@ -227,7 +233,7 @@ class AvroParquetRecordFormatTest {
// ------------------------------------------------------------------------
private <T> StreamFormat.Reader<T> createReader(
- AvroParquetRecordFormat<T> format,
+ StreamFormat<T> format,
Configuration config,
Path filePath,
long splitOffset,
@@ -250,7 +256,7 @@ class AvroParquetRecordFormatTest {
}
private <T> StreamFormat.Reader<T> restoreReader(
- AvroParquetRecordFormat<T> format,
+ StreamFormat<T> format,
Configuration config,
Path filePath,
long restoredOffset,