You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by zi...@apache.org on 2019/05/28 12:53:07 UTC
[parquet-mr] branch master updated: PARQUET-1441:
SchemaParseException: Can't redefine: list in AvroIndexedRecordConverter
(#560)
This is an automated email from the ASF dual-hosted git repository.
zivanfi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 1e5fda5 PARQUET-1441: SchemaParseException: Can't redefine: list in AvroIndexedRecordConverter (#560)
1e5fda5 is described below
commit 1e5fda5310687b0856e74f00a4ea420b6b1ab34d
Author: Nándor Kollár <na...@users.noreply.github.com>
AuthorDate: Tue May 28 14:53:02 2019 +0200
PARQUET-1441: SchemaParseException: Can't redefine: list in AvroIndexedRecordConverter (#560)
The Parquet Avro reader could not convert a schema in which a group field name is reused
in an inner structure. In this case the converter created more than one Avro record schema with
the same name, but Avro requires record names to be unique, so the result was an invalid Avro
schema. This patch fixes this case by adding a namespace to a record whose name was already
defined (the name followed by its occurrence count, e.g. a second record a1 gets namespace a12), making the record names unique.
---
.../apache/parquet/avro/AvroSchemaConverter.java | 27 +++++-----
.../java/org/apache/parquet/avro/AvroTestUtil.java | 8 ++-
.../parquet/avro/TestAvroSchemaConverter.java | 63 ++++++++++++++++++++++
.../org/apache/parquet/avro/TestReadWrite.java | 40 ++++++++++++--
parquet-avro/src/test/resources/nested_array.avsc | 39 ++++++++++++++
5 files changed, 160 insertions(+), 17 deletions(-)
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
index b4bac2f..0cece97 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -34,7 +34,9 @@ import org.apache.parquet.schema.Types;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import static java.util.Optional.empty;
@@ -243,17 +245,18 @@ public class AvroSchemaConverter {
}
public Schema convert(MessageType parquetSchema) {
- return convertFields(parquetSchema.getName(), parquetSchema.getFields());
+ return convertFields(parquetSchema.getName(), parquetSchema.getFields(), new HashMap<>());
}
Schema convert(GroupType parquetSchema) {
- return convertFields(parquetSchema.getName(), parquetSchema.getFields());
+ return convertFields(parquetSchema.getName(), parquetSchema.getFields(), new HashMap<>());
}
- private Schema convertFields(String name, List<Type> parquetFields) {
+ private Schema convertFields(String name, List<Type> parquetFields, Map<String, Integer> names) {
List<Schema.Field> fields = new ArrayList<Schema.Field>();
+ Integer nameCount = names.merge(name, 1, (oldValue, value) -> oldValue + 1);
for (Type parquetType : parquetFields) {
- Schema fieldSchema = convertField(parquetType);
+ Schema fieldSchema = convertField(parquetType, names);
if (parquetType.isRepetition(REPEATED)) {
throw new UnsupportedOperationException("REPEATED not supported outside LIST or MAP. Type: " + parquetType);
} else if (parquetType.isRepetition(Type.Repetition.OPTIONAL)) {
@@ -264,12 +267,12 @@ public class AvroSchemaConverter {
parquetType.getName(), fieldSchema, null, (Object) null));
}
}
- Schema schema = Schema.createRecord(name, null, null, false);
+ Schema schema = Schema.createRecord(name, null, nameCount > 1 ? name + nameCount : null, false);
schema.setFields(fields);
return schema;
}
- private Schema convertField(final Type parquetType) {
+ private Schema convertField(final Type parquetType, Map<String, Integer> names) {
if (parquetType.isPrimitive()) {
final PrimitiveType asPrimitive = parquetType.asPrimitiveType();
final PrimitiveTypeName parquetPrimitiveTypeName =
@@ -342,13 +345,13 @@ public class AvroSchemaConverter {
}
if (isElementType(repeatedType, parquetGroupType.getName())) {
// repeated element types are always required
- return of(Schema.createArray(convertField(repeatedType)));
+ return of(Schema.createArray(convertField(repeatedType, names)));
} else {
Type elementType = repeatedType.asGroupType().getType(0);
if (elementType.isRepetition(Type.Repetition.OPTIONAL)) {
- return of(Schema.createArray(optional(convertField(elementType))));
+ return of(Schema.createArray(optional(convertField(elementType, names))));
} else {
- return of(Schema.createArray(convertField(elementType)));
+ return of(Schema.createArray(convertField(elementType, names)));
}
}
}
@@ -382,9 +385,9 @@ public class AvroSchemaConverter {
}
Type valueType = mapKeyValType.getType(1);
if (valueType.isRepetition(Type.Repetition.OPTIONAL)) {
- return of(Schema.createMap(optional(convertField(valueType))));
+ return of(Schema.createMap(optional(convertField(valueType, names))));
} else {
- return of(Schema.createMap(convertField(valueType)));
+ return of(Schema.createMap(convertField(valueType, names)));
}
}
@@ -395,7 +398,7 @@ public class AvroSchemaConverter {
}).orElseThrow(() -> new UnsupportedOperationException("Cannot convert Parquet type " + parquetType));
} else {
// if no original type then it's a record
- return convertFields(parquetGroupType.getName(), parquetGroupType.getFields());
+ return convertFields(parquetGroupType.getName(), parquetGroupType.getFields(), names);
}
}
}
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java b/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java
index f4682d6..39c6d2a 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java
@@ -37,12 +37,16 @@ import org.junit.rules.TemporaryFolder;
public class AvroTestUtil {
- public static Schema record(String name, Schema.Field... fields) {
- Schema record = Schema.createRecord(name, null, null, false);
+ public static Schema record(String name, String namespace, Schema.Field... fields) {
+ Schema record = Schema.createRecord(name, null, namespace, false);
record.setFields(Arrays.asList(fields));
return record;
}
+ public static Schema record(String name, Schema.Field... fields) {
+ return record(name, null, fields);
+ }
+
public static Schema.Field field(String name, Schema schema) {
return new Schema.Field(name, schema, null, null);
}
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
index bfaeec3..2548408 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
@@ -37,6 +37,10 @@ import java.util.Collections;
import static org.apache.avro.Schema.Type.INT;
import static org.apache.avro.Schema.Type.LONG;
+import static org.apache.parquet.avro.AvroTestUtil.field;
+import static org.apache.parquet.avro.AvroTestUtil.optionalField;
+import static org.apache.parquet.avro.AvroTestUtil.primitive;
+import static org.apache.parquet.avro.AvroTestUtil.record;
import static org.apache.parquet.schema.OriginalType.DATE;
import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MICROS;
import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MILLIS;
@@ -728,6 +732,65 @@ public class TestAvroSchemaConverter {
}
}
+ @Test
+ public void testReuseNameInNestedStructure() throws Exception {
+ Schema innerA1 = record("a1", "a12",
+ field("a4", primitive(Schema.Type.FLOAT)));
+
+ Schema outerA1 = record("a1",
+ field("a2", primitive(Schema.Type.FLOAT)),
+ optionalField("a1", innerA1));
+ Schema schema = record("Message",
+ optionalField("a1", outerA1));
+
+ String parquetSchema = "message Message {\n" +
+ " optional group a1 {\n" +
+ " required float a2;\n" +
+ " optional group a1 {\n" +
+ " required float a4;\n"+
+ " }\n" +
+ " }\n" +
+ "}\n";
+
+ testParquetToAvroConversion(schema, parquetSchema);
+ testParquetToAvroConversion(NEW_BEHAVIOR, schema, parquetSchema);
+ }
+
+ @Test
+ public void testReuseNameInNestedStructureAtSameLevel() throws Exception {
+ Schema a2 = record("a2",
+ field("a4", primitive(Schema.Type.FLOAT)));
+ Schema a22 = record("a2", "a22",
+ field("a4", primitive(Schema.Type.FLOAT)),
+ field("a5", primitive(Schema.Type.FLOAT)));
+
+ Schema a1 = record("a1",
+ optionalField("a2", a2));
+ Schema a3 = record("a3",
+ optionalField("a2", a22));
+
+ Schema schema = record("Message",
+ optionalField("a1", a1),
+ optionalField("a3", a3));
+
+ String parquetSchema = "message Message {\n" +
+ " optional group a1 {\n" +
+ " optional group a2 {\n" +
+ " required float a4;\n"+
+ " }\n" +
+ " }\n" +
+ " optional group a3 {\n" +
+ " optional group a2 {\n" +
+ " required float a4;\n"+
+ " required float a5;\n"+
+ " }\n" +
+ " }\n" +
+ "}\n";
+
+ testParquetToAvroConversion(schema, parquetSchema);
+ testParquetToAvroConversion(NEW_BEHAVIOR, schema, parquetSchema);
+ }
+
public static Schema optional(Schema original) {
return Schema.createUnion(Lists.newArrayList(
Schema.create(Schema.Type.NULL),
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
index 396b8a4..60ff269 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -688,6 +688,40 @@ public class TestReadWrite {
}
}
+ @Test
+ public void testNestedLists() throws Exception {
+ Schema schema = new Schema.Parser().parse(
+ Resources.getResource("nested_array.avsc").openStream());
+ Path file = new Path(createTempFile().getPath());
+
+ // Parquet writer
+ ParquetWriter parquetWriter = AvroParquetWriter.builder(file).withSchema(schema)
+ .withConf(testConf)
+ .build();
+
+ Schema innerRecordSchema = schema.getField("l1").schema().getTypes()
+ .get(1).getElementType().getTypes().get(1);
+
+ GenericRecord record = new GenericRecordBuilder(schema)
+ .set("l1", Collections.singletonList(
+ new GenericRecordBuilder(innerRecordSchema).set("l2", Collections.singletonList("hello")).build()
+ ))
+ .build();
+
+ parquetWriter.write(record);
+ parquetWriter.close();
+
+ AvroParquetReader<GenericRecord> reader = new AvroParquetReader(testConf, file);
+ GenericRecord nextRecord = reader.read();
+
+ assertNotNull(nextRecord);
+ assertNotNull(nextRecord.get("l1"));
+ List l1List = (List) nextRecord.get("l1");
+ assertNotNull(l1List.get(0));
+ List l2List = (List) ((GenericRecord) l1List.get(0)).get("l2");
+ assertEquals(str("hello"), l2List.get(0));
+ }
+
private File createTempFile() throws IOException {
File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
tmp.deleteOnExit();
diff --git a/parquet-avro/src/test/resources/nested_array.avsc b/parquet-avro/src/test/resources/nested_array.avsc
new file mode 100644
index 0000000..090d325
--- /dev/null
+++ b/parquet-avro/src/test/resources/nested_array.avsc
@@ -0,0 +1,39 @@
+{
+ "type": "record",
+ "name": "Message",
+ "fields": [
+ {
+ "name": "l1",
+ "type": [
+ "null",
+ {
+ "type": "array",
+ "items": [
+ "null",
+ {
+ "type": "record",
+ "name": "element",
+ "fields": [
+ {
+ "name": "l2",
+ "type": [
+ "null",
+ {
+ "type": "array",
+ "items": [
+ "null",
+ "string"
+ ]
+ }
+ ],
+ "default": null
+ }
+ ]
+ }
+ ]
+ }
+ ],
+ "default": null
+ }
+ ]
+}