You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2015/11/18 00:10:02 UTC
parquet-mr git commit: PARQUET-364: Fix compatibility for Avro lists
of lists.
Repository: parquet-mr
Updated Branches:
refs/heads/master 6b605a4ea -> 440882c65
PARQUET-364: Fix compatibility for Avro lists of lists.
This fixes lists of lists that have been written with Avro's 2-level
representation. The conversion setup logic missed the case where the
inner field is repeated and cannot be the element in a 3-level list.
This also fixes the schema conversion for cases where an unknown
writer used a 2-level list of lists.
This is based on @liancheng's #264 but fixes the problem in a slightly different way.
Author: Ryan Blue <bl...@apache.org>
Closes #272 from rdblue/PARQUET-364-fix-avro-lists-of-lists and squashes the following commits:
41a70e0 [Ryan Blue] PARQUET-364: Fix compatibility for Avro lists of lists.
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/440882c6
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/440882c6
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/440882c6
Branch: refs/heads/master
Commit: 440882c659967572311402c7fe534cf13d501cf4
Parents: 6b605a4
Author: Ryan Blue <bl...@apache.org>
Authored: Tue Nov 17 15:09:50 2015 -0800
Committer: Ryan Blue <bl...@apache.org>
Committed: Tue Nov 17 15:09:50 2015 -0800
----------------------------------------------------------------------
.../avro/AvroIndexedRecordConverter.java | 36 +----
.../parquet/avro/AvroRecordConverter.java | 8 +-
.../parquet/avro/AvroSchemaConverter.java | 10 +-
.../parquet/avro/TestArrayCompatibility.java | 148 ++++++++++++++++++-
.../parquet/avro/TestAvroSchemaConverter.java | 92 ++++++++++++
5 files changed, 248 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/440882c6/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
index a5e4141..06c66d6 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
@@ -125,7 +125,8 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
}
}
if (avroField == null) {
- throw new InvalidRecordException(String.format("Parquet/Avro schema mismatch. Avro field '%s' not found.",
+ throw new InvalidRecordException(String.format(
+ "Parquet/Avro schema mismatch. Avro field '%s' not found.",
parquetFieldName));
}
return avroField;
@@ -313,7 +314,7 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
Type repeatedType = type.getType(0);
// always determine whether the repeated type is the element type by
// matching it against the element schema.
- if (isElementType(repeatedType, elementSchema)) {
+ if (AvroRecordConverter.isElementType(repeatedType, elementSchema)) {
// the element type is the repeated type (and required)
converter = newConverter(elementSchema, repeatedType, model, new ParentValueContainer() {
@Override
@@ -344,37 +345,6 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
}
/**
- * Returns whether the given type is the element type of a list or is a
- * synthetic group with one field that is the element type. This is
- * determined by checking whether the type can be a synthetic group and by
- * checking whether a potential synthetic group matches the expected schema.
- * <p>
- * Unlike {@link AvroSchemaConverter#isElementType(Type, String)}, this
- * method never guesses because the expected schema is known.
- *
- * @param repeatedType a type that may be the element type
- * @param elementSchema the expected Schema for list elements
- * @return {@code true} if the repeatedType is the element schema
- */
- static boolean isElementType(Type repeatedType, Schema elementSchema) {
- if (repeatedType.isPrimitive() ||
- repeatedType.asGroupType().getFieldCount() > 1) {
- // The repeated type must be the element type because it is an invalid
- // synthetic wrapper (must be a group with one field).
- return true;
- } else if (elementSchema != null &&
- elementSchema.getType() == Schema.Type.RECORD &&
- elementSchema.getFields().size() == 1 &&
- elementSchema.getFields().get(0).name().equals(
- repeatedType.asGroupType().getFieldName(0))) {
- // The repeated type must be the element type because it matches the
- // structure of the Avro element's schema.
- return true;
- }
- return false;
- }
-
- /**
* Converter for list elements.
*
* <pre>
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/440882c6/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
index 57ad18a..61d7d8e 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
@@ -50,6 +50,7 @@ import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
/**
@@ -744,11 +745,12 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
* @param elementSchema the expected Schema for list elements
* @return {@code true} if the repeatedType is the element schema
*/
- private static boolean isElementType(Type repeatedType, Schema elementSchema) {
+ static boolean isElementType(Type repeatedType, Schema elementSchema) {
if (repeatedType.isPrimitive() ||
- repeatedType.asGroupType().getFieldCount() > 1) {
+ repeatedType.asGroupType().getFieldCount() > 1 ||
+ repeatedType.asGroupType().getType(0).isRepetition(REPEATED)) {
// The repeated type must be the element type because it is an invalid
- // synthetic wrapper (must be a group with one field).
+ // synthetic wrapper. Must be a group with one optional or required field
return true;
} else if (elementSchema != null &&
elementSchema.getType() == Schema.Type.RECORD &&
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/440882c6/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
index 04fe3a7..6cfa8d1 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -36,6 +36,7 @@ import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT;
import static org.apache.parquet.schema.OriginalType.*;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
/**
* <p>
@@ -135,7 +136,7 @@ public class AvroSchemaConverter {
} else if (type.equals(Schema.Type.ARRAY)) {
if (writeOldListStructure) {
return ConversionPatterns.listType(repetition, fieldName,
- convertField("array", schema.getElementType(), Type.Repetition.REPEATED));
+ convertField("array", schema.getElementType(), REPEATED));
} else {
return ConversionPatterns.listOfElements(repetition, fieldName,
convertField(AvroWriteSupport.LIST_ELEMENT_NAME, schema.getElementType()));
@@ -213,7 +214,7 @@ public class AvroSchemaConverter {
List<Schema.Field> fields = new ArrayList<Schema.Field>();
for (Type parquetType : parquetFields) {
Schema fieldSchema = convertField(parquetType);
- if (parquetType.isRepetition(Type.Repetition.REPEATED)) {
+ if (parquetType.isRepetition(REPEATED)) {
throw new UnsupportedOperationException("REPEATED not supported outside LIST or MAP. Type: " + parquetType);
} else if (parquetType.isRepetition(Type.Repetition.OPTIONAL)) {
fields.add(new Schema.Field(parquetType.getName(), optional(fieldSchema), null,
@@ -282,7 +283,7 @@ public class AvroSchemaConverter {
throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
}
Type repeatedType = parquetGroupType.getType(0);
- if (!repeatedType.isRepetition(Type.Repetition.REPEATED)) {
+ if (!repeatedType.isRepetition(REPEATED)) {
throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
}
if (isElementType(repeatedType, parquetGroupType.getName())) {
@@ -302,7 +303,7 @@ public class AvroSchemaConverter {
throw new UnsupportedOperationException("Invalid map type " + parquetGroupType);
}
GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType();
- if (!mapKeyValType.isRepetition(Type.Repetition.REPEATED) ||
+ if (!mapKeyValType.isRepetition(REPEATED) ||
mapKeyValType.getFieldCount()!=2) {
throw new UnsupportedOperationException("Invalid map type " + parquetGroupType);
}
@@ -348,6 +349,7 @@ public class AvroSchemaConverter {
// can't be a synthetic layer because it would be invalid
repeatedType.isPrimitive() ||
repeatedType.asGroupType().getFieldCount() > 1 ||
+ repeatedType.asGroupType().getType(0).isRepetition(REPEATED) ||
// known patterns without the synthetic layer
repeatedType.getName().equals("array") ||
repeatedType.getName().equals(parentName + "_tuple") ||
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/440882c6/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
index c4585a7..9c29e50 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
@@ -567,9 +567,9 @@ public class TestArrayCompatibility {
}
@Test
- public void testAvroCompatRequiredGroupInList() throws Exception {
+ public void testAvroCompatOptionalGroupInList() throws Exception {
Path test = writeDirect(
- "message AvroCompatRequiredGroupInList {" +
+ "message AvroCompatOptionalGroupInList {" +
" optional group locations (LIST) {" +
" repeated group array {" +
" optional group element {" +
@@ -634,7 +634,7 @@ public class TestArrayCompatibility {
// old behavior - assume that the repeated type is the element type
Schema elementRecord = record("array", optionalField("element", location));
- Schema oldSchema = record("AvroCompatRequiredGroupInList",
+ Schema oldSchema = record("AvroCompatOptionalGroupInList",
optionalField("locations", array(elementRecord)));
GenericRecord oldRecord = instance(oldSchema,
"locations", Arrays.asList(
@@ -649,9 +649,9 @@ public class TestArrayCompatibility {
}
@Test
- public void testAvroCompatRequiredGroupInListWithSchema() throws Exception {
+ public void testAvroCompatOptionalGroupInListWithSchema() throws Exception {
Path test = writeDirect(
- "message AvroCompatRequiredGroupInListWithSchema {" +
+ "message AvroCompatOptionalGroupInListWithSchema {" +
" optional group locations (LIST) {" +
" repeated group array {" +
" optional group element {" +
@@ -714,7 +714,7 @@ public class TestArrayCompatibility {
field("latitude", primitive(Schema.Type.DOUBLE)),
field("longitude", primitive(Schema.Type.DOUBLE)));
- Schema newSchema = record("HiveCompatOptionalGroupInList",
+ Schema newSchema = record("AvroCompatOptionalGroupInListWithSchema",
optionalField("locations", array(optional(location))));
GenericRecord newRecord = instance(newSchema,
"locations", Arrays.asList(
@@ -738,6 +738,142 @@ public class TestArrayCompatibility {
}
@Test
+ public void testAvroCompatListInList() throws Exception {
+ Path test = writeDirect(
+ "message AvroCompatListInList {" +
+ " optional group listOfLists (LIST) {" +
+ " repeated group array (LIST) {" +
+ " repeated int32 array;" +
+ " }" +
+ " }" +
+ "}",
+ new DirectWriter() {
+ @Override
+ public void write(RecordConsumer rc) {
+ rc.startMessage();
+ rc.startField("locations", 0);
+
+ rc.startGroup();
+ rc.startField("array", 0); // start writing array contents
+
+ rc.startGroup();
+ rc.startField("array", 0); // start writing inner array contents
+
+ // write [34, 35, 36]
+ rc.addInteger(34);
+ rc.addInteger(35);
+ rc.addInteger(36);
+
+ rc.endField("array", 0); // finished writing inner array contents
+ rc.endGroup();
+
+ // write an empty list
+ rc.startGroup();
+ rc.endGroup();
+
+ rc.startGroup();
+ rc.startField("array", 0); // start writing inner array contents
+
+ // write [32, 33, 34]
+ rc.addInteger(32);
+ rc.addInteger(33);
+ rc.addInteger(34);
+
+ rc.endField("array", 0); // finished writing inner array contents
+ rc.endGroup();
+
+ rc.endField("array", 0); // finished writing array contents
+ rc.endGroup();
+
+ rc.endField("locations", 0);
+ rc.endMessage();
+ }
+ });
+
+ Schema listOfLists = array(array(primitive(Schema.Type.INT)));
+ Schema oldSchema = record("AvroCompatListInList",
+ optionalField("listOfLists", listOfLists));
+
+ GenericRecord oldRecord = instance(oldSchema,
+ "listOfLists", Arrays.asList(
+ Arrays.asList(34, 35, 36),
+ Arrays.asList(),
+ Arrays.asList(32, 33, 34)));
+
+ // both should detect the "array" name
+ assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
+ assertReaderContains(newBehaviorReader(test), oldSchema, oldRecord);
+ }
+
+ @Test
+ public void testThriftCompatListInList() throws Exception {
+ Path test = writeDirect(
+ "message ThriftCompatListInList {" +
+ " optional group listOfLists (LIST) {" +
+ " repeated group listOfLists_tuple (LIST) {" +
+ " repeated int32 listOfLists_tuple_tuple;" +
+ " }" +
+ " }" +
+ "}",
+ new DirectWriter() {
+ @Override
+ public void write(RecordConsumer rc) {
+ rc.startMessage();
+ rc.startField("locations", 0);
+
+ rc.startGroup();
+ rc.startField("listOfLists_tuple", 0); // start writing array contents
+
+ rc.startGroup();
+ rc.startField("listOfLists_tuple_tuple", 0); // start writing inner array contents
+
+ // write [34, 35, 36]
+ rc.addInteger(34);
+ rc.addInteger(35);
+ rc.addInteger(36);
+
+ rc.endField("listOfLists_tuple_tuple", 0); // finished writing inner array contents
+ rc.endGroup();
+
+ // write an empty list
+ rc.startGroup();
+ rc.endGroup();
+
+ rc.startGroup();
+ rc.startField("listOfLists_tuple_tuple", 0); // start writing inner array contents
+
+ // write [32, 33, 34]
+ rc.addInteger(32);
+ rc.addInteger(33);
+ rc.addInteger(34);
+
+ rc.endField("listOfLists_tuple_tuple", 0); // finished writing inner array contents
+ rc.endGroup();
+
+ rc.endField("listOfLists_tuple", 0); // finished writing array contents
+ rc.endGroup();
+
+ rc.endField("locations", 0);
+ rc.endMessage();
+ }
+ });
+
+ Schema listOfLists = array(array(primitive(Schema.Type.INT)));
+ Schema oldSchema = record("ThriftCompatListInList",
+ optionalField("listOfLists", listOfLists));
+
+ GenericRecord oldRecord = instance(oldSchema,
+ "listOfLists", Arrays.asList(
+ Arrays.asList(34, 35, 36),
+ Arrays.asList(),
+ Arrays.asList(32, 33, 34)));
+
+ // both should detect the "_tuple" names
+ assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
+ assertReaderContains(newBehaviorReader(test), oldSchema, oldRecord);
+ }
+
+ @Test
public void testThriftCompatRequiredGroupInList() throws Exception {
Path test = writeDirect(
"message ThriftCompatRequiredGroupInList {" +
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/440882c6/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
index 6c802a6..b393615 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
@@ -394,6 +394,98 @@ public class TestAvroSchemaConverter {
}
@Test
+ public void testOldAvroListOfLists() throws Exception {
+ Schema listOfLists = optional(Schema.createArray(Schema.createArray(
+ Schema.create(Schema.Type.INT))));
+ Schema schema = Schema.createRecord("AvroCompatListInList", null, null, false);
+ schema.setFields(Lists.newArrayList(
+ new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance())
+ ));
+ System.err.println("Avro schema: " + schema.toString(true));
+
+ testRoundTripConversion(schema,
+ "message AvroCompatListInList {\n" +
+ " optional group listOfLists (LIST) {\n" +
+ " repeated group array (LIST) {\n" +
+ " repeated int32 array;\n" +
+ " }\n" +
+ " }\n" +
+ "}");
+ // Cannot use round-trip assertion because 3-level representation is used
+ testParquetToAvroConversion(NEW_BEHAVIOR, schema,
+ "message AvroCompatListInList {\n" +
+ " optional group listOfLists (LIST) {\n" +
+ " repeated group array (LIST) {\n" +
+ " repeated int32 array;\n" +
+ " }\n" +
+ " }\n" +
+ "}");
+ }
+
+ @Test
+ public void testOldThriftListOfLists() throws Exception {
+ Schema listOfLists = optional(Schema.createArray(Schema.createArray(
+ Schema.create(Schema.Type.INT))));
+ Schema schema = Schema.createRecord("ThriftCompatListInList", null, null, false);
+ schema.setFields(Lists.newArrayList(
+ new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance())
+ ));
+ System.err.println("Avro schema: " + schema.toString(true));
+
+ // Cannot use round-trip assertion because repeated group names differ
+ testParquetToAvroConversion(schema,
+ "message ThriftCompatListInList {\n" +
+ " optional group listOfLists (LIST) {\n" +
+ " repeated group listOfLists_tuple (LIST) {\n" +
+ " repeated int32 listOfLists_tuple_tuple;\n" +
+ " }\n" +
+ " }\n" +
+ "}");
+ // Cannot use round-trip assertion because 3-level representation is used
+ testParquetToAvroConversion(NEW_BEHAVIOR, schema,
+ "message ThriftCompatListInList {\n" +
+ " optional group listOfLists (LIST) {\n" +
+ " repeated group listOfLists_tuple (LIST) {\n" +
+ " repeated int32 listOfLists_tuple_tuple;\n" +
+ " }\n" +
+ " }\n" +
+ "}");
+ }
+
+ @Test
+ public void testUnknownTwoLevelListOfLists() throws Exception {
+ // This tests the case where we don't detect a 2-level list by the repeated
+ // group's name, but it must be 2-level because the repeated group doesn't
+ // contain the single optional or required element that a 3-level list needs
+ Schema listOfLists = optional(Schema.createArray(Schema.createArray(
+ Schema.create(Schema.Type.INT))));
+ Schema schema = Schema.createRecord("UnknownTwoLevelListInList", null, null, false);
+ schema.setFields(Lists.newArrayList(
+ new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance())
+ ));
+ System.err.println("Avro schema: " + schema.toString(true));
+
+ // Cannot use round-trip assertion because repeated group names differ
+ testParquetToAvroConversion(schema,
+ "message UnknownTwoLevelListInList {\n" +
+ " optional group listOfLists (LIST) {\n" +
+ " repeated group mylist (LIST) {\n" +
+ " repeated int32 innerlist;\n" +
+ " }\n" +
+ " }\n" +
+ "}");
+ // Cannot use round-trip assertion because 3-level representation is used
+ testParquetToAvroConversion(NEW_BEHAVIOR, schema,
+ "message UnknownTwoLevelListInList {\n" +
+ " optional group listOfLists (LIST) {\n" +
+ " repeated group mylist (LIST) {\n" +
+ " repeated int32 innerlist;\n" +
+ " }\n" +
+ " }\n" +
+ "}");
+ }
+
+ @Test
public void testParquetMapWithoutMapKeyValueAnnotation() throws Exception {
Schema schema = Schema.createRecord("myrecord", null, null, false);
Schema map = Schema.createMap(Schema.create(Schema.Type.INT));