You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2015/03/07 02:06:54 UTC

incubator-parquet-mr git commit: PARQUET-193: Implement nested types compatibility rules in Avro

Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master 12ee6b442 -> 3fc28541f


PARQUET-193: Implement nested types compatibility rules in Avro

This depends on PARQUET-191 and PARQUET-192.

This replaces #83.

Author: Ryan Blue <bl...@apache.org>

Closes #128 from rdblue/PARQUET-193-implement-compatilibity-avro and squashes the following commits:

bd0491e [Ryan Blue] PARQUET-193: Implement nested types rules in Avro.


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/3fc28541
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/3fc28541
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/3fc28541

Branch: refs/heads/master
Commit: 3fc28541f001ce6e4a7afa91fec8d21bfeaa17db
Parents: 12ee6b4
Author: Ryan Blue <bl...@apache.org>
Authored: Fri Mar 6 17:06:34 2015 -0800
Committer: Ryan Blue <bl...@apache.org>
Committed: Fri Mar 6 17:06:34 2015 -0800

----------------------------------------------------------------------
 .../avro/AvroIndexedRecordConverter.java        | 137 ++-
 .../main/java/parquet/avro/AvroReadSupport.java |  18 +-
 .../java/parquet/avro/AvroSchemaConverter.java  |  58 +-
 .../java/parquet/avro/AvroWriteSupport.java     |  19 +-
 .../test/java/parquet/avro/AvroTestUtil.java    |  69 ++
 .../parquet/avro/TestArrayCompatibility.java    | 999 +++++++++++++++++++
 6 files changed, 1261 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/3fc28541/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java b/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java
index 85804ad..2f59c9b 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java
@@ -141,9 +141,9 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
     } else if (schema.getType().equals(Schema.Type.ENUM)) {
       return new FieldEnumConverter(parent, schema, model);
     } else if (schema.getType().equals(Schema.Type.ARRAY)) {
-      return new AvroArrayConverter(parent, type, schema, model);
+      return new AvroArrayConverter(parent, type.asGroupType(), schema, model);
     } else if (schema.getType().equals(Schema.Type.MAP)) {
-      return new MapConverter(parent, type, schema, model);
+      return new MapConverter(parent, type.asGroupType(), schema, model);
     } else if (schema.getType().equals(Schema.Type.UNION)) {
       return new AvroUnionConverter(parent, type, schema, model);
     } else if (schema.getType().equals(Schema.Type.FIXED)) {
@@ -436,6 +436,21 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
     }
   }
 
