You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@parquet.apache.org by ga...@apache.org on 2021/05/12 08:08:17 UTC
[parquet-mr] branch master updated: PARQUET-2037: Write INT96 with parquet-avro (#901)
This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new c72862b PARQUET-2037: Write INT96 with parquet-avro (#901)
c72862b is described below
commit c72862b61399ff516e968fbd02885e573d4be81c
Author: Gabor Szadovszky <ga...@apache.org>
AuthorDate: Wed May 12 10:08:05 2021 +0200
PARQUET-2037: Write INT96 with parquet-avro (#901)
---
parquet-avro/README.md | 2 +
.../apache/parquet/avro/AvroSchemaConverter.java | 66 ++++++++++++------
.../org/apache/parquet/avro/AvroWriteSupport.java | 3 +
.../parquet/avro/TestAvroSchemaConverter.java | 32 +++++++++
parquet-avro/src/test/resources/fixedToInt96.avsc | 80 ++++++++++++++++++++++
5 files changed, 161 insertions(+), 22 deletions(-)
diff --git a/parquet-avro/README.md b/parquet-avro/README.md
index 8b1cca2..4f67491 100644
--- a/parquet-avro/README.md
+++ b/parquet-avro/README.md
@@ -32,6 +32,7 @@ Apache Avro integration
| `parquet.avro.read.schema` | `String` | The Avro schema to be used for reading. It shall be compatible with the file schema. The file schema will be used directly if not set. |
| `parquet.avro.projection` | `String` | The Avro schema to be used for projection. |
| `parquet.avro.compatible` | `boolean` | Flag for compatibility mode. `true` for materializing Avro `IndexedRecord` objects, `false` for materializing the related objects for either generic, specific, or reflect records.<br/>The default value is `true`. |
+| `parquet.avro.readInt96AsFixed` | `boolean` | Flag for handling the `INT96` Parquet type. `true` to convert it to a 12-byte Avro `fixed` type; `false` to leave `INT96` unsupported, in which case an exception is thrown.<br/>The default value is `false`.<br/>**NOTE: The `INT96` Parquet type is deprecated. This option exists only to support reading old data.** |
### Configuration for writing
@@ -42,3 +43,4 @@ Apache Avro integration
| `parquet.avro.write-old-list-structure` | `boolean` | Flag whether to write list structures in the old way (2 levels) or the new one (3 levels). When writing at 2 levels no null values are available at the element level.<br/>The default value is `true` |
| `parquet.avro.add-list-element-records` | `boolean` | Flag whether to assume that any repeated element in the schema is a list element.<br/>The default value is `true`. |
| `parquet.avro.write-parquet-uuid` | `boolean` | Flag whether to write the [Parquet UUID logical type](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#uuid) in case of an [Avro UUID type](https://avro.apache.org/docs/current/spec.html#UUID) is present.<br/>The default value is `false`. |
+| `parquet.avro.writeFixedAsInt96` | `String` | Comma-separated list of paths pointing to Avro schema elements that are to be written as `INT96` Parquet types.<br/>Each path is a `'.'`-separated list of field names (for example `myrecord.myfield`) and contains neither the name nor the namespace of the schema. For an array, map, or union field, the path of the field itself selects its elements, values, or members. The referenced schema elements must be of type `fixed` with a size of 12 bytes; otherwise an exception is thrown.<br/>**NOTE: The `INT96` Parquet type is deprecated. This option exists only to support old data.** |
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
index 4c06e9c..7d1f3ca 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -35,16 +35,20 @@ import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import static java.util.Optional.empty;
import static java.util.Optional.of;
import static org.apache.avro.JsonProperties.NULL_VALUE;
import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED_DEFAULT;
+import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_PARQUET_UUID;
@@ -77,6 +81,7 @@ public class AvroSchemaConverter {
private final boolean writeOldListStructure;
private final boolean writeParquetUUID;
private final boolean readInt96AsFixed;
+ private final Set<String> pathsToInt96;
public AvroSchemaConverter() {
this(ADD_LIST_ELEMENT_RECORDS_DEFAULT);
@@ -93,6 +98,7 @@ public class AvroSchemaConverter {
this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT;
this.writeParquetUUID = WRITE_PARQUET_UUID_DEFAULT;
this.readInt96AsFixed = READ_INT96_AS_FIXED_DEFAULT;
+ this.pathsToInt96 = Collections.emptySet();
}
public AvroSchemaConverter(Configuration conf) {
@@ -102,6 +108,7 @@ public class AvroSchemaConverter {
WRITE_OLD_LIST_STRUCTURE, WRITE_OLD_LIST_STRUCTURE_DEFAULT);
this.writeParquetUUID = conf.getBoolean(WRITE_PARQUET_UUID, WRITE_PARQUET_UUID_DEFAULT);
this.readInt96AsFixed = conf.getBoolean(READ_INT96_AS_FIXED, READ_INT96_AS_FIXED_DEFAULT);
+ this.pathsToInt96 = new HashSet<>(Arrays.asList(conf.getStrings(WRITE_FIXED_AS_INT96, new String[0])));
}
/**
@@ -134,26 +141,26 @@ public class AvroSchemaConverter {
if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
throw new IllegalArgumentException("Avro schema must be a record.");
}
- return new MessageType(avroSchema.getFullName(), convertFields(avroSchema.getFields()));
+ return new MessageType(avroSchema.getFullName(), convertFields(avroSchema.getFields(), ""));
}
- private List<Type> convertFields(List<Schema.Field> fields) {
+ private List<Type> convertFields(List<Schema.Field> fields, String schemaPath) {
List<Type> types = new ArrayList<Type>();
for (Schema.Field field : fields) {
if (field.schema().getType().equals(Schema.Type.NULL)) {
continue; // Avro nulls are not encoded, unless they are null unions
}
- types.add(convertField(field));
+ types.add(convertField(field, appendPath(schemaPath, field.name())));
}
return types;
}
- private Type convertField(String fieldName, Schema schema) {
- return convertField(fieldName, schema, Type.Repetition.REQUIRED);
+ private Type convertField(String fieldName, Schema schema, String schemaPath) {
+ return convertField(fieldName, schema, Type.Repetition.REQUIRED, schemaPath);
}
@SuppressWarnings("deprecation")
- private Type convertField(String fieldName, Schema schema, Type.Repetition repetition) {
+ private Type convertField(String fieldName, Schema schema, Type.Repetition repetition, String schemaPath) {
Types.PrimitiveBuilder<PrimitiveType> builder;
Schema.Type type = schema.getType();
LogicalType logicalType = schema.getLogicalType();
@@ -177,26 +184,33 @@ public class AvroSchemaConverter {
builder = Types.primitive(BINARY, repetition).as(stringType());
}
} else if (type.equals(Schema.Type.RECORD)) {
- return new GroupType(repetition, fieldName, convertFields(schema.getFields()));
+ return new GroupType(repetition, fieldName, convertFields(schema.getFields(), schemaPath));
} else if (type.equals(Schema.Type.ENUM)) {
builder = Types.primitive(BINARY, repetition).as(enumType());
} else if (type.equals(Schema.Type.ARRAY)) {
if (writeOldListStructure) {
return ConversionPatterns.listType(repetition, fieldName,
- convertField("array", schema.getElementType(), REPEATED));
+ convertField("array", schema.getElementType(), REPEATED, schemaPath));
} else {
return ConversionPatterns.listOfElements(repetition, fieldName,
- convertField(AvroWriteSupport.LIST_ELEMENT_NAME, schema.getElementType()));
+ convertField(AvroWriteSupport.LIST_ELEMENT_NAME, schema.getElementType(), schemaPath));
}
} else if (type.equals(Schema.Type.MAP)) {
- Type valType = convertField("value", schema.getValueType());
+ Type valType = convertField("value", schema.getValueType(), schemaPath);
// avro map key type is always string
return ConversionPatterns.stringKeyMapType(repetition, fieldName, valType);
} else if (type.equals(Schema.Type.FIXED)) {
- builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
- .length(schema.getFixedSize());
+ if (pathsToInt96.contains(schemaPath)) {
+ if (schema.getFixedSize() != 12) {
+ throw new IllegalArgumentException(
+ "The size of the fixed type field " + schemaPath + " must be 12 bytes for INT96 conversion");
+ }
+ builder = Types.primitive(PrimitiveTypeName.INT96, repetition);
+ } else {
+ builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition).length(schema.getFixedSize());
+ }
} else if (type.equals(Schema.Type.UNION)) {
- return convertUnion(fieldName, schema, repetition);
+ return convertUnion(fieldName, schema, repetition, schemaPath);
} else {
throw new UnsupportedOperationException("Cannot convert Avro type " + type);
}
@@ -218,7 +232,7 @@ public class AvroSchemaConverter {
return builder.named(fieldName);
}
- private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition) {
+ private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition, String schemaPath) {
List<Schema> nonNullSchemas = new ArrayList<Schema>(schema.getTypes().size());
// Found any schemas in the union? Required for the edge case, where the union contains only a single type.
boolean foundNullSchema = false;
@@ -239,25 +253,26 @@ public class AvroSchemaConverter {
throw new UnsupportedOperationException("Cannot convert Avro union of only nulls");
case 1:
- return foundNullSchema ? convertField(fieldName, nonNullSchemas.get(0), repetition) :
- convertUnionToGroupType(fieldName, repetition, nonNullSchemas);
+ return foundNullSchema ? convertField(fieldName, nonNullSchemas.get(0), repetition, schemaPath) :
+ convertUnionToGroupType(fieldName, repetition, nonNullSchemas, schemaPath);
default: // complex union type
- return convertUnionToGroupType(fieldName, repetition, nonNullSchemas);
+ return convertUnionToGroupType(fieldName, repetition, nonNullSchemas, schemaPath);
}
}
- private Type convertUnionToGroupType(String fieldName, Type.Repetition repetition, List<Schema> nonNullSchemas) {
+ private Type convertUnionToGroupType(String fieldName, Type.Repetition repetition, List<Schema> nonNullSchemas,
+ String schemaPath) {
List<Type> unionTypes = new ArrayList<Type>(nonNullSchemas.size());
int index = 0;
for (Schema childSchema : nonNullSchemas) {
- unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL));
+ unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL, schemaPath));
}
return new GroupType(repetition, fieldName, unionTypes);
}
- private Type convertField(Schema.Field field) {
- return convertField(field.name(), field.schema());
+ private Type convertField(Schema.Field field, String schemaPath) {
+ return convertField(field.name(), field.schema(), schemaPath);
}
public Schema convert(MessageType parquetSchema) {
@@ -314,7 +329,7 @@ public class AvroSchemaConverter {
return Schema.createFixed("INT96", "INT96 represented as byte[12]", null, 12);
}
throw new IllegalArgumentException(
- "INT96 is deprecated. As interim enable READ_INT96_AS_FIXED flag to read as byte array.");
+ "INT96 is deprecated. As interim enable READ_INT96_AS_FIXED flag to read as byte array.");
}
@Override
public Schema convertFLOAT(PrimitiveTypeName primitiveTypeName) {
@@ -524,4 +539,11 @@ public class AvroSchemaConverter {
Schema.create(Schema.Type.NULL),
original));
}
+
+ private static String appendPath(String path, String fieldName) {
+ if (path == null || path.isEmpty()) {
+ return fieldName;
+ }
+ return path + '.' + fieldName;
+ }
}
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
index 4406587..82a80d3 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
@@ -67,6 +67,9 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
public static final String WRITE_PARQUET_UUID = "parquet.avro.write-parquet-uuid";
static final boolean WRITE_PARQUET_UUID_DEFAULT = false;
+ // Support writing Parquet INT96 from a 12-byte Avro fixed.
+ public static final String WRITE_FIXED_AS_INT96 = "parquet.avro.writeFixedAsInt96";
+
private static final String MAP_REPEATED_NAME = "key_value";
private static final String MAP_KEY_NAME = "key";
private static final String MAP_VALUE_NAME = "value";
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
index 065a636..1bafdec 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
@@ -44,6 +44,7 @@ import static org.apache.parquet.avro.AvroTestUtil.field;
import static org.apache.parquet.avro.AvroTestUtil.optionalField;
import static org.apache.parquet.avro.AvroTestUtil.primitive;
import static org.apache.parquet.avro.AvroTestUtil.record;
+import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
import static org.apache.parquet.schema.OriginalType.DATE;
import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MICROS;
import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MILLIS;
@@ -824,6 +825,37 @@ public class TestAvroSchemaConverter {
"}\n");
}
+ @Test
+ public void testAvroFixed12AsParquetInt96Type() throws Exception {
+ Schema schema = new Schema.Parser().parse(
+ Resources.getResource("fixedToInt96.avsc").openStream());
+
+ Configuration conf = new Configuration();
+ conf.setStrings(WRITE_FIXED_AS_INT96, "int96", "mynestedrecord.int96inrecord", "mynestedrecord.myarrayofoptional",
+ "mynestedrecord.mymap");
+ testAvroToParquetConversion(conf, schema, "message org.apache.parquet.avro.fixedToInt96 {\n"
+ + " required int96 int96;\n"
+ + " required fixed_len_byte_array(12) notanint96;\n"
+ + " required group mynestedrecord {\n"
+ + " required int96 int96inrecord;\n"
+ + " required group myarrayofoptional (LIST) {\n"
+ + " repeated int96 array;\n"
+ + " }\n"
+ + " required group mymap (MAP) {\n"
+ + " repeated group key_value (MAP_KEY_VALUE) {\n"
+ + " required binary key (STRING);\n"
+ + " required int96 value;\n"
+ + " }\n"
+ + " }\n"
+ + " }\n"
+ + " required fixed_len_byte_array(1) onebytefixed;\n"
+ + "}");
+
+ conf.setStrings(WRITE_FIXED_AS_INT96, "onebytefixed");
+ assertThrows("Exception should be thrown for fixed types to be converted to INT96 where the size is not 12 bytes",
+ IllegalArgumentException.class, () -> new AvroSchemaConverter(conf).convert(schema));
+ }
+
public static Schema optional(Schema original) {
return Schema.createUnion(Lists.newArrayList(
Schema.create(Schema.Type.NULL),
diff --git a/parquet-avro/src/test/resources/fixedToInt96.avsc b/parquet-avro/src/test/resources/fixedToInt96.avsc
new file mode 100644
index 0000000..9702852
--- /dev/null
+++ b/parquet-avro/src/test/resources/fixedToInt96.avsc
@@ -0,0 +1,80 @@
+{
+ "name": "fixedToInt96",
+ "namespace": "org.apache.parquet.avro",
+ "type": "record",
+ "fields": [
+ {
+ "name": "int96",
+ "type": {
+ "type": "fixed",
+ "name": "ignored1",
+ "namespace": "",
+ "size": 12
+ }
+ },
+ {
+ "name": "notanint96",
+ "type": {
+ "type": "fixed",
+ "name": "ignored2",
+ "namespace": "",
+ "size": 12
+ }
+ },
+ {
+ "name": "mynestedrecord",
+ "type": {
+ "type": "record",
+ "name": "ignored3",
+ "namespace": "",
+ "fields": [
+ {
+ "name": "int96inrecord",
+ "type": {
+ "type": "fixed",
+ "name": "ignored4",
+ "namespace": "",
+ "size": 12
+ }
+ },
+ {
+ "name": "myarrayofoptional",
+ "type": {
+ "type": "array",
+ "items": [
+ "null",
+ {
+ "type": "fixed",
+ "name": "ignored5",
+ "namespace": "",
+ "size": 12
+ }
+ ]
+ }
+ },
+ {
+ "name": "mymap",
+ "type": {
+ "type": "map",
+ "values": {
+ "type": "fixed",
+ "name": "ignored6",
+ "namespace": "",
+ "size": 12
+ }
+ }
+ }
+ ]
+ }
+ },
+ {
+ "name": "onebytefixed",
+ "type": {
+ "type": "fixed",
+ "name": "ignored7",
+ "namespace": "",
+ "size": 1
+ }
+ }
+ ]
+}