You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ga...@apache.org on 2021/05/12 08:08:17 UTC

[parquet-mr] branch master updated: PARQUET-2037: Write INT96 with parquet-avro (#901)

This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new c72862b  PARQUET-2037: Write INT96 with parquet-avro (#901)
c72862b is described below

commit c72862b61399ff516e968fbd02885e573d4be81c
Author: Gabor Szadovszky <ga...@apache.org>
AuthorDate: Wed May 12 10:08:05 2021 +0200

    PARQUET-2037: Write INT96 with parquet-avro (#901)
---
 parquet-avro/README.md                             |  2 +
 .../apache/parquet/avro/AvroSchemaConverter.java   | 66 ++++++++++++------
 .../org/apache/parquet/avro/AvroWriteSupport.java  |  3 +
 .../parquet/avro/TestAvroSchemaConverter.java      | 32 +++++++++
 parquet-avro/src/test/resources/fixedToInt96.avsc  | 80 ++++++++++++++++++++++
 5 files changed, 161 insertions(+), 22 deletions(-)

diff --git a/parquet-avro/README.md b/parquet-avro/README.md
index 8b1cca2..4f67491 100644
--- a/parquet-avro/README.md
+++ b/parquet-avro/README.md
@@ -32,6 +32,7 @@ Apache Avro integration
 | `parquet.avro.read.schema`              | `String`  | The Avro schema to be used for reading. It shall be compatible with the file schema. The file schema will be used directly if not set. |
 | `parquet.avro.projection`               | `String`  | The Avro schema to be used for projection.                           |
 | `parquet.avro.compatible`               | `boolean` | Flag for compatibility mode. `true` for materializing Avro `IndexedRecord` objects, `false` for materializing the related objects for either generic, specific, or reflect records.<br/>The default value is `true`. |
+| `parquet.avro.readInt96AsFixed`        | `boolean` | Flag for handling the `INT96` Parquet types. `true` for converting them to the `fixed` Avro type, `false` for not handling `INT96` types (an exception is thrown).<br/>The default value is `false`.<br/>**NOTE: The `INT96` Parquet type is deprecated. This option is only to support old data.** |
 
 ### Configuration for writing
 
@@ -42,3 +43,4 @@ Apache Avro integration
 | `parquet.avro.write-old-list-structure` | `boolean` | Flag whether to write list structures in the old way (2 levels) or the new one (3 levels). When writing at 2 levels no null values are available at the element level.<br/>The default value is `true` |
 | `parquet.avro.add-list-element-records` | `boolean` | Flag whether to assume that any repeated element in the schema is a list element.<br/>The default value is `true`. |
 | `parquet.avro.write-parquet-uuid`       | `boolean` | Flag whether to write the [Parquet UUID logical type](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#uuid) in case of an [Avro UUID type](https://avro.apache.org/docs/current/spec.html#UUID) is present.<br/>The default value is `false`. |
+| `parquet.avro.writeFixedAsInt96`    | `String` | Comma-separated list of paths pointing to Avro schema elements which are to be converted to `INT96` Parquet types.<br/>Each path is a `'.'`-separated list of field names and contains neither the name of the schema nor the namespace. The type of the referenced schema elements must be `fixed` with a size of 12 bytes.<br/>**NOTE: The `INT96` Parquet type is deprecated. This option is only to support old data.** |
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
index 4c06e9c..7d1f3ca 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -35,16 +35,20 @@ import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 import static java.util.Optional.empty;
 import static java.util.Optional.of;
 import static org.apache.avro.JsonProperties.NULL_VALUE;
 import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
 import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED_DEFAULT;
+import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
 import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
 import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT;
 import static org.apache.parquet.avro.AvroWriteSupport.WRITE_PARQUET_UUID;
@@ -77,6 +81,7 @@ public class AvroSchemaConverter {
   private final boolean writeOldListStructure;
   private final boolean writeParquetUUID;
   private final boolean readInt96AsFixed;
+  private final Set<String> pathsToInt96;
 
   public AvroSchemaConverter() {
     this(ADD_LIST_ELEMENT_RECORDS_DEFAULT);
@@ -93,6 +98,7 @@ public class AvroSchemaConverter {
     this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT;
     this.writeParquetUUID = WRITE_PARQUET_UUID_DEFAULT;
     this.readInt96AsFixed = READ_INT96_AS_FIXED_DEFAULT;
+    this.pathsToInt96 = Collections.emptySet();
   }
 
   public AvroSchemaConverter(Configuration conf) {
@@ -102,6 +108,7 @@ public class AvroSchemaConverter {
         WRITE_OLD_LIST_STRUCTURE, WRITE_OLD_LIST_STRUCTURE_DEFAULT);
     this.writeParquetUUID = conf.getBoolean(WRITE_PARQUET_UUID, WRITE_PARQUET_UUID_DEFAULT);
     this.readInt96AsFixed = conf.getBoolean(READ_INT96_AS_FIXED, READ_INT96_AS_FIXED_DEFAULT);
+    this.pathsToInt96 = new HashSet<>(Arrays.asList(conf.getStrings(WRITE_FIXED_AS_INT96, new String[0])));
   }
 
   /**
@@ -134,26 +141,26 @@ public class AvroSchemaConverter {
     if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
       throw new IllegalArgumentException("Avro schema must be a record.");
     }
-    return new MessageType(avroSchema.getFullName(), convertFields(avroSchema.getFields()));
+    return new MessageType(avroSchema.getFullName(), convertFields(avroSchema.getFields(), ""));
   }
 
-  private List<Type> convertFields(List<Schema.Field> fields) {
+  private List<Type> convertFields(List<Schema.Field> fields, String schemaPath) {
     List<Type> types = new ArrayList<Type>();
     for (Schema.Field field : fields) {
       if (field.schema().getType().equals(Schema.Type.NULL)) {
         continue; // Avro nulls are not encoded, unless they are null unions
       }
-      types.add(convertField(field));
+      types.add(convertField(field, appendPath(schemaPath, field.name())));
     }
     return types;
   }
 
-  private Type convertField(String fieldName, Schema schema) {
-    return convertField(fieldName, schema, Type.Repetition.REQUIRED);
+  private Type convertField(String fieldName, Schema schema, String schemaPath) {
+    return convertField(fieldName, schema, Type.Repetition.REQUIRED, schemaPath);
   }
 
   @SuppressWarnings("deprecation")
-  private Type convertField(String fieldName, Schema schema, Type.Repetition repetition) {
+  private Type convertField(String fieldName, Schema schema, Type.Repetition repetition, String schemaPath) {
     Types.PrimitiveBuilder<PrimitiveType> builder;
     Schema.Type type = schema.getType();
     LogicalType logicalType = schema.getLogicalType();
@@ -177,26 +184,33 @@ public class AvroSchemaConverter {
         builder = Types.primitive(BINARY, repetition).as(stringType());
       }
     } else if (type.equals(Schema.Type.RECORD)) {
-      return new GroupType(repetition, fieldName, convertFields(schema.getFields()));
+      return new GroupType(repetition, fieldName, convertFields(schema.getFields(), schemaPath));
     } else if (type.equals(Schema.Type.ENUM)) {
       builder = Types.primitive(BINARY, repetition).as(enumType());
     } else if (type.equals(Schema.Type.ARRAY)) {
       if (writeOldListStructure) {
         return ConversionPatterns.listType(repetition, fieldName,
-            convertField("array", schema.getElementType(), REPEATED));
+            convertField("array", schema.getElementType(), REPEATED, schemaPath));
       } else {
         return ConversionPatterns.listOfElements(repetition, fieldName,
-            convertField(AvroWriteSupport.LIST_ELEMENT_NAME, schema.getElementType()));
+            convertField(AvroWriteSupport.LIST_ELEMENT_NAME, schema.getElementType(), schemaPath));
       }
     } else if (type.equals(Schema.Type.MAP)) {
-      Type valType = convertField("value", schema.getValueType());
+      Type valType = convertField("value", schema.getValueType(), schemaPath);
       // avro map key type is always string
       return ConversionPatterns.stringKeyMapType(repetition, fieldName, valType);
     } else if (type.equals(Schema.Type.FIXED)) {
-      builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
-          .length(schema.getFixedSize());
+      if (pathsToInt96.contains(schemaPath)) {
+        if (schema.getFixedSize() != 12) {
+          throw new IllegalArgumentException(
+              "The size of the fixed type field " + schemaPath + " must be 12 bytes for INT96 conversion");
+        }
+        builder = Types.primitive(PrimitiveTypeName.INT96, repetition);
+      } else {
+        builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition).length(schema.getFixedSize());
+      }
     } else if (type.equals(Schema.Type.UNION)) {
-      return convertUnion(fieldName, schema, repetition);
+      return convertUnion(fieldName, schema, repetition, schemaPath);
     } else {
       throw new UnsupportedOperationException("Cannot convert Avro type " + type);
     }
@@ -218,7 +232,7 @@ public class AvroSchemaConverter {
     return builder.named(fieldName);
   }
 
-  private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition) {
+  private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition, String schemaPath) {
     List<Schema> nonNullSchemas = new ArrayList<Schema>(schema.getTypes().size());
     // Found any schemas in the union? Required for the edge case, where the union contains only a single type.
     boolean foundNullSchema = false;
@@ -239,25 +253,26 @@ public class AvroSchemaConverter {
         throw new UnsupportedOperationException("Cannot convert Avro union of only nulls");
 
       case 1:
-        return foundNullSchema ? convertField(fieldName, nonNullSchemas.get(0), repetition) :
-          convertUnionToGroupType(fieldName, repetition, nonNullSchemas);
+        return foundNullSchema ? convertField(fieldName, nonNullSchemas.get(0), repetition, schemaPath) :
+          convertUnionToGroupType(fieldName, repetition, nonNullSchemas, schemaPath);
 
       default: // complex union type
-        return convertUnionToGroupType(fieldName, repetition, nonNullSchemas);
+        return convertUnionToGroupType(fieldName, repetition, nonNullSchemas, schemaPath);
     }
   }
 
-  private Type convertUnionToGroupType(String fieldName, Type.Repetition repetition, List<Schema> nonNullSchemas) {
+  private Type convertUnionToGroupType(String fieldName, Type.Repetition repetition, List<Schema> nonNullSchemas,
+      String schemaPath) {
     List<Type> unionTypes = new ArrayList<Type>(nonNullSchemas.size());
     int index = 0;
     for (Schema childSchema : nonNullSchemas) {
-      unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL));
+      unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL, schemaPath));
     }
     return new GroupType(repetition, fieldName, unionTypes);
   }
 
-  private Type convertField(Schema.Field field) {
-    return convertField(field.name(), field.schema());
+  private Type convertField(Schema.Field field, String schemaPath) {
+    return convertField(field.name(), field.schema(), schemaPath);
   }
 
   public Schema convert(MessageType parquetSchema) {
@@ -314,7 +329,7 @@ public class AvroSchemaConverter {
                 return Schema.createFixed("INT96", "INT96 represented as byte[12]", null, 12);
               }
               throw new IllegalArgumentException(
-                "INT96 is deprecated. As interim enable READ_INT96_AS_FIXED  flag to read as byte array.");
+                "INT96 is deprecated. As interim enable READ_INT96_AS_FIXED flag to read as byte array.");
             }
             @Override
             public Schema convertFLOAT(PrimitiveTypeName primitiveTypeName) {
@@ -524,4 +539,11 @@ public class AvroSchemaConverter {
         Schema.create(Schema.Type.NULL),
         original));
   }
+
+  private static String appendPath(String path, String fieldName) {
+    if (path == null || path.isEmpty()) {
+      return fieldName;
+    }
+    return path + '.' + fieldName;
+  }
 }
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
index 4406587..82a80d3 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
@@ -67,6 +67,9 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
   public static final String WRITE_PARQUET_UUID = "parquet.avro.write-parquet-uuid";
   static final boolean WRITE_PARQUET_UUID_DEFAULT = false;
 
+  // Support writing Parquet INT96 from a 12-byte Avro fixed.
+  public static final String WRITE_FIXED_AS_INT96 = "parquet.avro.writeFixedAsInt96";
+
   private static final String MAP_REPEATED_NAME = "key_value";
   private static final String MAP_KEY_NAME = "key";
   private static final String MAP_VALUE_NAME = "value";
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
index 065a636..1bafdec 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
@@ -44,6 +44,7 @@ import static org.apache.parquet.avro.AvroTestUtil.field;
 import static org.apache.parquet.avro.AvroTestUtil.optionalField;
 import static org.apache.parquet.avro.AvroTestUtil.primitive;
 import static org.apache.parquet.avro.AvroTestUtil.record;
+import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
 import static org.apache.parquet.schema.OriginalType.DATE;
 import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MICROS;
 import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MILLIS;
@@ -824,6 +825,37 @@ public class TestAvroSchemaConverter {
             "}\n");
   }
 
+  @Test
+  public void testAvroFixed12AsParquetInt96Type() throws Exception {
+    Schema schema = new Schema.Parser().parse(
+        Resources.getResource("fixedToInt96.avsc").openStream());
+
+    Configuration conf = new Configuration();
+    conf.setStrings(WRITE_FIXED_AS_INT96, "int96", "mynestedrecord.int96inrecord", "mynestedrecord.myarrayofoptional",
+        "mynestedrecord.mymap");
+    testAvroToParquetConversion(conf, schema, "message org.apache.parquet.avro.fixedToInt96 {\n"
+        + "  required int96 int96;\n"
+        + "  required fixed_len_byte_array(12) notanint96;\n"
+        + "  required group mynestedrecord {\n"
+        + "    required int96 int96inrecord;\n"
+        + "    required group myarrayofoptional (LIST) {\n"
+        + "      repeated int96 array;\n"
+        + "    }\n"
+        + "    required group mymap (MAP) {\n"
+        + "      repeated group key_value (MAP_KEY_VALUE) {\n"
+        + "        required binary key (STRING);\n"
+        + "        required int96 value;\n"
+        + "      }\n"
+        + "    }\n"
+        + "  }\n"
+        + "  required fixed_len_byte_array(1) onebytefixed;\n"
+        + "}");
+
+    conf.setStrings(WRITE_FIXED_AS_INT96, "onebytefixed");
+    assertThrows("Exception should be thrown for fixed types to be converted to INT96 where the size is not 12 bytes",
+        IllegalArgumentException.class, () -> new AvroSchemaConverter(conf).convert(schema));
+  }
+
   public static Schema optional(Schema original) {
     return Schema.createUnion(Lists.newArrayList(
         Schema.create(Schema.Type.NULL),
diff --git a/parquet-avro/src/test/resources/fixedToInt96.avsc b/parquet-avro/src/test/resources/fixedToInt96.avsc
new file mode 100644
index 0000000..9702852
--- /dev/null
+++ b/parquet-avro/src/test/resources/fixedToInt96.avsc
@@ -0,0 +1,80 @@
+{
+  "name": "fixedToInt96",
+  "namespace": "org.apache.parquet.avro",
+  "type": "record",
+  "fields": [
+    {
+      "name": "int96",
+      "type": {
+        "type": "fixed",
+        "name": "ignored1",
+        "namespace": "",
+        "size": 12
+      }
+    },
+    {
+      "name": "notanint96",
+      "type": {
+        "type": "fixed",
+        "name": "ignored2",
+        "namespace": "",
+        "size": 12
+      }
+    },
+    {
+      "name": "mynestedrecord",
+      "type": {
+        "type": "record",
+        "name": "ignored3",
+        "namespace": "",
+        "fields": [
+          {
+            "name": "int96inrecord",
+            "type": {
+              "type": "fixed",
+              "name": "ignored4",
+              "namespace": "",
+              "size": 12
+            }
+          },
+          {
+            "name": "myarrayofoptional",
+            "type": {
+              "type": "array",
+              "items": [
+                "null",
+                {
+                  "type": "fixed",
+                  "name": "ignored5",
+                  "namespace": "",
+                  "size": 12
+                }
+              ]
+            }
+          },
+          {
+            "name": "mymap",
+            "type": {
+              "type": "map",
+              "values": {
+                "type": "fixed",
+                "name": "ignored6",
+                "namespace": "",
+                "size": 12
+              }
+            }
+          }
+        ]
+      }
+    },
+    {
+      "name": "onebytefixed",
+      "type": {
+        "type": "fixed",
+        "name": "ignored7",
+        "namespace": "",
+        "size": 1
+      }
+    }
+  ]
+}