+  /**
+   * Converter for a list.
+   *
+   * <pre>
+   *   optional group the_list (LIST) { <-- this layer
+   *     repeated group array {
+   *       optional (type) element;
+   *     }
+   *   }
+   * </pre>
+   *
+   * This class also implements LIST element backward-compatibility rules.
+   *
+   * @param <T> The type of elements in the list
+   */
   static final class AvroArrayConverter<T> extends GroupConverter {
 
     private final ParentValueContainer parent;
@@ -443,19 +458,27 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
     private final Converter converter;
     private GenericArray<T> array;
 
-    public AvroArrayConverter(ParentValueContainer parent, Type parquetSchema,
+    public AvroArrayConverter(ParentValueContainer parent, GroupType type,
         Schema avroSchema, GenericData model) {
       this.parent = parent;
       this.avroSchema = avroSchema;
-      Type elementType = parquetSchema.asGroupType().getType(0);
-      Schema elementSchema = avroSchema.getElementType();
-      converter = newConverter(elementSchema, elementType, model, new ParentValueContainer() {
-        @Override
-        @SuppressWarnings("unchecked")
-        void add(Object value) {
-          array.add((T) value);
-        }
-      });
+      Schema elementSchema = this.avroSchema.getElementType();
+      Type repeatedType = type.getType(0);
+      // always determine whether the repeated type is the element type by
+      // matching it against the element schema.
+      if (isElementType(repeatedType, elementSchema)) {
+        // the element type is the repeated type (and required)
+        converter = newConverter(elementSchema, repeatedType, model, new ParentValueContainer() {
+          @Override
+          @SuppressWarnings("unchecked")
+          void add(Object value) {
+            array.add((T) value);
+          }
+        });
+      } else {
+        // the element is wrapped in a synthetic group and may be optional
+        converter = new ElementConverter(repeatedType.asGroupType(), elementSchema, model);
+      }
     }
 
     @Override
@@ -472,6 +495,82 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
     public void end() {
       parent.add(array);
     }
+
+    /**
+     * Returns whether the given type is the element type of a list or is a
+     * synthetic group with one field that is the element type. This is
+     * determined by checking whether the type can be a synthetic group and by
+     * checking whether a potential synthetic group matches the expected schema.
+     * <p>
+     * Unlike {@link AvroSchemaConverter#isElementType(Type, String)}, this
+     * method never guesses because the expected schema is known.
+     *
+     * @param repeatedType a type that may be the element type
+     * @param elementSchema the expected Schema for list elements
+     * @return {@code true} if the repeatedType is the element schema
+     */
+    static boolean isElementType(Type repeatedType, Schema elementSchema) {
+      if (repeatedType.isPrimitive() ||
+          repeatedType.asGroupType().getFieldCount() > 1) {
+        // The repeated type must be the element type because it is an invalid
+        // synthetic wrapper (must be a group with one field).
+        return true;
+      } else if (elementSchema != null &&
+          elementSchema.getType() == Schema.Type.RECORD &&
+          elementSchema.getFields().size() == 1 &&
+          elementSchema.getFields().get(0).name().equals(
+              repeatedType.asGroupType().getFieldName(0))) {
+        // The repeated type must be the element type because it matches the
+        // structure of the Avro element's schema.
+        return true;
+      }
+      return false;
+    }
+
+    /**
+     * Converter for list elements.
+     *
+     * <pre>
+     *   optional group the_list (LIST) {
+     *     repeated group array { <-- this layer
+     *       optional (type) element;
+     *     }
+     *   }
+     * </pre>
+     */
+    final class ElementConverter extends GroupConverter {
+      private T element;
+      private final Converter elementConverter;
+
+      public ElementConverter(GroupType repeatedType, Schema elementSchema, GenericData model) {
+        Type elementType = repeatedType.getType(0);
+        Schema nonNullElementSchema = AvroSchemaConverter.getNonNull(elementSchema);
+        this.elementConverter = newConverter(nonNullElementSchema, elementType, model, new ParentValueContainer() {
+          @Override
+          @SuppressWarnings("unchecked")
+          void add(Object value) {
+            ElementConverter.this.element = (T) value;
+          }
+        });
+      }
+
+      @Override
+      public Converter getConverter(int fieldIndex) {
+        Preconditions.checkArgument(
+            fieldIndex == 0, "Illegal field index: " + fieldIndex);
+        return elementConverter;
+      }
+
+      @Override
+      public void start() {
+        element = null;
+      }
+
+      @Override
+      public void end() {
+        array.add(element);
+      }
+    }
   }
 
   static final class AvroUnionConverter<T> extends GroupConverter {
@@ -525,10 +624,12 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
     private final Converter keyValueConverter;
     private Map<String, V> map;
 
-    public MapConverter(ParentValueContainer parent, Type parquetSchema,
-        Schema avroSchema, GenericData model) {
+    public MapConverter(ParentValueContainer parent, GroupType mapType,
+        Schema mapSchema, GenericData model) {
       this.parent = parent;
-      this.keyValueConverter = new MapKeyValueConverter(parquetSchema, avroSchema, model);
+      GroupType repeatedKeyValueType = mapType.getType(0).asGroupType();
+      this.keyValueConverter = new MapKeyValueConverter(
+          repeatedKeyValueType, mapSchema, model);
     }
 
     @Override
@@ -553,7 +654,7 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
       private final Converter keyConverter;
       private final Converter valueConverter;
 
-      public MapKeyValueConverter(Type parquetSchema, Schema avroSchema,
+      public MapKeyValueConverter(GroupType keyValueType, Schema mapSchema,
           GenericData model) {
         keyConverter = new PrimitiveConverter() {
           @Override
@@ -562,8 +663,8 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
           }
         };
 
-        Type valueType = parquetSchema.asGroupType().getType(0).asGroupType().getType(1);
-        Schema nonNullValueSchema = AvroSchemaConverter.getNonNull(avroSchema.getValueType());
+        Type valueType = keyValueType.getType(1);
+        Schema nonNullValueSchema = AvroSchemaConverter.getNonNull(mapSchema.getValueType());
         valueConverter = newConverter(nonNullValueSchema, valueType, model, new ParentValueContainer() {
           @Override
           @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/3fc28541/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java
index c82977e..eacd369 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java
@@ -38,7 +38,7 @@ public class AvroReadSupport<T extends IndexedRecord> extends ReadSupport<T> {
   public static String AVRO_REQUESTED_PROJECTION = "parquet.avro.projection";
   private static final String AVRO_READ_SCHEMA = "parquet.avro.read.schema";
 
-  static final String AVRO_SCHEMA_METADATA_KEY = "avro.schema";
+  static final String AVRO_SCHEMA_METADATA_KEY = "parquet.avro.schema";
   private static final String AVRO_READ_SCHEMA_METADATA_KEY = "avro.read.schema";
 
   public static String AVRO_DATA_SUPPLIER = "parquet.avro.data.supplier";
@@ -63,14 +63,16 @@ public class AvroReadSupport<T extends IndexedRecord> extends ReadSupport<T> {
   }
 
   @Override
-  public ReadContext init(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema) {
-    MessageType schema = fileSchema;
+  public ReadContext init(Configuration configuration,
+                          Map<String, String> keyValueMetaData,
+                          MessageType fileSchema) {
+    MessageType projection = fileSchema;
     Map<String, String> metadata = null;
 
     String requestedProjectionString = configuration.get(AVRO_REQUESTED_PROJECTION);
     if (requestedProjectionString != null) {
       Schema avroRequestedProjection = new Schema.Parser().parse(requestedProjectionString);
-      schema = new AvroSchemaConverter().convert(avroRequestedProjection);
+      projection = new AvroSchemaConverter(configuration).convert(avroRequestedProjection);
     }
     String avroReadSchema = configuration.get(AVRO_READ_SCHEMA);
     if (avroReadSchema != null) {
@@ -79,11 +81,13 @@ public class AvroReadSupport<T extends IndexedRecord> extends ReadSupport<T> {
     }
     // use getSchemaForRead because it checks that the requested schema is a
     // subset of the columns in the file schema
-    return new ReadContext(getSchemaForRead(fileSchema, schema), metadata);
+    return new ReadContext(getSchemaForRead(fileSchema, projection), metadata);
   }
 
   @Override
-  public RecordMaterializer<T> prepareForRead(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema, ReadContext readContext) {
+  public RecordMaterializer<T> prepareForRead(
+      Configuration configuration, Map<String, String> keyValueMetaData,
+      MessageType fileSchema, ReadContext readContext) {
     MessageType parquetSchema = readContext.getRequestedSchema();
     Schema avroSchema;
     if (readContext.getReadSupportMetadata() != null &&
@@ -95,7 +99,7 @@ public class AvroReadSupport<T extends IndexedRecord> extends ReadSupport<T> {
       avroSchema = new Schema.Parser().parse(keyValueMetaData.get(AVRO_SCHEMA_METADATA_KEY));
     } else {
       // default to converting the Parquet schema into an Avro schema
-      avroSchema = new AvroSchemaConverter().convert(parquetSchema);
+      avroSchema = new AvroSchemaConverter(configuration).convert(parquetSchema);
     }
     Class<? extends AvroDataSupplier> suppClass = configuration.getClass(AVRO_DATA_SUPPLIER,
         SpecificDataSupplier.class,

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/3fc28541/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java
index 986776c..80946f3 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java
@@ -22,6 +22,7 @@ import java.util.*;
 
 import org.apache.avro.Schema;
 
+import org.apache.hadoop.conf.Configuration;
 import org.codehaus.jackson.node.NullNode;
 import parquet.schema.ConversionPatterns;
 import parquet.schema.GroupType;
@@ -42,6 +43,21 @@ import static parquet.schema.PrimitiveType.PrimitiveTypeName.*;
  */
 public class AvroSchemaConverter {
 
+  static final String ADD_LIST_ELEMENT_RECORDS =
+      "parquet.avro.add-list-element-records";
+  private static final boolean ADD_LIST_ELEMENT_RECORDS_DEFAULT = true;
+
+  private final boolean assumeRepeatedIsListElement;
+
+  public AvroSchemaConverter() {
+    this.assumeRepeatedIsListElement = ADD_LIST_ELEMENT_RECORDS_DEFAULT;
+  }
+
+  public AvroSchemaConverter(Configuration conf) {
+    this.assumeRepeatedIsListElement = conf.getBoolean(
+        ADD_LIST_ELEMENT_RECORDS, ADD_LIST_ELEMENT_RECORDS_DEFAULT);
+  }
+
   /**
    * Given a schema, check to see if it is a union of a null type and a regular schema,
    * and then return the non-null sub-schema. Otherwise, return the given schema.
@@ -67,7 +83,7 @@ public class AvroSchemaConverter {
       return schema;
     }
   }
-  
+
   public MessageType convert(Schema avroSchema) {
     if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
       throw new IllegalArgumentException("Avro schema must be a record.");
@@ -254,11 +270,22 @@ public class AvroSchemaConverter {
             if (parquetGroupType.getFieldCount()!= 1) {
               throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
             }
-            Type elementType = parquetGroupType.getType(0);
-            if (!elementType.isRepetition(Type.Repetition.REPEATED)) {
+            Type repeatedType = parquetGroupType.getType(0);
+            if (!repeatedType.isRepetition(Type.Repetition.REPEATED)) {
               throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
             }
-            return Schema.createArray(convertField(elementType));
+            if (isElementType(repeatedType, parquetGroupType.getName())) {
+              // repeated element types are always required
+              return Schema.createArray(convertField(repeatedType));
+            } else {
+              Type elementType = repeatedType.asGroupType().getType(0);
+              if (elementType.isRepetition(Type.Repetition.OPTIONAL)) {
+                return Schema.createArray(optional(convertField(elementType)));
+              } else {
+                return Schema.createArray(convertField(elementType));
+              }
+            }
+          case MAP_KEY_VALUE: // for backward-compatibility
           case MAP:
             if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) {
               throw new UnsupportedOperationException("Invalid map type " + parquetGroupType);
@@ -284,7 +311,6 @@ public class AvroSchemaConverter {
             }
           case ENUM:
             return Schema.create(Schema.Type.STRING);
-          case MAP_KEY_VALUE:
           case UTF8:
           default:
             throw new UnsupportedOperationException("Cannot convert Parquet type " +
@@ -298,6 +324,28 @@ public class AvroSchemaConverter {
     }
   }
 
+  /**
+   * Implements the rules for interpreting existing data from the logical type
+   * spec for the LIST annotation. This is used to produce the expected schema.
+   * <p>
+   * The AvroArrayConverter will decide whether the repeated type is the array
+   * element type by testing whether the element schema and repeated type are
+   * the same. This ensures that the LIST rules are followed when there is no
+   * schema and that a schema can be provided to override the default behavior.
+   */
+  private boolean isElementType(Type repeatedType, String parentName) {
+    return (
+        // can't be a synthetic layer because it would be invalid
+        repeatedType.isPrimitive() ||
+        repeatedType.asGroupType().getFieldCount() > 1 ||
+        // known patterns without the synthetic layer
+        repeatedType.getName().equals("array") ||
+        repeatedType.getName().equals(parentName + "_tuple") ||
+        // default assumption
+        assumeRepeatedIsListElement
+    );
+  }
+
   private static Schema optional(Schema original) {
     // null is first in the union because Parquet's default is always null
     return Schema.createUnion(Arrays.asList(

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/3fc28541/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java
index 529ca23..59320be 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java
@@ -19,6 +19,7 @@
 package parquet.avro;
 
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,7 +43,7 @@ import parquet.schema.Type;
  */
 public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
 
-  private static final String AVRO_SCHEMA = "parquet.avro.schema";
+  static final String AVRO_SCHEMA = "parquet.avro.schema";
   private static final Schema MAP_KEY_SCHEMA = Schema.create(Schema.Type.STRING);
 
   private RecordConsumer recordConsumer;
@@ -116,11 +117,11 @@ public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
       index++;
     }
   }
-  
+
   private <T> void writeArray(GroupType schema, Schema avroSchema,
-                              Iterable<T> array) {
+                              Collection<T> array) {
     recordConsumer.startGroup(); // group wrapper (original type LIST)
-    if (array.iterator().hasNext()) {
+    if (array.size() > 0) {
       recordConsumer.startField("array", 0);
       for (T elt : array) {
         writeValue(schema.getType(0), avroSchema.getElementType(), elt);
@@ -141,7 +142,7 @@ public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
       recordConsumer.startField("map", 0);
 
       for (Map.Entry<CharSequence, V> entry : map.entrySet()) {
-        recordConsumer.startGroup(); // "repeated" group wrapper
+        recordConsumer.startGroup(); // repeated group key_value, middle layer
         recordConsumer.startField("key", 0);
         writeValue(keyType, MAP_KEY_SCHEMA, entry.getKey());
         recordConsumer.endField("key", 0);
@@ -165,7 +166,7 @@ public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
                           Object value) {
     recordConsumer.startGroup();
 
-    // ResolveUnion will tell us which of the union member types to 
+    // ResolveUnion will tell us which of the union member types to
     // deserialise.
     int avroIndex = GenericData.get().resolveUnion(avroSchema, value);
 
@@ -178,11 +179,11 @@ public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
       }
     }
 
-    // Sparsely populated method of encoding unions, each member has its own 
+    // Sparsely populated method of encoding unions, each member has its own
     // set of columns.
     String memberName = "member" + parquetIndex;
     recordConsumer.startField(memberName, parquetIndex);
-    writeValue(parquetGroup.getType(parquetIndex), 
+    writeValue(parquetGroup.getType(parquetIndex),
                avroSchema.getTypes().get(avroIndex), value);
     recordConsumer.endField(memberName, parquetIndex);
 
@@ -212,7 +213,7 @@ public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
     } else if (avroType.equals(Schema.Type.ENUM)) {
       recordConsumer.addBinary(Binary.fromString(value.toString()));
     } else if (avroType.equals(Schema.Type.ARRAY)) {
-      writeArray((GroupType) type, nonNullAvroSchema, (Iterable<?>) value);
+      writeArray((GroupType) type, nonNullAvroSchema, (Collection<?>) value);
     } else if (avroType.equals(Schema.Type.MAP)) {
       writeMap((GroupType) type, nonNullAvroSchema, (Map<CharSequence, ?>) value);
     } else if (avroType.equals(Schema.Type.UNION)) {

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/3fc28541/parquet-avro/src/test/java/parquet/avro/AvroTestUtil.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/parquet/avro/AvroTestUtil.java b/parquet-avro/src/test/java/parquet/avro/AvroTestUtil.java
new file mode 100644
index 0000000..aba5ef3
--- /dev/null
+++ b/parquet-avro/src/test/java/parquet/avro/AvroTestUtil.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package parquet.avro;
+
+import com.google.common.collect.Lists;
+import java.util.Arrays;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.codehaus.jackson.node.NullNode;
+
+public class AvroTestUtil {
+
+  public static Schema record(String name, Schema.Field... fields) {
+    Schema record = Schema.createRecord(name, null, null, false);
+    record.setFields(Arrays.asList(fields));
+    return record;
+  }
+
+  public static Schema.Field field(String name, Schema schema) {
+    return new Schema.Field(name, schema, null, null);
+  }
+
+  public static Schema.Field optionalField(String name, Schema schema) {
+    return new Schema.Field(name, optional(schema), null, NullNode.getInstance());
+  }
+
+  public static Schema array(Schema element) {
+    return Schema.createArray(element);
+  }
+
+  public static Schema primitive(Schema.Type type) {
+    return Schema.create(type);
+  }
+
+  public static Schema optional(Schema original) {
+    return Schema.createUnion(Lists.newArrayList(
+        Schema.create(Schema.Type.NULL),
+        original));
+  }
+
+  public static GenericRecord instance(Schema schema, Object... pairs) {
+    if ((pairs.length % 2) != 0) {
+      throw new RuntimeException("Not enough values");
+    }
+    GenericRecord record = new GenericData.Record(schema);
+    for (int i = 0; i < pairs.length; i += 2) {
+      record.put(pairs[i].toString(), pairs[i + 1]);
+    }
+    return record;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/3fc28541/parquet-avro/src/test/java/parquet/avro/TestArrayCompatibility.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/parquet/avro/TestArrayCompatibility.java b/parquet-avro/src/test/java/parquet/avro/TestArrayCompatibility.java
new file mode 100644
index 0000000..62beed2
--- /dev/null
+++ b/parquet-avro/src/test/java/parquet/avro/TestArrayCompatibility.java
@@ -0,0 +1,999 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package parquet.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import parquet.hadoop.ParquetWriter;
+import parquet.hadoop.api.WriteSupport;
+import parquet.io.api.RecordConsumer;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+
+import static parquet.avro.AvroTestUtil.array;
+import static parquet.avro.AvroTestUtil.field;
+import static parquet.avro.AvroTestUtil.instance;
+import static parquet.avro.AvroTestUtil.optional;
+import static parquet.avro.AvroTestUtil.optionalField;
+import static parquet.avro.AvroTestUtil.primitive;
+import static parquet.avro.AvroTestUtil.record;
+
+public class TestArrayCompatibility {
+
+  @Rule
+  public final TemporaryFolder tempDir = new TemporaryFolder();
+
+  public static final Configuration NEW_BEHAVIOR_CONF = new Configuration();
+
+  @BeforeClass
+  public static void setupNewBehaviorConfiguration() {
+    NEW_BEHAVIOR_CONF.setBoolean(
+        AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, false);
+  }
+
+  @Test
+  @Ignore(value="Not yet supported")
+  public void testUnannotatedListOfPrimitives() throws Exception {
+    Path test = writeDirect(
+        "message UnannotatedListOfPrimitives {" +
+            "  repeated int32 list_of_ints;" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("list_of_ints", 0);
+
+            rc.addInteger(34);
+            rc.addInteger(35);
+            rc.addInteger(36);
+
+            rc.endField("list_of_ints", 0);
+            rc.endMessage();
+          }
+        });
+
+    Schema expectedSchema = record("OldPrimitiveInList",
+        field("list_of_ints", array(primitive(Schema.Type.INT))));
+
+    GenericRecord expectedRecord = instance(expectedSchema,
+        "list_of_ints", Arrays.asList(34, 35, 36));
+
+    // both should behave the same way
+    assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
+    assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
+  }
+
+  @Test
+  @Ignore(value="Not yet supported")
+  public void testUnannotatedListOfGroups() throws Exception {
+    Path test = writeDirect(
+        "message UnannotatedListOfGroups {" +
+            "  repeated group list_of_points {" +
+            "    required float x;" +
+            "    required float y;" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("list_of_points", 0);
+
+            rc.startGroup();
+            rc.startField("x", 0);
+            rc.addFloat(1.0f);
+            rc.endField("x", 0);
+            rc.startField("y", 1);
+            rc.addFloat(1.0f);
+            rc.endField("y", 1);
+            rc.endGroup();
+
+            rc.startGroup();
+            rc.startField("x", 0);
+            rc.addFloat(2.0f);
+            rc.endField("x", 0);
+            rc.startField("y", 1);
+            rc.addFloat(2.0f);
+            rc.endField("y", 1);
+            rc.endGroup();
+
+            rc.endField("list_of_points", 0);
+            rc.endMessage();
+          }
+        });
+
+    Schema point = record("?",
+        field("x", primitive(Schema.Type.FLOAT)),
+        field("y", primitive(Schema.Type.FLOAT)));
+    Schema expectedSchema = record("OldPrimitiveInList",
+        field("list_of_points", array(point)));
+
+    GenericRecord expectedRecord = instance(expectedSchema,
+        "list_of_points", Arrays.asList(
+            instance(point, "x", 1.0f, "y", 1.0f),
+            instance(point, "x", 2.0f, "y", 2.0f)));
+
+    // both should behave the same way
+    assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
+    assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
+  }
+
+  @Test
+  public void testRepeatedPrimitiveInList() throws Exception {
+    Path test = writeDirect(
+        "message RepeatedPrimitiveInList {" +
+            "  required group list_of_ints (LIST) {" +
+            "    repeated int32 array;" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("list_of_ints", 0);
+
+            rc.startGroup();
+            rc.startField("array", 0);
+
+            rc.addInteger(34);
+            rc.addInteger(35);
+            rc.addInteger(36);
+
+            rc.endField("array", 0);
+            rc.endGroup();
+
+            rc.endField("list_of_ints", 0);
+            rc.endMessage();
+          }
+        });
+
+    Schema expectedSchema = record("RepeatedPrimitiveInList",
+        field("list_of_ints", array(Schema.create(Schema.Type.INT))));
+
+    GenericRecord expectedRecord = instance(expectedSchema,
+        "list_of_ints", Arrays.asList(34, 35, 36));
+
+    // both should behave the same way
+    assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
+    assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
+  }
+
+  @Test
+  public void testMultiFieldGroupInList() throws Exception {
+    // tests the missing element layer, detected by a multi-field group
+    Path test = writeDirect(
+        "message MultiFieldGroupInList {" +
+            "  optional group locations (LIST) {" +
+            "    repeated group element {" +
+            "      required double latitude;" +
+            "      required double longitude;" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("locations", 0);
+
+            rc.startGroup();
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(0.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(180.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup();
+
+            rc.endField("locations", 0);
+            rc.endMessage();
+          }
+        });
+
+    Schema location = record("element",
+        field("latitude", primitive(Schema.Type.DOUBLE)),
+        field("longitude", primitive(Schema.Type.DOUBLE)));
+    Schema expectedSchema = record("MultiFieldGroupInList",
+        optionalField("locations", array(location)));
+
+    GenericRecord expectedRecord = instance(expectedSchema,
+        "locations", Arrays.asList(
+            instance(location, "latitude", 0.0, "longitude", 0.0),
+            instance(location, "latitude", 0.0, "longitude", 180.0)));
+
+    // both should behave the same way
+    assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
+    assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
+  }
+
+  @Test
+  public void testSingleFieldGroupInList() throws Exception {
+    // this tests the case where non-avro older data has an ambiguous list
+    Path test = writeDirect(
+        "message SingleFieldGroupInList {" +
+            "  optional group single_element_groups (LIST) {" +
+            "    repeated group single_element_group {" +
+            "      required int64 count;" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("single_element_groups", 0);
+
+            rc.startGroup();
+            rc.startField("single_element_group", 0); // start writing array contents
+
+            rc.startGroup();
+            rc.startField("count", 0);
+            rc.addLong(1234L);
+            rc.endField("count", 0);
+            rc.endGroup();
+
+            rc.startGroup();
+            rc.startField("count", 0);
+            rc.addLong(2345L);
+            rc.endField("count", 0);
+            rc.endGroup();
+
+            rc.endField("single_element_group", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("single_element_groups", 0);
+            rc.endMessage();
+          }
+        });
+
+    // can't tell from storage whether this should be a list of single-field
+    // records or if the single_field_group layer is synthetic.
+
+    // old behavior - assume that the repeated type is the element type
+    Schema singleElementGroupSchema = record("single_element_group",
+        field("count", primitive(Schema.Type.LONG)));
+    Schema oldSchema = record("SingleFieldGroupInList",
+        optionalField("single_element_groups", array(singleElementGroupSchema)));
+    GenericRecord oldRecord = instance(oldSchema,
+        "single_element_groups", Arrays.asList(
+            instance(singleElementGroupSchema, "count", 1234L),
+            instance(singleElementGroupSchema, "count", 2345L)));
+
+    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
+
+    // new behavior - assume that single_element_group is synthetic (in spec)
+    Schema newSchema = record("SingleFieldGroupInList",
+        optionalField("single_element_groups", array(primitive(Schema.Type.LONG))));
+    GenericRecord newRecord = instance(newSchema,
+        "single_element_groups", Arrays.asList(1234L, 2345L));
+
+    assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
+  }
+
+  @Test
+  public void testSingleFieldGroupInListWithSchema() throws Exception {
+    // this tests the case where older data has an ambiguous structure, but the
+    // correct interpretation can be determined from the avro schema
+
+    Schema singleElementRecord = record("single_element_group",
+        field("count", primitive(Schema.Type.LONG)));
+
+    Schema expectedSchema = record("SingleFieldGroupInList",
+        optionalField("single_element_groups",
+            array(singleElementRecord)));
+
+    Map<String, String> metadata = new HashMap<String, String>();
+    metadata.put(AvroWriteSupport.AVRO_SCHEMA, expectedSchema.toString());
+
+    Path test = writeDirect(
+        "message SingleFieldGroupInList {" +
+            "  optional group single_element_groups (LIST) {" +
+            "    repeated group single_element_group {" +
+            "      required int64 count;" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("single_element_groups", 0);
+
+            rc.startGroup();
+            rc.startField("single_element_group", 0); // start writing array contents
+
+            rc.startGroup();
+            rc.startField("count", 0);
+            rc.addLong(1234L);
+            rc.endField("count", 0);
+            rc.endGroup();
+
+            rc.startGroup();
+            rc.startField("count", 0);
+            rc.addLong(2345L);
+            rc.endField("count", 0);
+            rc.endGroup();
+
+            rc.endField("single_element_group", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("single_element_groups", 0);
+            rc.endMessage();
+          }
+        },
+        metadata);
+
+    GenericRecord expectedRecord = instance(expectedSchema,
+        "single_element_groups", Arrays.asList(
+            instance(singleElementRecord, "count", 1234L),
+            instance(singleElementRecord, "count", 2345L)));
+
+    // both should behave the same way because the schema is present
+    assertReaderContains(oldBehaviorReader(test), expectedSchema, expectedRecord);
+    assertReaderContains(newBehaviorReader(test), expectedSchema, expectedRecord);
+  }
+
+  @Test
+  public void testNewOptionalGroupInList() throws Exception {
+    Path test = writeDirect(
+        "message NewOptionalGroupInList {" +
+            "  optional group locations (LIST) {" +
+            "    repeated group list {" +
+            "      optional group element {" +
+            "        required double latitude;" +
+            "        required double longitude;" +
+            "      }" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("locations", 0);
+
+            rc.startGroup();
+            rc.startField("list", 0); // start writing array contents
+
+            // write a non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(0.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            // write a null element (element field is omitted)
+            rc.startGroup(); // array level
+            rc.endGroup(); // array level
+
+            // write a second non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(180.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            rc.endField("list", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("locations", 0);
+            rc.endMessage();
+          }
+        });
+
+    Schema location = record("element",
+        field("latitude", primitive(Schema.Type.DOUBLE)),
+        field("longitude", primitive(Schema.Type.DOUBLE)));
+
+    // old behavior - assume that the repeated type is the element type
+    Schema elementRecord = record("list", optionalField("element", location));
+    Schema oldSchema = record("NewOptionalGroupInList",
+        optionalField("locations", array(elementRecord)));
+    GenericRecord oldRecord = instance(oldSchema,
+        "locations", Arrays.asList(
+            instance(elementRecord, "element",
+                instance(location, "latitude", 0.0, "longitude", 0.0)),
+            instance(elementRecord),
+            instance(elementRecord, "element",
+                instance(location, "latitude", 0.0, "longitude", 180.0))));
+
+    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
+
+    // new behavior - assume that single_element_group is synthetic (in spec)
+    Schema newSchema = record("NewOptionalGroupInList",
+        optionalField("locations", array(optional(location))));
+    GenericRecord newRecord = instance(newSchema,
+        "locations", Arrays.asList(
+            instance(location, "latitude", 0.0, "longitude", 0.0),
+            null,
+            instance(location, "latitude", 0.0, "longitude", 180.0)));
+
+    assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
+  }
+
+  @Test
+  public void testNewRequiredGroupInList() throws Exception {
+    Path test = writeDirect(
+        "message NewRequiredGroupInList {" +
+            "  optional group locations (LIST) {" +
+            "    repeated group list {" +
+            "      required group element {" +
+            "        required double latitude;" +
+            "        required double longitude;" +
+            "      }" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("locations", 0);
+
+            rc.startGroup();
+            rc.startField("list", 0); // start writing array contents
+
+            // write a non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(180.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            // write a second non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(0.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            rc.endField("list", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("locations", 0);
+            rc.endMessage();
+          }
+        });
+
+    Schema location = record("element",
+        field("latitude", primitive(Schema.Type.DOUBLE)),
+        field("longitude", primitive(Schema.Type.DOUBLE)));
+
+    // old behavior - assume that the repeated type is the element type
+    Schema elementRecord = record("list", field("element", location));
+    Schema oldSchema = record("NewRequiredGroupInList",
+        optionalField("locations", array(elementRecord)));
+    GenericRecord oldRecord = instance(oldSchema,
+        "locations", Arrays.asList(
+            instance(elementRecord, "element",
+                instance(location, "latitude", 0.0, "longitude", 180.0)),
+            instance(elementRecord, "element",
+                instance(location, "latitude", 0.0, "longitude", 0.0))));
+
+    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
+
+    // new behavior - assume that single_element_group is synthetic (in spec)
+    Schema newSchema = record("NewRequiredGroupInList",
+        optionalField("locations", array(location)));
+    GenericRecord newRecord = instance(newSchema,
+        "locations", Arrays.asList(
+            instance(location, "latitude", 0.0, "longitude", 180.0),
+            instance(location, "latitude", 0.0, "longitude", 0.0)));
+
+    assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
+  }
+
+  @Test
+  public void testAvroCompatRequiredGroupInList() throws Exception {
+    Path test = writeDirect(
+        "message AvroCompatRequiredGroupInList {" +
+            "  optional group locations (LIST) {" +
+            "    repeated group array {" +
+            "      optional group element {" +
+            "        required double latitude;" +
+            "        required double longitude;" +
+            "      }" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("locations", 0);
+
+            rc.startGroup();
+            rc.startField("array", 0); // start writing array contents
+
+            // write a non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(180.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            // write a second non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(0.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            rc.endField("array", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("locations", 0);
+            rc.endMessage();
+          }
+        });
+
+    Schema location = record("element",
+        field("latitude", primitive(Schema.Type.DOUBLE)),
+        field("longitude", primitive(Schema.Type.DOUBLE)));
+
+    // old behavior - assume that the repeated type is the element type
+    Schema elementRecord = record("array", optionalField("element", location));
+    Schema oldSchema = record("AvroCompatRequiredGroupInList",
+        optionalField("locations", array(elementRecord)));
+    GenericRecord oldRecord = instance(oldSchema,
+        "locations", Arrays.asList(
+            instance(elementRecord, "element",
+                instance(location, "latitude", 0.0, "longitude", 180.0)),
+            instance(elementRecord, "element",
+                instance(location, "latitude", 0.0, "longitude", 0.0))));
+
+    // both should detect the "array" name
+    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
+    assertReaderContains(newBehaviorReader(test), oldSchema, oldRecord);
+  }
+
+  @Test
+  public void testAvroCompatRequiredGroupInListWithSchema() throws Exception {
+    Path test = writeDirect(
+        "message AvroCompatRequiredGroupInListWithSchema {" +
+            "  optional group locations (LIST) {" +
+            "    repeated group array {" +
+            "      optional group element {" +
+            "        required double latitude;" +
+            "        required double longitude;" +
+            "      }" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("locations", 0);
+
+            rc.startGroup();
+            rc.startField("array", 0); // start writing array contents
+
+            // write a non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(180.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            // write a second non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(0.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            rc.endField("array", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("locations", 0);
+            rc.endMessage();
+          }
+        });
+
+    Schema location = record("element",
+        field("latitude", primitive(Schema.Type.DOUBLE)),
+        field("longitude", primitive(Schema.Type.DOUBLE)));
+
+    Schema newSchema = record("HiveCompatOptionalGroupInList",
+        optionalField("locations", array(optional(location))));
+    GenericRecord newRecord = instance(newSchema,
+        "locations", Arrays.asList(
+            instance(location, "latitude", 0.0, "longitude", 180.0),
+            instance(location, "latitude", 0.0, "longitude", 0.0)));
+
+    Configuration oldConfWithSchema = new Configuration();
+    AvroReadSupport.setAvroReadSchema(oldConfWithSchema, newSchema);
+
+    // both should use the schema structure that is provided
+    assertReaderContains(
+        new AvroParquetReader<GenericRecord>(oldConfWithSchema, test),
+        newSchema, newRecord);
+
+    Configuration newConfWithSchema = new Configuration(NEW_BEHAVIOR_CONF);
+    AvroReadSupport.setAvroReadSchema(newConfWithSchema, newSchema);
+
+    assertReaderContains(
+        new AvroParquetReader<GenericRecord>(newConfWithSchema, test),
+        newSchema, newRecord);
+  }
+
+  @Test
+  public void testThriftCompatRequiredGroupInList() throws Exception {
+    Path test = writeDirect(
+        "message ThriftCompatRequiredGroupInList {" +
+            "  optional group locations (LIST) {" +
+            "    repeated group locations_tuple {" +
+            "      optional group element {" +
+            "        required double latitude;" +
+            "        required double longitude;" +
+            "      }" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("locations", 0);
+
+            rc.startGroup();
+            rc.startField("locations_tuple", 0); // start writing array contents
+
+            // write a non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(180.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            // write a second non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(0.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            rc.endField("locations_tuple", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("locations", 0);
+            rc.endMessage();
+          }
+        });
+
+    Schema location = record("element",
+        field("latitude", primitive(Schema.Type.DOUBLE)),
+        field("longitude", primitive(Schema.Type.DOUBLE)));
+
+    // old behavior - assume that the repeated type is the element type
+    Schema elementRecord = record("locations_tuple", optionalField("element", location));
+    Schema oldSchema = record("ThriftCompatRequiredGroupInList",
+        optionalField("locations", array(elementRecord)));
+    GenericRecord oldRecord = instance(oldSchema,
+        "locations", Arrays.asList(
+            instance(elementRecord, "element",
+                instance(location, "latitude", 0.0, "longitude", 180.0)),
+            instance(elementRecord, "element",
+                instance(location, "latitude", 0.0, "longitude", 0.0))));
+
+    // both should detect the "array" name
+    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
+    assertReaderContains(newBehaviorReader(test), oldSchema, oldRecord);
+  }
+
+  @Test
+  public void testHiveCompatOptionalGroupInList() throws Exception {
+    Path test = writeDirect(
+        "message HiveCompatOptionalGroupInList {" +
+            "  optional group locations (LIST) {" +
+            "    repeated group bag {" +
+            "      optional group element {" +
+            "        required double latitude;" +
+            "        required double longitude;" +
+            "      }" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("locations", 0);
+
+            rc.startGroup();
+            rc.startField("bag", 0); // start writing array contents
+
+            // write a non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(180.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            // write a second non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            rc.startGroup();
+            rc.startField("latitude", 0);
+            rc.addDouble(0.0);
+            rc.endField("latitude", 0);
+            rc.startField("longitude", 1);
+            rc.addDouble(0.0);
+            rc.endField("longitude", 1);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            rc.endField("bag", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("locations", 0);
+            rc.endMessage();
+          }
+        });
+
+    Schema location = record("element",
+        field("latitude", primitive(Schema.Type.DOUBLE)),
+        field("longitude", primitive(Schema.Type.DOUBLE)));
+
+    // old behavior - assume that the repeated type is the element type
+    Schema elementRecord = record("bag", optionalField("element", location));
+    Schema oldSchema = record("HiveCompatOptionalGroupInList",
+        optionalField("locations", array(elementRecord)));
+    GenericRecord oldRecord = instance(oldSchema,
+        "locations", Arrays.asList(
+            instance(elementRecord, "element",
+                instance(location, "latitude", 0.0, "longitude", 180.0)),
+            instance(elementRecord, "element",
+                instance(location, "latitude", 0.0, "longitude", 0.0))));
+
+    // both should detect the "array" name
+    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
+
+    Schema newSchema = record("HiveCompatOptionalGroupInList",
+        optionalField("locations", array(optional(location))));
+    GenericRecord newRecord = instance(newSchema,
+        "locations", Arrays.asList(
+            instance(location, "latitude", 0.0, "longitude", 180.0),
+            instance(location, "latitude", 0.0, "longitude", 0.0)));
+
+    assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
+  }
+
+  private interface DirectWriter {
+    public void write(RecordConsumer consumer);
+  }
+
+  private static class DirectWriteSupport extends WriteSupport<Void> {
+    private RecordConsumer recordConsumer;
+    private final MessageType type;
+    private final DirectWriter writer;
+    private final Map<String, String> metadata;
+
+    private DirectWriteSupport(MessageType type, DirectWriter writer,
+                               Map<String, String> metadata) {
+      this.type = type;
+      this.writer = writer;
+      this.metadata = metadata;
+    }
+
+    @Override
+    public WriteContext init(Configuration configuration) {
+      return new WriteContext(type, metadata);
+    }
+
+    @Override
+    public void prepareForWrite(RecordConsumer recordConsumer) {
+      this.recordConsumer = recordConsumer;
+    }
+
+    @Override
+    public void write(Void record) {
+      writer.write(recordConsumer);
+    }
+  }
+
+  private Path writeDirect(String type, DirectWriter writer) throws IOException {
+    return writeDirect(MessageTypeParser.parseMessageType(type), writer);
+  }
+
+  private Path writeDirect(String type, DirectWriter writer,
+                           Map<String, String> metadata) throws IOException {
+    return writeDirect(MessageTypeParser.parseMessageType(type), writer, metadata);
+  }
+
+  private Path writeDirect(MessageType type, DirectWriter writer) throws IOException {
+    return writeDirect(type, writer, new HashMap<String, String>());
+  }
+
+  private Path writeDirect(MessageType type, DirectWriter writer,
+                           Map<String, String> metadata) throws IOException {
+    File temp = tempDir.newFile(UUID.randomUUID().toString());
+    temp.deleteOnExit();
+    temp.delete();
+
+    Path path = new Path(temp.getPath());
+
+    ParquetWriter<Void> parquetWriter = new ParquetWriter<Void>(
+        path, new DirectWriteSupport(type, writer, metadata));
+    parquetWriter.write(null);
+    parquetWriter.close();
+
+    return path;
+  }
+
+  public <T extends IndexedRecord> AvroParquetReader<T> oldBehaviorReader(
+      Path path) throws IOException {
+    return new AvroParquetReader<T>(path);
+  }
+
+  public <T extends IndexedRecord> AvroParquetReader<T> newBehaviorReader(
+      Path path) throws IOException {
+    return new AvroParquetReader<T>(NEW_BEHAVIOR_CONF, path);
+  }
+
+  public <T extends IndexedRecord> void assertReaderContains(
+      AvroParquetReader<T> reader, Schema expectedSchema, T... expectedRecords)
+      throws IOException {
+    for (T expectedRecord : expectedRecords) {
+      T actualRecord = reader.read();
+      Assert.assertEquals("Should match expected schema",
+          expectedSchema, actualRecord.getSchema());
+      Assert.assertEquals("Should match the expected record",
+          expectedRecord, actualRecord);
+    }
+    Assert.assertNull("Should only contain " + expectedRecords.length +
+            " record" + (expectedRecords.length == 1 ? "" : "s"),
+        reader.read());
+  }
+
+}