Posted to commits@parquet.apache.org by zi...@apache.org on 2019/05/28 12:53:07 UTC

[parquet-mr] branch master updated: PARQUET-1441: SchemaParseException: Can't redefine: list in AvroIndexedRecordConverter (#560)

This is an automated email from the ASF dual-hosted git repository.

zivanfi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e5fda5  PARQUET-1441: SchemaParseException: Can't redefine: list in AvroIndexedRecordConverter (#560)
1e5fda5 is described below

commit 1e5fda5310687b0856e74f00a4ea420b6b1ab34d
Author: Nándor Kollár <na...@users.noreply.github.com>
AuthorDate: Tue May 28 14:53:02 2019 +0200

    PARQUET-1441: SchemaParseException: Can't redefine: list in AvroIndexedRecordConverter (#560)
    
    The Parquet Avro reader couldn't convert a schema in which a group field name is
    reused in an inner structure. The converter created an Avro record schema in this
    case, but in Avro record types must have unique names, so the result was an invalid
    Avro schema. This patch fixes the issue by adding a namespace to a record whenever
    its name has already been defined, thereby making the record full names unique.
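
For context, a minimal sketch (not part of this commit; the schema below is illustrative
and assumes Avro is on the classpath) of why duplicate record names are rejected: asking
Avro's Schema.Parser to parse a schema that defines the name "a1" twice throws
SchemaParseException, the same failure mode as the "Can't redefine: list" error in the title.

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaParseException;

    public class RedefineDemo {
      public static void main(String[] args) {
        // A record named "a1" whose field "a1" is itself a record named "a1".
        String json =
            "{\"type\": \"record\", \"name\": \"a1\", \"fields\": ["
          + "  {\"name\": \"a2\", \"type\": \"float\"},"
          + "  {\"name\": \"a1\", \"type\": {"
          + "    \"type\": \"record\", \"name\": \"a1\","
          + "    \"fields\": [{\"name\": \"a4\", \"type\": \"float\"}]}}]}";
        try {
          new Schema.Parser().parse(json);
        } catch (SchemaParseException e) {
          System.out.println(e.getMessage()); // Can't redefine: a1
        }
      }
    }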
---
 .../apache/parquet/avro/AvroSchemaConverter.java   | 27 +++++-----
 .../java/org/apache/parquet/avro/AvroTestUtil.java |  8 ++-
 .../parquet/avro/TestAvroSchemaConverter.java      | 63 ++++++++++++++++++++++
 .../org/apache/parquet/avro/TestReadWrite.java     | 40 ++++++++++++--
 parquet-avro/src/test/resources/nested_array.avsc  | 39 ++++++++++++++
 5 files changed, 160 insertions(+), 17 deletions(-)

diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
index b4bac2f..0cece97 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -34,7 +34,9 @@ import org.apache.parquet.schema.Types;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 import static java.util.Optional.empty;
@@ -243,17 +245,18 @@ public class AvroSchemaConverter {
   }
 
   public Schema convert(MessageType parquetSchema) {
-    return convertFields(parquetSchema.getName(), parquetSchema.getFields());
+    return convertFields(parquetSchema.getName(), parquetSchema.getFields(), new HashMap<>());
   }
 
   Schema convert(GroupType parquetSchema) {
-    return convertFields(parquetSchema.getName(), parquetSchema.getFields());
+    return convertFields(parquetSchema.getName(), parquetSchema.getFields(), new HashMap<>());
   }
 
-  private Schema convertFields(String name, List<Type> parquetFields) {
+  private Schema convertFields(String name, List<Type> parquetFields, Map<String, Integer> names) {
     List<Schema.Field> fields = new ArrayList<Schema.Field>();
+    Integer nameCount = names.merge(name, 1, (oldValue, value) -> oldValue + 1);
     for (Type parquetType : parquetFields) {
-      Schema fieldSchema = convertField(parquetType);
+      Schema fieldSchema = convertField(parquetType, names);
       if (parquetType.isRepetition(REPEATED)) {
         throw new UnsupportedOperationException("REPEATED not supported outside LIST or MAP. Type: " + parquetType);
       } else if (parquetType.isRepetition(Type.Repetition.OPTIONAL)) {
@@ -264,12 +267,12 @@ public class AvroSchemaConverter {
             parquetType.getName(), fieldSchema, null, (Object) null));
       }
     }
-    Schema schema = Schema.createRecord(name, null, null, false);
+    Schema schema = Schema.createRecord(name, null, nameCount > 1 ? name + nameCount : null, false);
     schema.setFields(fields);
     return schema;
   }
 
-  private Schema convertField(final Type parquetType) {
+  private Schema convertField(final Type parquetType, Map<String, Integer> names) {
     if (parquetType.isPrimitive()) {
       final PrimitiveType asPrimitive = parquetType.asPrimitiveType();
       final PrimitiveTypeName parquetPrimitiveTypeName =
@@ -342,13 +345,13 @@ public class AvroSchemaConverter {
             }
             if (isElementType(repeatedType, parquetGroupType.getName())) {
               // repeated element types are always required
-              return of(Schema.createArray(convertField(repeatedType)));
+              return of(Schema.createArray(convertField(repeatedType, names)));
             } else {
               Type elementType = repeatedType.asGroupType().getType(0);
               if (elementType.isRepetition(Type.Repetition.OPTIONAL)) {
-                return of(Schema.createArray(optional(convertField(elementType))));
+                return of(Schema.createArray(optional(convertField(elementType, names))));
               } else {
-                return of(Schema.createArray(convertField(elementType)));
+                return of(Schema.createArray(convertField(elementType, names)));
               }
             }
           }
@@ -382,9 +385,9 @@ public class AvroSchemaConverter {
             }
             Type valueType = mapKeyValType.getType(1);
             if (valueType.isRepetition(Type.Repetition.OPTIONAL)) {
-              return of(Schema.createMap(optional(convertField(valueType))));
+              return of(Schema.createMap(optional(convertField(valueType, names))));
             } else {
-              return of(Schema.createMap(convertField(valueType)));
+              return of(Schema.createMap(convertField(valueType, names)));
             }
           }
 
@@ -395,7 +398,7 @@ public class AvroSchemaConverter {
         }).orElseThrow(() -> new UnsupportedOperationException("Cannot convert Parquet type " + parquetType));
       } else {
         // if no original type then it's a record
-        return convertFields(parquetGroupType.getName(), parquetGroupType.getFields());
+        return convertFields(parquetGroupType.getName(), parquetGroupType.getFields(), names);
       }
     }
   }
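
A note on the naming scheme above, as a hedged sketch outside the committed patch: the
converter counts how many times each record name has been seen; from the second
occurrence onward it derives a namespace from the name plus its count (a second "a1"
gets namespace "a12"), so the Avro full names stay distinct. The class below is
illustrative only and uses empty field lists for brevity.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.avro.Schema;

    public class RecordNameUniquifier {
      private final Map<String, Integer> names = new HashMap<>();

      // Create a record schema, namespacing it if the name was used before.
      public Schema newRecord(String name) {
        int count = names.merge(name, 1, Integer::sum);
        String namespace = count > 1 ? name + count : null;
        Schema record = Schema.createRecord(name, null, namespace, false);
        record.setFields(Collections.emptyList());
        return record;
      }

      public static void main(String[] args) {
        RecordNameUniquifier u = new RecordNameUniquifier();
        System.out.println(u.newRecord("a1").getFullName()); // a1
        System.out.println(u.newRecord("a1").getFullName()); // a12.a1
      }
    }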
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java b/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java
index f4682d6..39c6d2a 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java
@@ -37,12 +37,16 @@ import org.junit.rules.TemporaryFolder;
 
 public class AvroTestUtil {
 
-  public static Schema record(String name, Schema.Field... fields) {
-    Schema record = Schema.createRecord(name, null, null, false);
+  public static Schema record(String name, String namespace, Schema.Field... fields) {
+    Schema record = Schema.createRecord(name, null, namespace, false);
     record.setFields(Arrays.asList(fields));
     return record;
   }
 
+  public static Schema record(String name, Schema.Field... fields) {
+    return record(name, null, fields);
+  }
+
   public static Schema.Field field(String name, Schema schema) {
     return new Schema.Field(name, schema, null, null);
   }
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
index bfaeec3..2548408 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
@@ -37,6 +37,10 @@ import java.util.Collections;
 
 import static org.apache.avro.Schema.Type.INT;
 import static org.apache.avro.Schema.Type.LONG;
+import static org.apache.parquet.avro.AvroTestUtil.field;
+import static org.apache.parquet.avro.AvroTestUtil.optionalField;
+import static org.apache.parquet.avro.AvroTestUtil.primitive;
+import static org.apache.parquet.avro.AvroTestUtil.record;
 import static org.apache.parquet.schema.OriginalType.DATE;
 import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MICROS;
 import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MILLIS;
@@ -728,6 +732,65 @@ public class TestAvroSchemaConverter {
     }
   }
 
+  @Test
+  public void testReuseNameInNestedStructure() throws Exception {
+    Schema innerA1 = record("a1", "a12",
+      field("a4", primitive(Schema.Type.FLOAT)));
+
+    Schema outerA1 = record("a1",
+      field("a2", primitive(Schema.Type.FLOAT)),
+      optionalField("a1", innerA1));
+    Schema schema = record("Message",
+      optionalField("a1", outerA1));
+
+    String parquetSchema = "message Message {\n" +
+        "      optional group a1 {\n" +
+        "        required float a2;\n" +
+        "        optional group a1 {\n" +
+        "          required float a4;\n"+
+        "         }\n" +
+        "      }\n" +
+        "}\n";
+
+    testParquetToAvroConversion(schema, parquetSchema);
+    testParquetToAvroConversion(NEW_BEHAVIOR, schema, parquetSchema);
+  }
+
+  @Test
+  public void testReuseNameInNestedStructureAtSameLevel() throws Exception {
+    Schema a2 = record("a2",
+      field("a4", primitive(Schema.Type.FLOAT)));
+    Schema a22 = record("a2", "a22",
+      field("a4", primitive(Schema.Type.FLOAT)),
+      field("a5", primitive(Schema.Type.FLOAT)));
+
+    Schema a1 = record("a1",
+      optionalField("a2", a2));
+    Schema a3 = record("a3",
+      optionalField("a2", a22));
+
+    Schema schema = record("Message",
+      optionalField("a1", a1),
+      optionalField("a3", a3));
+
+    String parquetSchema = "message Message {\n" +
+      "      optional group a1 {\n" +
+      "        optional group a2 {\n" +
+      "          required float a4;\n"+
+      "         }\n" +
+      "      }\n" +
+      "      optional group a3 {\n" +
+      "        optional group a2 {\n" +
+      "          required float a4;\n"+
+      "          required float a5;\n"+
+      "         }\n" +
+      "      }\n" +
+      "}\n";
+
+    testParquetToAvroConversion(schema, parquetSchema);
+    testParquetToAvroConversion(NEW_BEHAVIOR, schema, parquetSchema);
+  }
+
   public static Schema optional(Schema original) {
     return Schema.createUnion(Lists.newArrayList(
         Schema.create(Schema.Type.NULL),
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
index 396b8a4..60ff269 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -688,6 +688,40 @@ public class TestReadWrite {
     }
   }
 
+  @Test
+  public void testNestedLists() throws Exception {
+    Schema schema = new Schema.Parser().parse(
+      Resources.getResource("nested_array.avsc").openStream());
+    Path file = new Path(createTempFile().getPath());
+
+    // Parquet writer
+    ParquetWriter parquetWriter = AvroParquetWriter.builder(file).withSchema(schema)
+      .withConf(testConf)
+      .build();
+
+    Schema innerRecordSchema = schema.getField("l1").schema().getTypes()
+      .get(1).getElementType().getTypes().get(1);
+
+    GenericRecord record = new GenericRecordBuilder(schema)
+      .set("l1", Collections.singletonList(
+        new GenericRecordBuilder(innerRecordSchema).set("l2", Collections.singletonList("hello")).build()
+      ))
+      .build();
+
+    parquetWriter.write(record);
+    parquetWriter.close();
+
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader(testConf, file);
+    GenericRecord nextRecord = reader.read();
+
+    assertNotNull(nextRecord);
+    assertNotNull(nextRecord.get("l1"));
+    List l1List = (List) nextRecord.get("l1");
+    assertNotNull(l1List.get(0));
+    List l2List = (List) ((GenericRecord) l1List.get(0)).get("l2");
+    assertEquals(str("hello"), l2List.get(0));
+  }
+
   private File createTempFile() throws IOException {
     File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
     tmp.deleteOnExit();
diff --git a/parquet-avro/src/test/resources/nested_array.avsc b/parquet-avro/src/test/resources/nested_array.avsc
new file mode 100644
index 0000000..090d325
--- /dev/null
+++ b/parquet-avro/src/test/resources/nested_array.avsc
@@ -0,0 +1,39 @@
+{
+  "type": "record",
+  "name": "Message",
+  "fields": [
+    {
+      "name": "l1",
+      "type": [
+        "null",
+        {
+          "type": "array",
+          "items": [
+            "null",
+            {
+              "type": "record",
+              "name": "element",
+              "fields": [
+                {
+                  "name": "l2",
+                  "type": [
+                    "null",
+                    {
+                      "type": "array",
+                      "items": [
+                        "null",
+                        "string"
+                      ]
+                    }
+                  ],
+                  "default": null
+                }
+              ]
+            }
+          ]
+        }
+      ],
+      "default": null
+    }
+  ]
+}