You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2015/11/18 00:10:02 UTC

parquet-mr git commit: PARQUET-364: Fix compatibility for Avro lists of lists.

Repository: parquet-mr
Updated Branches:
  refs/heads/master 6b605a4ea -> 440882c65


PARQUET-364: Fix compatibility for Avro lists of lists.

This fixes reading lists of lists that were written with Avro's 2-level
representation. The converter setup logic missed the case where the
repeated group's single field is itself repeated, which means that field
cannot be the element of a 3-level list.

This also fixes the schema conversion for cases where an unknown
writer used a 2-level list of lists.

This is based on @liancheng's pull request #264, but fixes the problem in a slightly different way.

Author: Ryan Blue <bl...@apache.org>

Closes #272 from rdblue/PARQUET-364-fix-avro-lists-of-lists and squashes the following commits:

41a70e0 [Ryan Blue] PARQUET-364: Fix compatibility for Avro lists of lists.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/440882c6
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/440882c6
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/440882c6

Branch: refs/heads/master
Commit: 440882c659967572311402c7fe534cf13d501cf4
Parents: 6b605a4
Author: Ryan Blue <bl...@apache.org>
Authored: Tue Nov 17 15:09:50 2015 -0800
Committer: Ryan Blue <bl...@apache.org>
Committed: Tue Nov 17 15:09:50 2015 -0800

----------------------------------------------------------------------
 .../avro/AvroIndexedRecordConverter.java        |  36 +----
 .../parquet/avro/AvroRecordConverter.java       |   8 +-
 .../parquet/avro/AvroSchemaConverter.java       |  10 +-
 .../parquet/avro/TestArrayCompatibility.java    | 148 ++++++++++++++++++-
 .../parquet/avro/TestAvroSchemaConverter.java   |  92 ++++++++++++
 5 files changed, 248 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/440882c6/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
