You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2015/03/07 02:06:54 UTC
incubator-parquet-mr git commit: PARQUET-193: Implement nested types
compatibility rules in Avro
Repository: incubator-parquet-mr
Updated Branches:
refs/heads/master 12ee6b442 -> 3fc28541f
PARQUET-193: Implement nested types compatibility rules in Avro
This depends on PARQUET-191 and PARQUET-192.
This replaces #83.
Author: Ryan Blue <bl...@apache.org>
Closes #128 from rdblue/PARQUET-193-implement-compatilibity-avro and squashes the following commits:
bd0491e [Ryan Blue] PARQUET-193: Implement nested types rules in Avro.
Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/3fc28541
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/3fc28541
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/3fc28541
Branch: refs/heads/master
Commit: 3fc28541f001ce6e4a7afa91fec8d21bfeaa17db
Parents: 12ee6b4
Author: Ryan Blue <bl...@apache.org>
Authored: Fri Mar 6 17:06:34 2015 -0800
Committer: Ryan Blue <bl...@apache.org>
Committed: Fri Mar 6 17:06:34 2015 -0800
----------------------------------------------------------------------
.../avro/AvroIndexedRecordConverter.java | 137 ++-
.../main/java/parquet/avro/AvroReadSupport.java | 18 +-
.../java/parquet/avro/AvroSchemaConverter.java | 58 +-
.../java/parquet/avro/AvroWriteSupport.java | 19 +-
.../test/java/parquet/avro/AvroTestUtil.java | 69 ++
.../parquet/avro/TestArrayCompatibility.java | 999 +++++++++++++++++++
6 files changed, 1261 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/3fc28541/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java b/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java
index 85804ad..2f59c9b 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java
@@ -141,9 +141,9 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
} else if (schema.getType().equals(Schema.Type.ENUM)) {
return new FieldEnumConverter(parent, schema, model);
} else if (schema.getType().equals(Schema.Type.ARRAY)) {
- return new AvroArrayConverter(parent, type, schema, model);
+ return new AvroArrayConverter(parent, type.asGroupType(), schema, model);
} else if (schema.getType().equals(Schema.Type.MAP)) {
- return new MapConverter(parent, type, schema, model);
+ return new MapConverter(parent, type.asGroupType(), schema, model);
} else if (schema.getType().equals(Schema.Type.UNION)) {
return new AvroUnionConverter(parent, type, schema, model);
} else if (schema.getType().equals(Schema.Type.FIXED)) {
@@ -436,6 +436,21 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
}
}
+ /**
+ * Converter for a list.
+ *
+ * <pre>
+ * optional group the_list (LIST) { <-- this layer
+ * repeated group array {
+ * optional (type) element;
+ * }
+ * }
+ * </pre>
+ *
+ * This class also implements LIST element backward-compatibility rules.
+ *
+ * @param <T> The type of elements in the list
+ */
static final class AvroArrayConverter<T> extends GroupConverter {
private final ParentValueContainer parent;
@@ -443,19 +458,27 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
private final Converter converter;
private GenericArray<T> array;
- public AvroArrayConverter(ParentValueContainer parent, Type parquetSchema,
+ public AvroArrayConverter(ParentValueContainer parent, GroupType type,
Schema avroSchema, GenericData model) {
this.parent = parent;
this.avroSchema = avroSchema;
- Type elementType = parquetSchema.asGroupType().getType(0);
- Schema elementSchema = avroSchema.getElementType();
- converter = newConverter(elementSchema, elementType, model, new ParentValueContainer() {
- @Override
- @SuppressWarnings("unchecked")
- void add(Object value) {
- array.add((T) value);
- }
- });
+ Schema elementSchema = this.avroSchema.getElementType();
+ Type repeatedType = type.getType(0);
+ // always determine whether the repeated type is the element type by
+ // matching it against the element schema.
+ if (isElementType(repeatedType, elementSchema)) {
+ // the element type is the repeated type (and required)
+ converter = newConverter(elementSchema, repeatedType, model, new ParentValueContainer() {
+ @Override
+ @SuppressWarnings("unchecked")
+ void add(Object value) {
+ array.add((T) value);
+ }
+ });
+ } else {
+ // the element is wrapped in a synthetic group and may be optional
+ converter = new ElementConverter(repeatedType.asGroupType(), elementSchema, model);
+ }
}
@Override
@@ -472,6 +495,82 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
public void end() {
parent.add(array);
}
+
+ /**
+ * Returns whether the given type is the element type of a list rather than
+ * a synthetic group with one field that is the element type. This is
+ * determined by checking whether the type can be a synthetic group and by
+ * checking whether a potential synthetic group matches the expected schema.
+ * <p>
+ * Unlike {@link AvroSchemaConverter#isElementType(Type, String)}, this
+ * method never guesses because the expected schema is known.
+ *
+ * @param repeatedType a type that may be the element type
+ * @param elementSchema the expected Schema for list elements
+ * @return {@code true} if the repeatedType is the element type
+ */
+ static boolean isElementType(Type repeatedType, Schema elementSchema) {
+ if (repeatedType.isPrimitive() ||
+ repeatedType.asGroupType().getFieldCount() > 1) {
+ // The repeated type must be the element type because it is an invalid
+ // synthetic wrapper (must be a group with one field).
+ return true;
+ } else if (elementSchema != null &&
+ elementSchema.getType() == Schema.Type.RECORD &&
+ elementSchema.getFields().size() == 1 &&
+ elementSchema.getFields().get(0).name().equals(
+ repeatedType.asGroupType().getFieldName(0))) {
+ // The repeated type must be the element type because it matches the
+ // structure of the Avro element's schema.
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Converter for list elements.
+ *
+ * <pre>
+ * optional group the_list (LIST) {
+ * repeated group array { <-- this layer
+ * optional (type) element;
+ * }
+ * }
+ * </pre>
+ */
+ final class ElementConverter extends GroupConverter {
+ private T element;
+ private final Converter elementConverter;
+
+ public ElementConverter(GroupType repeatedType, Schema elementSchema, GenericData model) {
+ Type elementType = repeatedType.getType(0);
+ Schema nonNullElementSchema = AvroSchemaConverter.getNonNull(elementSchema);
+ this.elementConverter = newConverter(nonNullElementSchema, elementType, model, new ParentValueContainer() {
+ @Override
+ @SuppressWarnings("unchecked")
+ void add(Object value) {
+ ElementConverter.this.element = (T) value;
+ }
+ });
+ }
+
+ @Override
+ public Converter getConverter(int fieldIndex) {
+ Preconditions.checkArgument(
+ fieldIndex == 0, "Illegal field index: " + fieldIndex);
+ return elementConverter;
+ }
+
+ @Override
+ public void start() {
+ element = null;
+ }
+
+ @Override
+ public void end() {
+ array.add(element);
+ }
+ }
}
static final class AvroUnionConverter<T> extends GroupConverter {
@@ -525,10 +624,12 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
private final Converter keyValueConverter;
private Map<String, V> map;
- public MapConverter(ParentValueContainer parent, Type parquetSchema,
- Schema avroSchema, GenericData model) {
+ public MapConverter(ParentValueContainer parent, GroupType mapType,
+ Schema mapSchema, GenericData model) {
this.parent = parent;
- this.keyValueConverter = new MapKeyValueConverter(parquetSchema, avroSchema, model);
+ GroupType repeatedKeyValueType = mapType.getType(0).asGroupType();
+ this.keyValueConverter = new MapKeyValueConverter(
+ repeatedKeyValueType, mapSchema, model);
}
@Override
@@ -553,7 +654,7 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
private final Converter keyConverter;
private final Converter valueConverter;
- public MapKeyValueConverter(Type parquetSchema, Schema avroSchema,
+ public MapKeyValueConverter(GroupType keyValueType, Schema mapSchema,
GenericData model) {
keyConverter = new PrimitiveConverter() {
@Override
@@ -562,8 +663,8 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
}
};
- Type valueType = parquetSchema.asGroupType().getType(0).asGroupType().getType(1);
- Schema nonNullValueSchema = AvroSchemaConverter.getNonNull(avroSchema.getValueType());
+ Type valueType = keyValueType.getType(1);
+ Schema nonNullValueSchema = AvroSchemaConverter.getNonNull(mapSchema.getValueType());
valueConverter = newConverter(nonNullValueSchema, valueType, model, new ParentValueContainer() {
@Override
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/3fc28541/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java
index c82977e..eacd369 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java
@@ -38,7 +38,7 @@ public class AvroReadSupport<T extends IndexedRecord> extends ReadSupport<T> {
public static String AVRO_REQUESTED_PROJECTION = "parquet.avro.projection";
private static final String AVRO_READ_SCHEMA = "parquet.avro.read.schema";
- static final String AVRO_SCHEMA_METADATA_KEY = "avro.schema";
+ static final String AVRO_SCHEMA_METADATA_KEY = "parquet.avro.schema";
private static final String AVRO_READ_SCHEMA_METADATA_KEY = "avro.read.schema";
public static String AVRO_DATA_SUPPLIER = "parquet.avro.data.supplier";
@@ -63,14 +63,16 @@ public class AvroReadSupport<T extends IndexedRecord> extends ReadSupport<T> {
}
@Override
- public ReadContext init(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema) {
- MessageType schema = fileSchema;
+ public ReadContext init(Configuration configuration,
+ Map<String, String> keyValueMetaData,
+ MessageType fileSchema) {
+ MessageType projection = fileSchema;
Map<String, String> metadata = null;
String requestedProjectionString = configuration.get(AVRO_REQUESTED_PROJECTION);
if (requestedProjectionString != null) {
Schema avroRequestedProjection = new Schema.Parser().parse(requestedProjectionString);
- schema = new AvroSchemaConverter().convert(avroRequestedProjection);
+ projection = new AvroSchemaConverter(configuration).convert(avroRequestedProjection);
}
String avroReadSchema = configuration.get(AVRO_READ_SCHEMA);
if (avroReadSchema != null) {
@@ -79,11 +81,13 @@ public class AvroReadSupport<T extends IndexedRecord> extends ReadSupport<T> {
}
// use getSchemaForRead because it checks that the requested schema is a
// subset of the columns in the file schema
- return new ReadContext(getSchemaForRead(fileSchema, schema), metadata);
+ return new ReadContext(getSchemaForRead(fileSchema, projection), metadata);
}
@Override
- public RecordMaterializer<T> prepareForRead(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema, ReadContext readContext) {
+ public RecordMaterializer<T> prepareForRead(
+ Configuration configuration, Map<String, String> keyValueMetaData,
+ MessageType fileSchema, ReadContext readContext) {
MessageType parquetSchema = readContext.getRequestedSchema();
Schema avroSchema;
if (readContext.getReadSupportMetadata() != null &&
@@ -95,7 +99,7 @@ public class AvroReadSupport<T extends IndexedRecord> extends ReadSupport<T> {
avroSchema = new Schema.Parser().parse(keyValueMetaData.get(AVRO_SCHEMA_METADATA_KEY));
} else {
// default to converting the Parquet schema into an Avro schema
- avroSchema = new AvroSchemaConverter().convert(parquetSchema);
+ avroSchema = new AvroSchemaConverter(configuration).convert(parquetSchema);
}
Class<? extends AvroDataSupplier> suppClass = configuration.getClass(AVRO_DATA_SUPPLIER,
SpecificDataSupplier.class,
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/3fc28541/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java
index 986776c..80946f3 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java
@@ -22,6 +22,7 @@ import java.util.*;
import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
import org.codehaus.jackson.node.NullNode;
import parquet.schema.ConversionPatterns;
import parquet.schema.GroupType;
@@ -42,6 +43,21 @@ import static parquet.schema.PrimitiveType.PrimitiveTypeName.*;
*/
public class AvroSchemaConverter {
+ static final String ADD_LIST_ELEMENT_RECORDS =
+ "parquet.avro.add-list-element-records";
+ private static final boolean ADD_LIST_ELEMENT_RECORDS_DEFAULT = true;
+
+ private final boolean assumeRepeatedIsListElement;
+
+ public AvroSchemaConverter() {
+ this.assumeRepeatedIsListElement = ADD_LIST_ELEMENT_RECORDS_DEFAULT;
+ }
+
+ public AvroSchemaConverter(Configuration conf) {
+ this.assumeRepeatedIsListElement = conf.getBoolean(
+ ADD_LIST_ELEMENT_RECORDS, ADD_LIST_ELEMENT_RECORDS_DEFAULT);
+ }
+
/**
* Given a schema, check to see if it is a union of a null type and a regular schema,
* and then return the non-null sub-schema. Otherwise, return the given schema.
@@ -67,7 +83,7 @@ public class AvroSchemaConverter {
return schema;
}
}
-
+
public MessageType convert(Schema avroSchema) {
if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
throw new IllegalArgumentException("Avro schema must be a record.");
@@ -254,11 +270,22 @@ public class AvroSchemaConverter {
if (parquetGroupType.getFieldCount()!= 1) {
throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
}
- Type elementType = parquetGroupType.getType(0);
- if (!elementType.isRepetition(Type.Repetition.REPEATED)) {
+ Type repeatedType = parquetGroupType.getType(0);
+ if (!repeatedType.isRepetition(Type.Repetition.REPEATED)) {
throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
}
- return Schema.createArray(convertField(elementType));
+ if (isElementType(repeatedType, parquetGroupType.getName())) {
+ // repeated element types are always required
+ return Schema.createArray(convertField(repeatedType));
+ } else {
+ Type elementType = repeatedType.asGroupType().getType(0);
+ if (elementType.isRepetition(Type.Repetition.OPTIONAL)) {
+ return Schema.createArray(optional(convertField(elementType)));
+ } else {
+ return Schema.createArray(convertField(elementType));
+ }
+ }
+ case MAP_KEY_VALUE: // for backward-compatibility
case MAP:
if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) {
throw new UnsupportedOperationException("Invalid map type " + parquetGroupType);
@@ -284,7 +311,6 @@ public class AvroSchemaConverter {
}
case ENUM:
return Schema.create(Schema.Type.STRING);
- case MAP_KEY_VALUE:
case UTF8:
default:
throw new UnsupportedOperationException("Cannot convert Parquet type " +
@@ -298,6 +324,28 @@ public class AvroSchemaConverter {
}
}
+ /**
+ * Implements the rules for interpreting existing data from the logical type
+ * spec for the LIST annotation. This is used to produce the expected schema.
+ * <p>
+ * The AvroArrayConverter will decide whether the repeated type is the array
+ * element type by testing whether the element schema and repeated type are
+ * the same. This ensures that the LIST rules are followed when there is no
+ * schema and that a schema can be provided to override the default behavior.
+ */
+ private boolean isElementType(Type repeatedType, String parentName) {
+ return (
+ // can't be a synthetic layer because it would be invalid
+ repeatedType.isPrimitive() ||
+ repeatedType.asGroupType().getFieldCount() > 1 ||
+ // known patterns without the synthetic layer
+ repeatedType.getName().equals("array") ||
+ repeatedType.getName().equals(parentName + "_tuple") ||
+ // default assumption
+ assumeRepeatedIsListElement
+ );
+ }
+
private static Schema optional(Schema original) {
// null is first in the union because Parquet's default is always null
return Schema.createUnion(Arrays.asList(
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/3fc28541/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java
index 529ca23..59320be 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java
@@ -19,6 +19,7 @@
package parquet.avro;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -42,7 +43,7 @@ import parquet.schema.Type;
*/
public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
- private static final String AVRO_SCHEMA = "parquet.avro.schema";
+ static final String AVRO_SCHEMA = "parquet.avro.schema";
private static final Schema MAP_KEY_SCHEMA = Schema.create(Schema.Type.STRING);
private RecordConsumer recordConsumer;
@@ -116,11 +117,11 @@ public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
index++;
}
}
-
+
private <T> void writeArray(GroupType schema, Schema avroSchema,
- Iterable<T> array) {
+ Collection<T> array) {
recordConsumer.startGroup(); // group wrapper (original type LIST)
- if (array.iterator().hasNext()) {
+ if (array.size() > 0) {
recordConsumer.startField("array", 0);
for (T elt : array) {
writeValue(schema.getType(0), avroSchema.getElementType(), elt);
@@ -141,7 +142,7 @@ public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
recordConsumer.startField("map", 0);
for (Map.Entry<CharSequence, V> entry : map.entrySet()) {
- recordConsumer.startGroup(); // "repeated" group wrapper
+ recordConsumer.startGroup(); // repeated group key_value, middle layer
recordConsumer.startField("key", 0);
writeValue(keyType, MAP_KEY_SCHEMA, entry.getKey());
recordConsumer.endField("key", 0);
@@ -165,7 +166,7 @@ public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
Object value) {
recordConsumer.startGroup();
- // ResolveUnion will tell us which of the union member types to
+ // ResolveUnion will tell us which of the union member types to
// deserialise.
int avroIndex = GenericData.get().resolveUnion(avroSchema, value);
@@ -178,11 +179,11 @@ public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
}
}
- // Sparsely populated method of encoding unions, each member has its own
+ // Sparsely populated method of encoding unions, each member has its own
// set of columns.
String memberName = "member" + parquetIndex;
recordConsumer.startField(memberName, parquetIndex);
- writeValue(parquetGroup.getType(parquetIndex),
+ writeValue(parquetGroup.getType(parquetIndex),
avroSchema.getTypes().get(avroIndex), value);
recordConsumer.endField(memberName, parquetIndex);
@@ -212,7 +213,7 @@ public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
} else if (avroType.equals(Schema.Type.ENUM)) {
recordConsumer.addBinary(Binary.fromString(value.toString()));
} else if (avroType.equals(Schema.Type.ARRAY)) {
- writeArray((GroupType) type, nonNullAvroSchema, (Iterable<?>) value);
+ writeArray((GroupType) type, nonNullAvroSchema, (Collection<?>) value);
} else if (avroType.equals(Schema.Type.MAP)) {
writeMap((GroupType) type, nonNullAvroSchema, (Map<CharSequence, ?>) value);
} else if (avroType.equals(Schema.Type.UNION)) {
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/3fc28541/parquet-avro/src/test/java/parquet/avro/AvroTestUtil.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/parquet/avro/AvroTestUtil.java b/parquet-avro/src/test/java/parquet/avro/AvroTestUtil.java
new file mode 100644
index 0000000..aba5ef3
--- /dev/null
+++ b/parquet-avro/src/test/java/parquet/avro/AvroTestUtil.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package parquet.avro;
+
+import com.google.common.collect.Lists;
+import java.util.Arrays;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.codehaus.jackson.node.NullNode;
+
+public class AvroTestUtil {
+
+ public static Schema record(String name, Schema.Field... fields) {
+ Schema record = Schema.createRecord(name, null, null, false);
+ record.setFields(Arrays.asList(fields));
+ return record;
+ }
+
+ public static Schema.Field field(String name, Schema schema) {
+ return new Schema.Field(name, schema, null, null);
+ }
+
+ public static Schema.Field optionalField(String name, Schema schema) {
+ return new Schema.Field(name, optional(schema), null, NullNode.getInstance());
+ }
+
+ public static Schema array(Schema element) {
+ return Schema.createArray(element);
+ }
+
+ public static Schema primitive(Schema.Type type) {
+ return Schema.create(type);
+ }
+
+ public static Schema optional(Schema original) {
+ return Schema.createUnion(Lists.newArrayList(
+ Schema.create(Schema.Type.NULL),
+ original));
+ }
+
+ public static GenericRecord instance(Schema schema, Object... pairs) {
+ if ((pairs.length % 2) != 0) {
+ throw new RuntimeException("Not enough values");
+ }
+ GenericRecord record = new GenericData.Record(schema);
+ for (int i = 0; i < pairs.length; i += 2) {
+ record.put(pairs[i].toString(), pairs[i + 1]);
+ }
+ return record;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/3fc28541/parquet-avro/src/test/java/parquet/avro/TestArrayCompatibility.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/parquet/avro/TestArrayCompatibility.java b/parquet-avro/src/test/java/parquet/avro/TestArrayCompatibility.java
new file mode 100644
index 0000000..62beed2
--- /dev/null
+++ b/parquet-avro/src/test/java/parquet/avro/TestArrayCompatibility.java
@@ -0,0 +1,999 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package parquet.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import parquet.hadoop.ParquetWriter;
+import parquet.hadoop.api.WriteSupport;
+import parquet.io.api.RecordConsumer;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+
+import static parquet.avro.AvroTestUtil.array;
+import static parquet.avro.AvroTestUtil.field;
+import static parquet.avro.AvroTestUtil.instance;
+import static parquet.avro.AvroTestUtil.optional;
+import static parquet.avro.AvroTestUtil.optionalField;
+import static parquet.avro.AvroTestUtil.primitive;
+import static parquet.avro.AvroTestUtil.record;
+
+public class TestArrayCompatibility {
+
+ @Rule
+ public final TemporaryFolder tempDir = new TemporaryFolder();
+
+ public static final Configuration NEW_BEHAVIOR_CONF = new Configuration();
+
+ @BeforeClass
+ public static void setupNewBehaviorConfiguration() {
+ NEW_BEHAVIOR_CONF.setBoolean(
+ AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, false);
+ }
+
+ @Test
+ @Ignore(value="Not yet supported")
+ public void testUnannotatedListOfPrimitives() throws Exception {
+ Path test = writeDirect(
+ "message UnannotatedListOfPrimitives {" +
+ " repeated int32 list_of_ints;" +
+ "}",
+ new DirectWriter() {
+ @Override
+ public void write(RecordConsumer rc) {
+ rc.startMessage();
+ rc.startField("list_of_ints", 0);
+
+ rc.addInteger(34);
+ rc.addInteger(35);
+ rc.addInteger(36);
+
+ rc.endField("list_of_ints", 0);
+ rc.endMessage();
+ }
+ });
+
+ Schema expectedSchema = record("OldPrimitiveInList",
+ field("list_of_ints", array(primitive(Schema.Type.INT))));
+
+ GenericRecord expectedRecord = instance(expectedSchema,
+ "list_of_ints", Arrays.asList(34, 35, 36));
+
+ // both should behave the same way
+ assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
+ assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
+ }
+
+ @Test
+ @Ignore(value="Not yet supported")
+ public void testUnannotatedListOfGroups() throws Exception {
+ Path test = writeDirect(
+ "message UnannotatedListOfGroups {" +
+ " repeated group list_of_points {" +
+ " required float x;" +
+ " required float y;" +
+ " }" +
+ "}",
+ new DirectWriter() {
+ @Override
+ public void write(RecordConsumer rc) {
+ rc.startMessage();
+ rc.startField("list_of_points", 0);
+
+ rc.startGroup();
+ rc.startField("x", 0);
+ rc.addFloat(1.0f);
+ rc.endField("x", 0);
+ rc.startField("y", 1);
+ rc.addFloat(1.0f);
+ rc.endField("y", 1);
+ rc.endGroup();
+
+ rc.startGroup();
+ rc.startField("x", 0);
+ rc.addFloat(2.0f);
+ rc.endField("x", 0);
+ rc.startField("y", 1);
+ rc.addFloat(2.0f);
+ rc.endField("y", 1);
+ rc.endGroup();
+
+ rc.endField("list_of_points", 0);
+ rc.endMessage();
+ }
+ });
+
+ Schema point = record("?",
+ field("x", primitive(Schema.Type.FLOAT)),
+ field("y", primitive(Schema.Type.FLOAT)));
+ Schema expectedSchema = record("OldPrimitiveInList",
+ field("list_of_points", array(point)));
+
+ GenericRecord expectedRecord = instance(expectedSchema,
+ "list_of_points", Arrays.asList(
+ instance(point, "x", 1.0f, "y", 1.0f),
+ instance(point, "x", 2.0f, "y", 2.0f)));
+
+ // both should behave the same way
+ assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
+ assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
+ }
+
+ @Test
+ public void testRepeatedPrimitiveInList() throws Exception {
+ Path test = writeDirect(
+ "message RepeatedPrimitiveInList {" +
+ " required group list_of_ints (LIST) {" +
+ " repeated int32 array;" +
+ " }" +
+ "}",
+ new DirectWriter() {
+ @Override
+ public void write(RecordConsumer rc) {
+ rc.startMessage();
+ rc.startField("list_of_ints", 0);
+
+ rc.startGroup();
+ rc.startField("array", 0);
+
+ rc.addInteger(34);
+ rc.addInteger(35);
+ rc.addInteger(36);
+
+ rc.endField("array", 0);
+ rc.endGroup();
+
+ rc.endField("list_of_ints", 0);
+ rc.endMessage();
+ }
+ });
+
+ Schema expectedSchema = record("RepeatedPrimitiveInList",
+ field("list_of_ints", array(Schema.create(Schema.Type.INT))));
+
+ GenericRecord expectedRecord = instance(expectedSchema,
+ "list_of_ints", Arrays.asList(34, 35, 36));
+
+ // both should behave the same way
+ assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
+ assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
+ }
+
+ @Test
+ public void testMultiFieldGroupInList() throws Exception {
+ // tests the missing element layer, detected by a multi-field group
+ Path test = writeDirect(
+ "message MultiFieldGroupInList {" +
+ " optional group locations (LIST) {" +
+ " repeated group element {" +
+ " required double latitude;" +
+ " required double longitude;" +
+ " }" +
+ " }" +
+ "}",
+ new DirectWriter() {
+ @Override
+ public void write(RecordConsumer rc) {
+ rc.startMessage();
+ rc.startField("locations", 0);
+
+ rc.startGroup();
+ rc.startField("element", 0);
+
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(0.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
+
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(180.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
+
+ rc.endField("element", 0);
+ rc.endGroup();
+
+ rc.endField("locations", 0);
+ rc.endMessage();
+ }
+ });
+
+ Schema location = record("element",
+ field("latitude", primitive(Schema.Type.DOUBLE)),
+ field("longitude", primitive(Schema.Type.DOUBLE)));
+ Schema expectedSchema = record("MultiFieldGroupInList",
+ optionalField("locations", array(location)));
+
+ GenericRecord expectedRecord = instance(expectedSchema,
+ "locations", Arrays.asList(
+ instance(location, "latitude", 0.0, "longitude", 0.0),
+ instance(location, "latitude", 0.0, "longitude", 180.0)));
+
+ // both should behave the same way
+ assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
+ assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
+ }
+
+ @Test
+ public void testSingleFieldGroupInList() throws Exception {
+ // this tests the case where older non-Avro data has an ambiguous list (no Avro schema is stored with the file)
+ Path test = writeDirect(
+ "message SingleFieldGroupInList {" +
+ " optional group single_element_groups (LIST) {" +
+ " repeated group single_element_group {" +
+ " required int64 count;" +
+ " }" +
+ " }" +
+ "}",
+ new DirectWriter() {
+ @Override
+ public void write(RecordConsumer rc) {
+ rc.startMessage();
+ rc.startField("single_element_groups", 0);
+
+ rc.startGroup(); // the LIST-annotated group
+ rc.startField("single_element_group", 0); // start writing array contents
+
+ rc.startGroup(); // first repeated group: count = 1234
+ rc.startField("count", 0);
+ rc.addLong(1234L);
+ rc.endField("count", 0);
+ rc.endGroup();
+
+ rc.startGroup(); // second repeated group: count = 2345
+ rc.startField("count", 0);
+ rc.addLong(2345L);
+ rc.endField("count", 0);
+ rc.endGroup();
+
+ rc.endField("single_element_group", 0); // finished writing array contents
+ rc.endGroup();
+
+ rc.endField("single_element_groups", 0);
+ rc.endMessage();
+ }
+ });
+
+ // can't tell from storage whether this should be a list of single-field
+ // records or if the single_element_group layer is synthetic.
+
+ // old behavior - assume that the repeated type is the element type
+ Schema singleElementGroupSchema = record("single_element_group",
+ field("count", primitive(Schema.Type.LONG)));
+ Schema oldSchema = record("SingleFieldGroupInList",
+ optionalField("single_element_groups", array(singleElementGroupSchema)));
+ GenericRecord oldRecord = instance(oldSchema,
+ "single_element_groups", Arrays.asList(
+ instance(singleElementGroupSchema, "count", 1234L),
+ instance(singleElementGroupSchema, "count", 2345L)));
+
+ assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
+
+ // new behavior - assume that single_element_group is synthetic (in spec)
+ Schema newSchema = record("SingleFieldGroupInList",
+ optionalField("single_element_groups", array(primitive(Schema.Type.LONG)))); // elements are the bare long values
+ GenericRecord newRecord = instance(newSchema,
+ "single_element_groups", Arrays.asList(1234L, 2345L));
+
+ assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
+ }
+
+ @Test
+ public void testSingleFieldGroupInListWithSchema() throws Exception {
+ // this tests the case where older data has an ambiguous structure, but the
+ // correct interpretation can be determined from the avro schema
+
+ Schema singleElementRecord = record("single_element_group",
+ field("count", primitive(Schema.Type.LONG)));
+
+ Schema expectedSchema = record("SingleFieldGroupInList",
+ optionalField("single_element_groups",
+ array(singleElementRecord)));
+
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put(AvroWriteSupport.AVRO_SCHEMA, expectedSchema.toString()); // passed to writeDirect below as the file's extra metadata
+
+ Path test = writeDirect(
+ "message SingleFieldGroupInList {" +
+ " optional group single_element_groups (LIST) {" +
+ " repeated group single_element_group {" +
+ " required int64 count;" +
+ " }" +
+ " }" +
+ "}",
+ new DirectWriter() {
+ @Override
+ public void write(RecordConsumer rc) {
+ rc.startMessage();
+ rc.startField("single_element_groups", 0);
+
+ rc.startGroup(); // the LIST-annotated group
+ rc.startField("single_element_group", 0); // start writing array contents
+
+ rc.startGroup(); // first repeated group: count = 1234
+ rc.startField("count", 0);
+ rc.addLong(1234L);
+ rc.endField("count", 0);
+ rc.endGroup();
+
+ rc.startGroup(); // second repeated group: count = 2345
+ rc.startField("count", 0);
+ rc.addLong(2345L);
+ rc.endField("count", 0);
+ rc.endGroup();
+
+ rc.endField("single_element_group", 0); // finished writing array contents
+ rc.endGroup();
+
+ rc.endField("single_element_groups", 0);
+ rc.endMessage();
+ }
+ },
+ metadata);
+
+ GenericRecord expectedRecord = instance(expectedSchema,
+ "single_element_groups", Arrays.asList(
+ instance(singleElementRecord, "count", 1234L),
+ instance(singleElementRecord, "count", 2345L)));
+
+ // both should behave the same way because the schema is present
+ assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
+ assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
+ }
+
+ @Test
+ public void testNewOptionalGroupInList() throws Exception { // list written as repeated "list" group wrapping an optional "element" group, including one null entry
+ Path test = writeDirect(
+ "message NewOptionalGroupInList {" +
+ " optional group locations (LIST) {" +
+ " repeated group list {" +
+ " optional group element {" +
+ " required double latitude;" +
+ " required double longitude;" +
+ " }" +
+ " }" +
+ " }" +
+ "}",
+ new DirectWriter() {
+ @Override
+ public void write(RecordConsumer rc) {
+ rc.startMessage();
+ rc.startField("locations", 0);
+
+ rc.startGroup(); // the LIST-annotated group
+ rc.startField("list", 0); // start writing array contents
+
+ // write a non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
+
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(0.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
+
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
+
+ // write a null element (element field is omitted)
+ rc.startGroup(); // array level
+ rc.endGroup(); // array level
+
+ // write a second non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
+
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(180.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
+
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
+
+ rc.endField("list", 0); // finished writing array contents
+ rc.endGroup();
+
+ rc.endField("locations", 0);
+ rc.endMessage();
+ }
+ });
+
+ Schema location = record("element",
+ field("latitude", primitive(Schema.Type.DOUBLE)),
+ field("longitude", primitive(Schema.Type.DOUBLE)));
+
+ // old behavior - assume that the repeated type is the element type
+ Schema elementRecord = record("list", optionalField("element", location));
+ Schema oldSchema = record("NewOptionalGroupInList",
+ optionalField("locations", array(elementRecord)));
+ GenericRecord oldRecord = instance(oldSchema,
+ "locations", Arrays.asList(
+ instance(elementRecord, "element",
+ instance(location, "latitude", 0.0, "longitude", 0.0)),
+ instance(elementRecord), // wrapper record for the null entry; no "element" value is supplied
+ instance(elementRecord, "element",
+ instance(location, "latitude", 0.0, "longitude", 180.0))));
+
+ assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
+
+ // new behavior - assume that the repeated "list" group is synthetic (in spec)
+ Schema newSchema = record("NewOptionalGroupInList",
+ optionalField("locations", array(optional(location))));
+ GenericRecord newRecord = instance(newSchema,
+ "locations", Arrays.asList(
+ instance(location, "latitude", 0.0, "longitude", 0.0),
+ null,
+ instance(location, "latitude", 0.0, "longitude", 180.0)));
+
+ assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
+ }
+
+ @Test
+ public void testNewRequiredGroupInList() throws Exception { // list written as repeated "list" group wrapping a required "element" group
+ Path test = writeDirect(
+ "message NewRequiredGroupInList {" +
+ " optional group locations (LIST) {" +
+ " repeated group list {" +
+ " required group element {" +
+ " required double latitude;" +
+ " required double longitude;" +
+ " }" +
+ " }" +
+ " }" +
+ "}",
+ new DirectWriter() {
+ @Override
+ public void write(RecordConsumer rc) {
+ rc.startMessage();
+ rc.startField("locations", 0);
+
+ rc.startGroup(); // the LIST-annotated group
+ rc.startField("list", 0); // start writing array contents
+
+ // write a non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
+
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(180.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
+
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
+
+ // write a second non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
+
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(0.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
+
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
+
+ rc.endField("list", 0); // finished writing array contents
+ rc.endGroup();
+
+ rc.endField("locations", 0);
+ rc.endMessage();
+ }
+ });
+
+ Schema location = record("element",
+ field("latitude", primitive(Schema.Type.DOUBLE)),
+ field("longitude", primitive(Schema.Type.DOUBLE)));
+
+ // old behavior - assume that the repeated type is the element type
+ Schema elementRecord = record("list", field("element", location));
+ Schema oldSchema = record("NewRequiredGroupInList",
+ optionalField("locations", array(elementRecord)));
+ GenericRecord oldRecord = instance(oldSchema,
+ "locations", Arrays.asList(
+ instance(elementRecord, "element",
+ instance(location, "latitude", 0.0, "longitude", 180.0)),
+ instance(elementRecord, "element",
+ instance(location, "latitude", 0.0, "longitude", 0.0))));
+
+ assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
+
+ // new behavior - assume that the repeated "list" group is synthetic (in spec)
+ Schema newSchema = record("NewRequiredGroupInList",
+ optionalField("locations", array(location)));
+ GenericRecord newRecord = instance(newSchema,
+ "locations", Arrays.asList(
+ instance(location, "latitude", 0.0, "longitude", 180.0),
+ instance(location, "latitude", 0.0, "longitude", 0.0)));
+
+ assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
+ }
+
+ @Test
+ public void testAvroCompatRequiredGroupInList() throws Exception { // NOTE(review): name says "Required" but the schema declares "optional group element" - confirm which is intended
+ Path test = writeDirect(
+ "message AvroCompatRequiredGroupInList {" +
+ " optional group locations (LIST) {" +
+ " repeated group array {" +
+ " optional group element {" +
+ " required double latitude;" +
+ " required double longitude;" +
+ " }" +
+ " }" +
+ " }" +
+ "}",
+ new DirectWriter() {
+ @Override
+ public void write(RecordConsumer rc) {
+ rc.startMessage();
+ rc.startField("locations", 0);
+
+ rc.startGroup(); // the LIST-annotated group
+ rc.startField("array", 0); // start writing array contents
+
+ // write a non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
+
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(180.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
+
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
+
+ // write a second non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
+
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(0.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
+
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
+
+ rc.endField("array", 0); // finished writing array contents
+ rc.endGroup();
+
+ rc.endField("locations", 0);
+ rc.endMessage();
+ }
+ });
+
+ Schema location = record("element",
+ field("latitude", primitive(Schema.Type.DOUBLE)),
+ field("longitude", primitive(Schema.Type.DOUBLE)));
+
+ // old behavior - assume that the repeated type is the element type
+ Schema elementRecord = record("array", optionalField("element", location));
+ Schema oldSchema = record("AvroCompatRequiredGroupInList",
+ optionalField("locations", array(elementRecord)));
+ GenericRecord oldRecord = instance(oldSchema,
+ "locations", Arrays.asList(
+ instance(elementRecord, "element",
+ instance(location, "latitude", 0.0, "longitude", 180.0)),
+ instance(elementRecord, "element",
+ instance(location, "latitude", 0.0, "longitude", 0.0))));
+
+ // both should detect the "array" name and keep the repeated group as the element record
+ assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
+ assertReaderContains(newBehaviorReader(test), oldSchema, oldRecord);
+ }
+
+ @Test
+ public void testAvroCompatRequiredGroupInListWithSchema() throws Exception { // same "array" layout as above, but read with an explicit Avro read schema
+ Path test = writeDirect(
+ "message AvroCompatRequiredGroupInListWithSchema {" +
+ " optional group locations (LIST) {" +
+ " repeated group array {" +
+ " optional group element {" +
+ " required double latitude;" +
+ " required double longitude;" +
+ " }" +
+ " }" +
+ " }" +
+ "}",
+ new DirectWriter() {
+ @Override
+ public void write(RecordConsumer rc) {
+ rc.startMessage();
+ rc.startField("locations", 0);
+
+ rc.startGroup(); // the LIST-annotated group
+ rc.startField("array", 0); // start writing array contents
+
+ // write a non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
+
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(180.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
+
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
+
+ // write a second non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
+
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(0.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
+
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
+
+ rc.endField("array", 0); // finished writing array contents
+ rc.endGroup();
+
+ rc.endField("locations", 0);
+ rc.endMessage();
+ }
+ });
+
+ Schema location = record("element",
+ field("latitude", primitive(Schema.Type.DOUBLE)),
+ field("longitude", primitive(Schema.Type.DOUBLE)));
+ // the read schema drops the "array" layer: elements are optional locations
+ Schema newSchema = record("HiveCompatOptionalGroupInList", // NOTE(review): record name differs from the message name above (looks copied from the Hive test) - confirm
+ optionalField("locations", array(optional(location))));
+ GenericRecord newRecord = instance(newSchema,
+ "locations", Arrays.asList(
+ instance(location, "latitude", 0.0, "longitude", 180.0),
+ instance(location, "latitude", 0.0, "longitude", 0.0)));
+
+ Configuration oldConfWithSchema = new Configuration(); // default configuration plus the explicit read schema
+ AvroReadSupport.setAvroReadSchema(oldConfWithSchema, newSchema);
+
+ // both should use the schema structure that is provided
+ assertReaderContains(
+ new AvroParquetReader<GenericRecord>(oldConfWithSchema, test),
+ newSchema, newRecord);
+
+ Configuration newConfWithSchema = new Configuration(NEW_BEHAVIOR_CONF); // copy of NEW_BEHAVIOR_CONF plus the same read schema
+ AvroReadSupport.setAvroReadSchema(newConfWithSchema, newSchema);
+
+ assertReaderContains(
+ new AvroParquetReader<GenericRecord>(newConfWithSchema, test),
+ newSchema, newRecord);
+ }
+
+ @Test
+ public void testThriftCompatRequiredGroupInList() throws Exception { // NOTE(review): name says "Required" but the schema declares "optional group element" - confirm which is intended
+ Path test = writeDirect(
+ "message ThriftCompatRequiredGroupInList {" +
+ " optional group locations (LIST) {" +
+ " repeated group locations_tuple {" +
+ " optional group element {" +
+ " required double latitude;" +
+ " required double longitude;" +
+ " }" +
+ " }" +
+ " }" +
+ "}",
+ new DirectWriter() {
+ @Override
+ public void write(RecordConsumer rc) {
+ rc.startMessage();
+ rc.startField("locations", 0);
+
+ rc.startGroup(); // the LIST-annotated group
+ rc.startField("locations_tuple", 0); // start writing array contents
+
+ // write a non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
+
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(180.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
+
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
+
+ // write a second non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
+
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(0.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
+
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
+
+ rc.endField("locations_tuple", 0); // finished writing array contents
+ rc.endGroup();
+
+ rc.endField("locations", 0);
+ rc.endMessage();
+ }
+ });
+
+ Schema location = record("element",
+ field("latitude", primitive(Schema.Type.DOUBLE)),
+ field("longitude", primitive(Schema.Type.DOUBLE)));
+
+ // old behavior - assume that the repeated type is the element type
+ Schema elementRecord = record("locations_tuple", optionalField("element", location));
+ Schema oldSchema = record("ThriftCompatRequiredGroupInList",
+ optionalField("locations", array(elementRecord)));
+ GenericRecord oldRecord = instance(oldSchema,
+ "locations", Arrays.asList(
+ instance(elementRecord, "element",
+ instance(location, "latitude", 0.0, "longitude", 180.0)),
+ instance(elementRecord, "element",
+ instance(location, "latitude", 0.0, "longitude", 0.0))));
+
+ // both should keep the repeated "locations_tuple" group as the element record
+ assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
+ assertReaderContains(newBehaviorReader(test), oldSchema, oldRecord);
+ }
+
+ @Test
+ public void testHiveCompatOptionalGroupInList() throws Exception { // list written as repeated "bag" group wrapping an optional "element" group
+ Path test = writeDirect(
+ "message HiveCompatOptionalGroupInList {" +
+ " optional group locations (LIST) {" +
+ " repeated group bag {" +
+ " optional group element {" +
+ " required double latitude;" +
+ " required double longitude;" +
+ " }" +
+ " }" +
+ " }" +
+ "}",
+ new DirectWriter() {
+ @Override
+ public void write(RecordConsumer rc) {
+ rc.startMessage();
+ rc.startField("locations", 0);
+
+ rc.startGroup(); // the LIST-annotated group
+ rc.startField("bag", 0); // start writing array contents
+
+ // write a non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
+
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(180.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
+
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
+
+ // write a second non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
+
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(0.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
+
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
+
+ rc.endField("bag", 0); // finished writing array contents
+ rc.endGroup();
+
+ rc.endField("locations", 0);
+ rc.endMessage();
+ }
+ });
+
+ Schema location = record("element",
+ field("latitude", primitive(Schema.Type.DOUBLE)),
+ field("longitude", primitive(Schema.Type.DOUBLE)));
+
+ // old behavior - assume that the repeated type is the element type
+ Schema elementRecord = record("bag", optionalField("element", location));
+ Schema oldSchema = record("HiveCompatOptionalGroupInList",
+ optionalField("locations", array(elementRecord)));
+ GenericRecord oldRecord = instance(oldSchema,
+ "locations", Arrays.asList(
+ instance(elementRecord, "element",
+ instance(location, "latitude", 0.0, "longitude", 180.0)),
+ instance(elementRecord, "element",
+ instance(location, "latitude", 0.0, "longitude", 0.0))));
+
+ // only the old reader should keep the repeated "bag" group as the element record
+ assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
+ // new behavior - treat the "bag" layer as synthetic; elements are the optional "element" group
+ Schema newSchema = record("HiveCompatOptionalGroupInList",
+ optionalField("locations", array(optional(location))));
+ GenericRecord newRecord = instance(newSchema,
+ "locations", Arrays.asList(
+ instance(location, "latitude", 0.0, "longitude", 180.0),
+ instance(location, "latitude", 0.0, "longitude", 0.0)));
+
+ assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
+ }
+
+ /**
+  * Callback used by the tests to emit one record's events straight to a
+  * {@link RecordConsumer}, bypassing any object model.
+  */
+ private interface DirectWriter {
+   // interface members are implicitly public
+   void write(RecordConsumer consumer);
+ }
+
+ private static class DirectWriteSupport extends WriteSupport<Void> {
+ private RecordConsumer recordConsumer;
+ private final MessageType type;
+ private final DirectWriter writer;
+ private final Map<String, String> metadata;
+
+ private DirectWriteSupport(MessageType type, DirectWriter writer,
+ Map<String, String> metadata) {
+ this.type = type;
+ this.writer = writer;
+ this.metadata = metadata;
+ }
+
+ @Override
+ public WriteContext init(Configuration configuration) {
+ return new WriteContext(type, metadata);
+ }
+
+ @Override
+ public void prepareForWrite(RecordConsumer recordConsumer) {
+ this.recordConsumer = recordConsumer;
+ }
+
+ @Override
+ public void write(Void record) {
+ writer.write(recordConsumer);
+ }
+ }
+
+ /**
+  * Parses {@code type} as a Parquet message type and writes one record with
+  * {@code writer}, using the metadata-less overload.
+  */
+ private Path writeDirect(String type, DirectWriter writer) throws IOException {
+   MessageType parsedType = MessageTypeParser.parseMessageType(type);
+   return writeDirect(parsedType, writer);
+ }
+
+ /**
+  * Parses {@code type} as a Parquet message type and writes one record with
+  * {@code writer}, passing {@code metadata} through to the typed overload.
+  */
+ private Path writeDirect(String type, DirectWriter writer,
+     Map<String, String> metadata) throws IOException {
+   MessageType parsedType = MessageTypeParser.parseMessageType(type);
+   return writeDirect(parsedType, writer, metadata);
+ }
+
+ /**
+  * Writes one record with {@code writer} using an empty metadata map.
+  */
+ private Path writeDirect(MessageType type, DirectWriter writer) throws IOException {
+   Map<String, String> noMetadata = new HashMap<String, String>();
+   return writeDirect(type, writer, noMetadata);
+ }
+
+ /**
+  * Writes a single record to a fresh file under {@code tempDir} by invoking
+  * {@code writer} once, and returns the path of that file.
+  *
+  * @param type schema handed to the write support
+  * @param writer callback that emits the record's events
+  * @param metadata extra metadata handed to the write support
+  * @return path of the written file
+  * @throws IOException if creating the file or writing to it fails
+  */
+ private Path writeDirect(MessageType type, DirectWriter writer,
+     Map<String, String> metadata) throws IOException {
+   String uniqueName = UUID.randomUUID().toString();
+   File temp = tempDir.newFile(uniqueName);
+   temp.deleteOnExit();
+   // only the unique path is needed; the empty file is removed before the
+   // writer is created (presumably so the writer can create it itself - confirm)
+   temp.delete();
+
+   Path path = new Path(temp.getPath());
+   DirectWriteSupport support = new DirectWriteSupport(type, writer, metadata);
+
+   ParquetWriter<Void> parquetWriter = new ParquetWriter<Void>(path, support);
+   // the record value is ignored by DirectWriteSupport, so null is fine here
+   parquetWriter.write(null);
+   parquetWriter.close();
+
+   return path;
+ }
+
+ /**
+  * Opens {@code path} with a reader that is given no explicit configuration;
+  * the tests treat this as the "old behavior" reader.
+  */
+ public <T extends IndexedRecord> AvroParquetReader<T> oldBehaviorReader(
+     Path path) throws IOException {
+   AvroParquetReader<T> reader = new AvroParquetReader<T>(path);
+   return reader;
+ }
+
+ /**
+  * Opens {@code path} with a reader configured by {@code NEW_BEHAVIOR_CONF};
+  * the tests treat this as the "new behavior" reader.
+  */
+ public <T extends IndexedRecord> AvroParquetReader<T> newBehaviorReader(
+     Path path) throws IOException {
+   AvroParquetReader<T> reader = new AvroParquetReader<T>(NEW_BEHAVIOR_CONF, path);
+   return reader;
+ }
+
+ /**
+  * Asserts that {@code reader} yields exactly {@code expectedRecords}, in
+  * order, and that every record read carries {@code expectedSchema}.
+  *
+  * @param reader reader to consume; it is read until it returns null
+  * @param expectedSchema schema each record read must report
+  * @param expectedRecords records expected from the reader, in read order
+  * @throws IOException if reading from {@code reader} fails
+  */
+ public <T extends IndexedRecord> void assertReaderContains(
+     AvroParquetReader<T> reader, Schema expectedSchema, T... expectedRecords)
+     throws IOException {
+   String expectedCount = expectedRecords.length +
+       " record" + (expectedRecords.length == 1 ? "" : "s");
+   for (T expectedRecord : expectedRecords) {
+     T actualRecord = reader.read();
+     // read() returns null once the reader is exhausted; report that as an
+     // assertion failure rather than a NullPointerException on getSchema()
+     Assert.assertNotNull("Should contain " + expectedCount, actualRecord);
+     Assert.assertEquals("Should match expected schema",
+         expectedSchema, actualRecord.getSchema());
+     Assert.assertEquals("Should match the expected record",
+         expectedRecord, actualRecord);
+   }
+   // one more read must hit the end, otherwise there are extra records
+   Assert.assertNull("Should only contain " + expectedCount,
+       reader.read());
+ }
+
+}