You are viewing a plain-text version of this content. (The link to the canonical version is not preserved in this plain-text copy.)
Posted to commits@parquet.apache.org by ga...@apache.org on 2020/12/07 09:47:16 UTC
[parquet-mr] branch master updated: PARQUET-1928: Interpret Parquet INT96 type as FIXED[12] AVRO Schema (#831)
This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new bea6b66 PARQUET-1928: Interpret Parquet INT96 type as FIXED[12] AVRO Schema (#831)
bea6b66 is described below
commit bea6b669d8ff66d938efbefcd7efd5a3d0d8801c
Author: Anant Damle <an...@gmail.com>
AuthorDate: Mon Dec 7 17:47:07 2020 +0800
PARQUET-1928: Interpret Parquet INT96 type as FIXED[12] AVRO Schema (#831)
* Add a configuration flag (`parquet.avro.readInt96AsFixed`) to enable reading INT96 as a 12-byte Avro FIXED type. The flag defaults to false to discourage use of INT96.
---
.../org/apache/parquet/avro/AvroReadSupport.java | 4 ++++
.../apache/parquet/avro/AvroSchemaConverter.java | 11 ++++++++-
.../parquet/avro/TestAvroSchemaConverter.java | 26 ++++++++++++++++++++++
3 files changed, 40 insertions(+), 1 deletion(-)
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
index 5bf0cff..eca1441 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
@@ -51,6 +51,10 @@ public class AvroReadSupport<T> extends ReadSupport<T> {
public static final String AVRO_COMPATIBILITY = "parquet.avro.compatible";
public static final boolean AVRO_DEFAULT_COMPATIBILITY = true;
+ // Support reading Parquet INT96 as a 12-byte array.
+ public static final String READ_INT96_AS_FIXED = "parquet.avro.readInt96AsFixed";
+ public static final boolean READ_INT96_AS_FIXED_DEFAULT = false;
+
/**
* @param configuration a configuration
* @param requestedProjection the requested projection schema
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
index c1bb3c5..4c06e9c 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -43,6 +43,8 @@ import java.util.Optional;
import static java.util.Optional.empty;
import static java.util.Optional.of;
import static org.apache.avro.JsonProperties.NULL_VALUE;
+import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
+import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED_DEFAULT;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_PARQUET_UUID;
@@ -74,6 +76,7 @@ public class AvroSchemaConverter {
private final boolean assumeRepeatedIsListElement;
private final boolean writeOldListStructure;
private final boolean writeParquetUUID;
+ private final boolean readInt96AsFixed;
public AvroSchemaConverter() {
this(ADD_LIST_ELEMENT_RECORDS_DEFAULT);
@@ -89,6 +92,7 @@ public class AvroSchemaConverter {
this.assumeRepeatedIsListElement = assumeRepeatedIsListElement;
this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT;
this.writeParquetUUID = WRITE_PARQUET_UUID_DEFAULT;
+ this.readInt96AsFixed = READ_INT96_AS_FIXED_DEFAULT;
}
public AvroSchemaConverter(Configuration conf) {
@@ -97,6 +101,7 @@ public class AvroSchemaConverter {
this.writeOldListStructure = conf.getBoolean(
WRITE_OLD_LIST_STRUCTURE, WRITE_OLD_LIST_STRUCTURE_DEFAULT);
this.writeParquetUUID = conf.getBoolean(WRITE_PARQUET_UUID, WRITE_PARQUET_UUID_DEFAULT);
+ this.readInt96AsFixed = conf.getBoolean(READ_INT96_AS_FIXED, READ_INT96_AS_FIXED_DEFAULT);
}
/**
@@ -305,7 +310,11 @@ public class AvroSchemaConverter {
}
@Override
public Schema convertINT96(PrimitiveTypeName primitiveTypeName) {
- throw new IllegalArgumentException("INT96 not implemented and is deprecated");
+ if (readInt96AsFixed) {
+ return Schema.createFixed("INT96", "INT96 represented as byte[12]", null, 12);
+ }
+ throw new IllegalArgumentException(
+ "INT96 is deprecated. As interim enable READ_INT96_AS_FIXED flag to read as byte array.");
}
@Override
public Schema convertFLOAT(PrimitiveTypeName primitiveTypeName) {
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
index 6f87acf..065a636 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
@@ -586,6 +586,32 @@ public class TestAvroSchemaConverter {
}
@Test
+ public void testParquetInt96AsFixed12AvroType() throws Exception {
+ Configuration enableInt96ReadingConfig = new Configuration();
+ enableInt96ReadingConfig.setBoolean(AvroReadSupport.READ_INT96_AS_FIXED, true);
+
+ Schema schema = Schema.createRecord("myrecord", null, null, false);
+ Schema int96schema = Schema.createFixed("INT96", "INT96 represented as byte[12]", null, 12);
+ schema.setFields(Collections.singletonList(
+ new Schema.Field("int96_field", int96schema, null, null)));
+
+ testParquetToAvroConversion(enableInt96ReadingConfig, schema, "message myrecord {\n" +
+ " required int96 int96_field;\n" +
+ "}\n");
+ }
+
+ @Test
+ public void testParquetInt96DefaultFail() throws Exception {
+ Schema schema = Schema.createRecord("myrecord", null, null, false);
+
+ MessageType parquetSchemaWithInt96 = MessageTypeParser.parseMessageType("message myrecord {\n required int96 int96_field;\n}\n");
+
+ assertThrows("INT96 is deprecated. As interim enable READ_INT96_AS_FIXED flag to read as byte array.",
+ IllegalArgumentException.class,
+ () -> new AvroSchemaConverter().convert(parquetSchemaWithInt96));
+ }
+
+ @Test
public void testDateType() throws Exception {
Schema date = LogicalTypes.date().addToSchema(Schema.create(INT));
Schema expected = Schema.createRecord("myrecord", null, null, false,