index a5e4141..06c66d6 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
@@ -125,7 +125,8 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
       }
     }
     if (avroField == null) {
-      throw new InvalidRecordException(String.format("Parquet/Avro schema mismatch. Avro field '%s' not found.",
+      throw new InvalidRecordException(String.format(
+          "Parquet/Avro schema mismatch. Avro field '%s' not found.",
           parquetFieldName));
     }
     return avroField;
@@ -313,7 +314,7 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
       Type repeatedType = type.getType(0);
       // always determine whether the repeated type is the element type by
       // matching it against the element schema.
-      if (isElementType(repeatedType, elementSchema)) {
+      if (AvroRecordConverter.isElementType(repeatedType, elementSchema)) {
         // the element type is the repeated type (and required)
         converter = newConverter(elementSchema, repeatedType, model, new ParentValueContainer() {
           @Override
@@ -344,37 +345,6 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
     }
 
     /**
-     * Returns whether the given type is the element type of a list or is a
-     * synthetic group with one field that is the element type. This is
-     * determined by checking whether the type can be a synthetic group and by
-     * checking whether a potential synthetic group matches the expected schema.
-     * <p>
-     * Unlike {@link AvroSchemaConverter#isElementType(Type, String)}, this
-     * method never guesses because the expected schema is known.
-     *
-     * @param repeatedType a type that may be the element type
-     * @param elementSchema the expected Schema for list elements
-     * @return {@code true} if the repeatedType is the element schema
-     */
-    static boolean isElementType(Type repeatedType, Schema elementSchema) {
-      if (repeatedType.isPrimitive() ||
-          repeatedType.asGroupType().getFieldCount() > 1) {
-        // The repeated type must be the element type because it is an invalid
-        // synthetic wrapper (must be a group with one field).
-        return true;
-      } else if (elementSchema != null &&
-          elementSchema.getType() == Schema.Type.RECORD &&
-          elementSchema.getFields().size() == 1 &&
-          elementSchema.getFields().get(0).name().equals(
-              repeatedType.asGroupType().getFieldName(0))) {
-        // The repeated type must be the element type because it matches the
-        // structure of the Avro element's schema.
-        return true;
-      }
-      return false;
-    }
-
-    /**
      * Converter for list elements.
      *
      * <pre>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/440882c6/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
index 57ad18a..61d7d8e 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
@@ -50,6 +50,7 @@ import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
 
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
 import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
 
 /**
@@ -744,11 +745,12 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
    * @param elementSchema the expected Schema for list elements
    * @return {@code true} if the repeatedType is the element schema
    */
-  private static boolean isElementType(Type repeatedType, Schema elementSchema) {
+  static boolean isElementType(Type repeatedType, Schema elementSchema) {
     if (repeatedType.isPrimitive() ||
-        repeatedType.asGroupType().getFieldCount() > 1) {
+        repeatedType.asGroupType().getFieldCount() > 1 ||
+        repeatedType.asGroupType().getType(0).isRepetition(REPEATED)) {
       // The repeated type must be the element type because it is an invalid
-      // synthetic wrapper (must be a group with one field).
+      // synthetic wrapper. Must be a group with one optional or required field
       return true;
     } else if (elementSchema != null &&
         elementSchema.getType() == Schema.Type.RECORD &&

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/440882c6/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
index 04fe3a7..6cfa8d1 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -36,6 +36,7 @@ import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
 import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT;
 import static org.apache.parquet.schema.OriginalType.*;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
 
 /**
  * <p>
@@ -135,7 +136,7 @@ public class AvroSchemaConverter {
     } else if (type.equals(Schema.Type.ARRAY)) {
       if (writeOldListStructure) {
         return ConversionPatterns.listType(repetition, fieldName,
-            convertField("array", schema.getElementType(), Type.Repetition.REPEATED));
+            convertField("array", schema.getElementType(), REPEATED));
       } else {
         return ConversionPatterns.listOfElements(repetition, fieldName,
             convertField(AvroWriteSupport.LIST_ELEMENT_NAME, schema.getElementType()));
@@ -213,7 +214,7 @@ public class AvroSchemaConverter {
     List<Schema.Field> fields = new ArrayList<Schema.Field>();
     for (Type parquetType : parquetFields) {
       Schema fieldSchema = convertField(parquetType);
-      if (parquetType.isRepetition(Type.Repetition.REPEATED)) {
+      if (parquetType.isRepetition(REPEATED)) {
         throw new UnsupportedOperationException("REPEATED not supported outside LIST or MAP. Type: " + parquetType);
       } else if (parquetType.isRepetition(Type.Repetition.OPTIONAL)) {
         fields.add(new Schema.Field(parquetType.getName(), optional(fieldSchema), null,
@@ -282,7 +283,7 @@ public class AvroSchemaConverter {
               throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
             }
             Type repeatedType = parquetGroupType.getType(0);
-            if (!repeatedType.isRepetition(Type.Repetition.REPEATED)) {
+            if (!repeatedType.isRepetition(REPEATED)) {
               throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
             }
             if (isElementType(repeatedType, parquetGroupType.getName())) {
@@ -302,7 +303,7 @@ public class AvroSchemaConverter {
               throw new UnsupportedOperationException("Invalid map type " + parquetGroupType);
             }
             GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType();
-            if (!mapKeyValType.isRepetition(Type.Repetition.REPEATED) ||
+            if (!mapKeyValType.isRepetition(REPEATED) ||
                 mapKeyValType.getFieldCount()!=2) {
               throw new UnsupportedOperationException("Invalid map type " + parquetGroupType);
             }
@@ -348,6 +349,7 @@ public class AvroSchemaConverter {
         // can't be a synthetic layer because it would be invalid
         repeatedType.isPrimitive() ||
         repeatedType.asGroupType().getFieldCount() > 1 ||
+        repeatedType.asGroupType().getType(0).isRepetition(REPEATED) ||
         // known patterns without the synthetic layer
         repeatedType.getName().equals("array") ||
         repeatedType.getName().equals(parentName + "_tuple") ||

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/440882c6/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
index c4585a7..9c29e50 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
@@ -567,9 +567,9 @@ public class TestArrayCompatibility {
   }
 
   @Test
-  public void testAvroCompatRequiredGroupInList() throws Exception {
+  public void testAvroCompatOptionalGroupInList() throws Exception {
     Path test = writeDirect(
-        "message AvroCompatRequiredGroupInList {" +
+        "message AvroCompatOptionalGroupInList {" +
             "  optional group locations (LIST) {" +
             "    repeated group array {" +
             "      optional group element {" +
@@ -634,7 +634,7 @@ public class TestArrayCompatibility {
 
     // old behavior - assume that the repeated type is the element type
     Schema elementRecord = record("array", optionalField("element", location));
-    Schema oldSchema = record("AvroCompatRequiredGroupInList",
+    Schema oldSchema = record("AvroCompatOptionalGroupInList",
         optionalField("locations", array(elementRecord)));
     GenericRecord oldRecord = instance(oldSchema,
         "locations", Arrays.asList(
@@ -649,9 +649,9 @@ public class TestArrayCompatibility {
   }
 
   @Test
-  public void testAvroCompatRequiredGroupInListWithSchema() throws Exception {
+  public void testAvroCompatOptionalGroupInListWithSchema() throws Exception {
     Path test = writeDirect(
-        "message AvroCompatRequiredGroupInListWithSchema {" +
+        "message AvroCompatOptionalGroupInListWithSchema {" +
             "  optional group locations (LIST) {" +
             "    repeated group array {" +
             "      optional group element {" +
@@ -714,7 +714,7 @@ public class TestArrayCompatibility {
         field("latitude", primitive(Schema.Type.DOUBLE)),
         field("longitude", primitive(Schema.Type.DOUBLE)));
 
-    Schema newSchema = record("HiveCompatOptionalGroupInList",
+    Schema newSchema = record("AvroCompatOptionalGroupInListWithSchema",
         optionalField("locations", array(optional(location))));
     GenericRecord newRecord = instance(newSchema,
         "locations", Arrays.asList(
@@ -738,6 +738,142 @@ public class TestArrayCompatibility {
   }
 
   @Test
+  public void testAvroCompatListInList() throws Exception {
+    Path test = writeDirect(
+        "message AvroCompatListInList {" +
+            "  optional group listOfLists (LIST) {" +
+            "    repeated group array (LIST) {" +
+            "      repeated int32 array;" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("listOfLists", 0); // name matches the message schema's field 0
+
+            rc.startGroup();
+            rc.startField("array", 0); // start writing array contents
+
+            rc.startGroup();
+            rc.startField("array", 0); // start writing inner array contents
+
+            // write [34, 35, 36]
+            rc.addInteger(34);
+            rc.addInteger(35);
+            rc.addInteger(36);
+
+            rc.endField("array", 0); // finished writing inner array contents
+            rc.endGroup();
+
+            // write an empty list
+            rc.startGroup();
+            rc.endGroup();
+
+            rc.startGroup();
+            rc.startField("array", 0); // start writing inner array contents
+
+            // write [32, 33, 34]
+            rc.addInteger(32);
+            rc.addInteger(33);
+            rc.addInteger(34);
+
+            rc.endField("array", 0); // finished writing inner array contents
+            rc.endGroup();
+
+            rc.endField("array", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("listOfLists", 0); // close field 0 of the message
+            rc.endMessage();
+          }
+        });
+
+    Schema listOfLists = array(array(primitive(Schema.Type.INT)));
+    Schema oldSchema = record("AvroCompatListInList",
+        optionalField("listOfLists", listOfLists));
+
+    GenericRecord oldRecord = instance(oldSchema,
+        "listOfLists", Arrays.asList(
+            Arrays.asList(34, 35, 36),
+            Arrays.asList(),
+            Arrays.asList(32, 33, 34)));
+
+    // both should detect the "array" name
+    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
+    assertReaderContains(newBehaviorReader(test), oldSchema, oldRecord);
+  }
+
+  @Test
+  public void testThriftCompatListInList() throws Exception {
+    Path test = writeDirect(
+        "message ThriftCompatListInList {" +
+            "  optional group listOfLists (LIST) {" +
+            "    repeated group listOfLists_tuple (LIST) {" +
+            "      repeated int32 listOfLists_tuple_tuple;" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("listOfLists", 0); // name matches the message schema's field 0
+
+            rc.startGroup();
+            rc.startField("listOfLists_tuple", 0); // start writing array contents
+
+            rc.startGroup();
+            rc.startField("listOfLists_tuple_tuple", 0); // start writing inner array contents
+
+            // write [34, 35, 36]
+            rc.addInteger(34);
+            rc.addInteger(35);
+            rc.addInteger(36);
+
+            rc.endField("listOfLists_tuple_tuple", 0); // finished writing inner array contents
+            rc.endGroup();
+
+            // write an empty list
+            rc.startGroup();
+            rc.endGroup();
+
+            rc.startGroup();
+            rc.startField("listOfLists_tuple_tuple", 0); // start writing inner array contents
+
+            // write [32, 33, 34]
+            rc.addInteger(32);
+            rc.addInteger(33);
+            rc.addInteger(34);
+
+            rc.endField("listOfLists_tuple_tuple", 0); // finished writing inner array contents
+            rc.endGroup();
+
+            rc.endField("listOfLists_tuple", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("listOfLists", 0); // close field 0 of the message
+            rc.endMessage();
+          }
+        });
+
+    Schema listOfLists = array(array(primitive(Schema.Type.INT)));
+    Schema oldSchema = record("ThriftCompatListInList",
+        optionalField("listOfLists", listOfLists));
+
+    GenericRecord oldRecord = instance(oldSchema,
+        "listOfLists", Arrays.asList(
+            Arrays.asList(34, 35, 36),
+            Arrays.asList(),
+            Arrays.asList(32, 33, 34)));
+
+    // both should detect the "_tuple" names
+    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
+    assertReaderContains(newBehaviorReader(test), oldSchema, oldRecord);
+  }
+
+  @Test
   public void testThriftCompatRequiredGroupInList() throws Exception {
     Path test = writeDirect(
         "message ThriftCompatRequiredGroupInList {" +

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/440882c6/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
index 6c802a6..b393615 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
@@ -394,6 +394,98 @@ public class TestAvroSchemaConverter {
   }
 
   @Test
+  public void testOldAvroListOfLists() throws Exception {
+    Schema listOfLists = optional(Schema.createArray(Schema.createArray(
+        Schema.create(Schema.Type.INT))));
+    Schema schema = Schema.createRecord("AvroCompatListInList", null, null, false);
+    schema.setFields(Lists.newArrayList(
+        new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance())
+    ));
+    System.err.println("Avro schema: " + schema.toString(true));
+
+    testRoundTripConversion(schema, // old 2-level Avro layout: "array" at both levels
+        "message AvroCompatListInList {\n" +
+            "  optional group listOfLists (LIST) {\n" +
+            "    repeated group array (LIST) {\n" +
+            "      repeated int32 array;\n" +
+            "    }\n" +
+            "  }\n" +
+            "}");
+    // Parquet->Avro only: with the new behavior, Avro->Parquet writes 3-level lists
+    testParquetToAvroConversion(NEW_BEHAVIOR, schema,
+        "message AvroCompatListInList {\n" +
+            "  optional group listOfLists (LIST) {\n" +
+            "    repeated group array (LIST) {\n" +
+            "      repeated int32 array;\n" +
+            "    }\n" +
+            "  }\n" +
+            "}");
+  }
+
+  @Test
+  public void testOldThriftListOfLists() throws Exception {
+    Schema listOfLists = optional(Schema.createArray(Schema.createArray(
+        Schema.create(Schema.Type.INT))));
+    Schema schema = Schema.createRecord("ThriftCompatListInList", null, null, false);
+    schema.setFields(Lists.newArrayList(
+        new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance())
+    ));
+    System.err.println("Avro schema: " + schema.toString(true));
+
+    // Cannot use round-trip assertion because repeated group names differ
+    testParquetToAvroConversion(schema,
+        "message ThriftCompatListInList {\n" +
+            "  optional group listOfLists (LIST) {\n" +
+            "    repeated group listOfLists_tuple (LIST) {\n" +
+            "      repeated int32 listOfLists_tuple_tuple;\n" +
+            "    }\n" +
+            "  }\n" +
+            "}");
+    // Cannot use round-trip assertion because 3-level representation is used
+    testParquetToAvroConversion(NEW_BEHAVIOR, schema,
+        "message ThriftCompatListInList {\n" +
+            "  optional group listOfLists (LIST) {\n" +
+            "    repeated group listOfLists_tuple (LIST) {\n" +
+            "      repeated int32 listOfLists_tuple_tuple;\n" +
+            "    }\n" +
+            "  }\n" +
+            "}");
+  }
+
+  @Test
+  public void testUnknownTwoLevelListOfLists() throws Exception {
+    // This tests the case where we don't detect a 2-level list by the repeated
+    // group's name, but it must be 2-level because the repeated group's only
+    // field is itself repeated; a 3-level list element must be optional or required
+    Schema listOfLists = optional(Schema.createArray(Schema.createArray(
+        Schema.create(Schema.Type.INT))));
+    Schema schema = Schema.createRecord("UnknownTwoLevelListInList", null, null, false);
+    schema.setFields(Lists.newArrayList(
+        new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance())
+    ));
+    System.err.println("Avro schema: " + schema.toString(true));
+
+    // Cannot use round-trip assertion because repeated group names differ
+    testParquetToAvroConversion(schema,
+        "message UnknownTwoLevelListInList {\n" +
+            "  optional group listOfLists (LIST) {\n" +
+            "    repeated group mylist (LIST) {\n" +
+            "      repeated int32 innerlist;\n" +
+            "    }\n" +
+            "  }\n" +
+            "}");
+    // Cannot use round-trip assertion because 3-level representation is used
+    testParquetToAvroConversion(NEW_BEHAVIOR, schema,
+        "message UnknownTwoLevelListInList {\n" +
+            "  optional group listOfLists (LIST) {\n" +
+            "    repeated group mylist (LIST) {\n" +
+            "      repeated int32 innerlist;\n" +
+            "    }\n" +
+            "  }\n" +
+            "}");
+  }
+
+  @Test
   public void testParquetMapWithoutMapKeyValueAnnotation() throws Exception {
     Schema schema = Schema.createRecord("myrecord", null, null, false);
     Schema map = Schema.createMap(Schema.create(Schema.Type.INT));