Posted to commits@parquet.apache.org by ju...@apache.org on 2018/04/26 12:48:28 UTC

[parquet-mr] branch master updated: PARQUET-968 Add Hive/Presto support in ProtoParquet

This is an automated email from the ASF dual-hosted git repository.

julien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new f849384  PARQUET-968 Add Hive/Presto support in ProtoParquet
f849384 is described below

commit f84938441be49c665595c936ac631c3e5f171bf9
Author: Constantin Muraru <cm...@adobe.com>
AuthorDate: Thu Apr 26 08:48:08 2018 -0400

    PARQUET-968 Add Hive/Presto support in ProtoParquet
    
    This PR adds Hive (https://github.com/apache/hive) and Presto (https://github.com/prestodb/presto) support for parquet messages written with ProtoParquetWriter. Hive and other tools, such as Presto (used by AWS Athena), rely on specific LIST/MAP wrappers (as defined in the parquet spec: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md). These wrappers are currently missing from the ProtoParquet schema. AvroParquet works just fine, because it adds these wrappers whe [...]
    
    This is backward compatible: messages written without the extra LIST/MAP wrappers can still be read with the updated ProtoParquetReader.
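    
    To illustrate the backward-compatible read path: the reader can branch on the field annotation, unwrapping LIST/MAP groups when present and falling back to the old layout otherwise. The sketch below is a minimal, self-contained illustration and not the parquet-mr code; `ReaderDispatchSketch`, its `Annotation` enum and `chooseConverter` are hypothetical names, and `NONE` stands in for a field without annotation.
    
    ```java
    import java.util.Objects;
    
    // Hypothetical stand-ins for illustration only; these are not parquet-mr classes.
    final class ReaderDispatchSketch {
      // NONE models a field that carries no LIST/MAP annotation (old layout).
      enum Annotation { NONE, LIST, MAP }
    
      // Picks a read path from the field annotation.
      static String chooseConverter(Annotation annotation) {
        Objects.requireNonNull(annotation, "annotation");
        switch (annotation) {
          case LIST: return "unwrap list/element, then convert";
          case MAP:  return "unwrap key_value, then convert";
          default:   return "convert directly (old layout)";
        }
      }
    
      public static void main(String[] args) {
        for (Annotation a : Annotation.values()) {
          System.out.println(a + " -> " + chooseConverter(a));
        }
      }
    }
    ```
    
    Files written before this change carry no annotation, so they take the default branch and keep reading as before.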
    
    Regarding the change.
    Given the following protobuf schema:
    
    ```
    message ListOfPrimitives {
        repeated int64 my_repeated_id = 1;
    }
    ```
    
    Old parquet schema was:
    ```
    message ListOfPrimitives {
      repeated int64 my_repeated_id = 1;
    }
    ```
    
    New parquet schema is:
    ```
    message ListOfPrimitives {
      optional group my_repeated_id (LIST) = 1 {
        repeated group list {
          required int64 element;
        }
      }
    }
    ```
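    
    As a rough illustration of what the wrapper means for the data itself, the sketch below nests a list of values the way the new schema does (field, then `list`, then `element`). It uses plain Java collections as stand-ins for parquet records; `ListWrapperSketch`, `oldLayout` and `newLayout` are hypothetical names, not part of parquet-mr.
    
    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    
    // Hypothetical helper for illustration; plain collections stand in for parquet records.
    final class ListWrapperSketch {
      // Old layout: the repeated values sit directly under the field name.
      static Map<String, Object> oldLayout(String field, List<Long> values) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put(field, new ArrayList<>(values));
        return record;
      }
    
      // New layout: field -> "list" (repeated group) -> "element" (one value per entry).
      static Map<String, Object> newLayout(String field, List<Long> values) {
        List<Map<String, Object>> list = new ArrayList<>();
        for (Long value : values) {
          Map<String, Object> entry = new LinkedHashMap<>();
          entry.put("element", value);
          list.add(entry);
        }
        Map<String, Object> wrapper = new LinkedHashMap<>();
        wrapper.put("list", list);
        Map<String, Object> record = new LinkedHashMap<>();
        record.put(field, wrapper);
        return record;
      }
    
      public static void main(String[] args) {
        List<Long> ids = Arrays.asList(1L, 2L);
        System.out.println(oldLayout("my_repeated_id", ids)); // {my_repeated_id=[1, 2]}
        System.out.println(newLayout("my_repeated_id", ids)); // {my_repeated_id={list=[{element=1}, {element=2}]}}
      }
    }
    ```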
    ---
    
    For list of messages, the changes look like this:
    
    Protobuf schema:
    ```
    message ListOfMessages {
        string top_field = 1;
        repeated MyInnerMessage first_array = 2;
    }
    
    message MyInnerMessage {
        int32 inner_field = 1;
    }
    ```
    
    Old parquet schema was:
    ```
    message TestProto3.ListOfMessages {
      optional binary top_field (UTF8) = 1;
      repeated group first_array = 2 {
        optional int32 inner_field = 1;
      }
    }
    ```
    
    The expected parquet schema, compatible with Hive (and similar to parquet-avro) is the following (notice the LIST wrapper):
    
    ```
    message TestProto3.ListOfMessages {
      optional binary top_field (UTF8) = 1;
      optional group first_array (LIST) = 2 {
        repeated group list {
          optional group element {
            optional int32 inner_field = 1;
          }
        }
      }
    }
    ```
    
    ---
    
    Similarly for maps. Protobuf schema:
    ```
    message TopMessage {
        map<int64, MyInnerMessage> myMap = 1;
    }
    
    message MyInnerMessage {
        int32 inner_field = 1;
    }
    ```
    
    Old parquet schema:
    ```
    message TestProto3.TopMessage {
      repeated group myMap = 1 {
        optional int64 key = 1;
        optional group value = 2 {
          optional int32 inner_field = 1;
        }
      }
    }
    ```
    
    New parquet schema (notice the `MAP` wrapper):
    ```
    message TestProto3.TopMessage {
      optional group myMap (MAP) = 1 {
        repeated group key_value {
          required int64 key;
          optional group value {
            optional int32 inner_field = 1;
          }
        }
      }
    }
    ```
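    
    The same idea for maps, as a minimal sketch: each map entry becomes one `key_value` group holding a `key` and a `value`. Plain Java collections again stand in for parquet records; `MapWrapperSketch` and `newLayout` are hypothetical names, and the value is modelled as a group with the single `inner_field` from the example above.
    
    ```java
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    
    // Hypothetical helper for illustration; plain collections stand in for parquet records.
    final class MapWrapperSketch {
      // New layout: field -> "key_value" (repeated group) -> one {key, value} group per map entry.
      static Map<String, Object> newLayout(String field, Map<Long, Integer> innerFieldByKey) {
        List<Map<String, Object>> keyValue = new ArrayList<>();
        for (Map.Entry<Long, Integer> e : innerFieldByKey.entrySet()) {
          Map<String, Object> value = new LinkedHashMap<>();
          value.put("inner_field", e.getValue());
          Map<String, Object> entry = new LinkedHashMap<>();
          entry.put("key", e.getKey());
          entry.put("value", value);
          keyValue.add(entry);
        }
        Map<String, Object> wrapper = new LinkedHashMap<>();
        wrapper.put("key_value", keyValue);
        Map<String, Object> record = new LinkedHashMap<>();
        record.put(field, wrapper);
        return record;
      }
    
      public static void main(String[] args) {
        Map<Long, Integer> myMap = new LinkedHashMap<>();
        myMap.put(7L, 42);
        System.out.println(newLayout("myMap", myMap)); // {myMap={key_value=[{key=7, value={inner_field=42}}]}}
      }
    }
    ```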
    
    Jira: https://issues.apache.org/jira/browse/PARQUET-968
    
    Author: Constantin Muraru <cm...@adobe.com>
    Author: Benoît Hanotte <Be...@users.noreply.github.com>
    
    Closes #411 from costimuraru/PARQUET-968 and squashes the following commits:
    
    16eafcb6 [Benoît Hanotte] PARQUET-968 add proto flag to enable writing using specs-compliant schemas (#2)
    a8bd7041 [Constantin Muraru] Pick up commit from @andredasilvapinto
    5cf92487 [Constantin Muraru] PARQUET-968 Add Hive support in ProtoParquet
---
 .../parquet/proto/ProtoMessageConverter.java       | 126 ++++-
 .../apache/parquet/proto/ProtoSchemaConverter.java | 169 ++++--
 .../apache/parquet/proto/ProtoWriteSupport.java    | 190 +++++--
 .../parquet/proto/ProtoInputOutputFormatTest.java  | 120 +++++
 .../parquet/proto/ProtoSchemaConverterTest.java    | 219 +++++++-
 .../parquet/proto/ProtoWriteSupportTest.java       | 565 ++++++++++++++++++++-
 .../apache/parquet/proto/utils/WriteUsingMR.java   |  10 +-
 .../src/test/resources/TestProto3.proto            |   8 +
 .../src/test/resources/TestProtobuf.proto          |   8 +
 9 files changed, 1331 insertions(+), 84 deletions(-)

diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
index 890f16c..979d78e 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
@@ -24,12 +24,14 @@ import com.google.protobuf.Message;
 import com.twitter.elephantbird.util.Protobufs;
 import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.io.InvalidRecordException;
+import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.Converter;
 import org.apache.parquet.io.api.GroupConverter;
 import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.IncompatibleSchemaModificationException;
+import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.Type;
 
 import java.util.HashMap;
@@ -126,10 +128,15 @@ class ProtoMessageConverter extends GroupConverter {
       };
     }
 
+    if (OriginalType.LIST == parquetType.getOriginalType()) {
+      return new ListConverter(parentBuilder, fieldDescriptor, parquetType);
+    }
+    if (OriginalType.MAP == parquetType.getOriginalType()) {
+      return new MapConverter(parentBuilder, fieldDescriptor, parquetType);
+    }
     return newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType);
   }
 
-
   private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
 
     JavaType javaType = fieldDescriptor.getJavaType();
@@ -342,4 +349,121 @@ class ProtoMessageConverter extends GroupConverter {
     }
 
   }
+
+  /**
+   * This class unwraps the additional LIST wrapper and makes it possible to read the underlying data and then convert
+   * it to protobuf.
+   * <p>
+   * Consider the following protobuf schema:
+   * message SimpleList {
+   *   repeated int64 first_array = 1;
+   * }
+   * <p>
+   * A LIST wrapper is created in parquet for the above mentioned protobuf schema:
+   * message SimpleList {
+   *   optional group first_array (LIST) = 1 {
+   *     repeated group list {
+   *         required int64 element;
+   *     }
+   *   }
+   * }
+   * <p>
+   * The LIST wrappers are used by 3rd party tools, such as Hive, to read parquet arrays. The wrapper contains
+   * a repeated group named 'list', itself containing only one field called 'element' of the type of the repeated
+   * object (can be a primitive as in this example or a group in case of a repeated message in protobuf).
+   */
+  final class ListConverter extends GroupConverter {
+    private final Converter converter;
+
+    public ListConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
+      OriginalType originalType = parquetType.getOriginalType();
+      if (originalType != OriginalType.LIST || parquetType.isPrimitive()) {
+        throw new ParquetDecodingException("Expected LIST wrapper. Found: " + originalType + " instead.");
+      }
+
+      GroupType rootWrapperType = parquetType.asGroupType();
+      if (!rootWrapperType.containsField("list") || rootWrapperType.getType("list").isPrimitive()) {
+        throw new ParquetDecodingException("Expected repeated 'list' group inside LIST wrapper but got: " + rootWrapperType);
+      }
+
+      GroupType listType = rootWrapperType.getType("list").asGroupType();
+      if (!listType.containsField("element")) {
+        throw new ParquetDecodingException("Expected 'element' inside repeated list group but got: " + listType);
+      }
+
+      Type elementType = listType.getType("element");
+      converter = newMessageConverter(parentBuilder, fieldDescriptor, elementType);
+    }
+
+    @Override
+    public Converter getConverter(int fieldIndex) {
+      if (fieldIndex > 0) {
+        throw new ParquetDecodingException("Unexpected multiple fields in the LIST wrapper");
+      }
+
+      return new GroupConverter() {
+        @Override
+        public Converter getConverter(int fieldIndex) {
+          return converter;
+        }
+
+        @Override
+        public void start() {
+
+        }
+
+        @Override
+        public void end() {
+
+        }
+      };
+    }
+
+    @Override
+    public void start() {
+
+    }
+
+    @Override
+    public void end() {
+
+    }
+  }
+
+
+  final class MapConverter extends GroupConverter {
+    private final Converter converter;
+
+    public MapConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
+      OriginalType originalType = parquetType.getOriginalType();
+      if (originalType != OriginalType.MAP) {
+        throw new ParquetDecodingException("Expected MAP wrapper. Found: " + originalType + " instead.");
+      }
+
+      Type parquetSchema;
+      if (parquetType.asGroupType().containsField("key_value")){
+        parquetSchema = parquetType.asGroupType().getType("key_value");
+      } else {
+        throw new ParquetDecodingException("Expected map but got: " + parquetType);
+      }
+
+      converter = newMessageConverter(parentBuilder, fieldDescriptor, parquetSchema);
+    }
+
+    @Override
+    public Converter getConverter(int fieldIndex) {
+      if (fieldIndex > 0) {
+        throw new ParquetDecodingException("Unexpected multiple fields in the MAP wrapper");
+      }
+      return converter;
+    }
+
+    @Override
+    public void start() {
+    }
+
+    @Override
+    public void end() {
+    }
+  }
 }
diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java
index 64668c0..0e1aa20 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,36 +18,49 @@
  */
 package org.apache.parquet.proto;
 
-import static org.apache.parquet.schema.OriginalType.ENUM;
-import static org.apache.parquet.schema.OriginalType.UTF8;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
-
-import java.util.List;
-
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
+import com.google.protobuf.Message;
+import com.twitter.elephantbird.util.Protobufs;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Types;
 import org.apache.parquet.schema.Types.Builder;
 import org.apache.parquet.schema.Types.GroupBuilder;
-
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
-import com.google.protobuf.Message;
-import com.twitter.elephantbird.util.Protobufs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
+import static org.apache.parquet.schema.OriginalType.ENUM;
+import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
+
 /**
  * Converts a Protocol Buffer Descriptor into a Parquet schema.
  */
 public class ProtoSchemaConverter {
 
   private static final Logger LOG = LoggerFactory.getLogger(ProtoSchemaConverter.class);
+  private final boolean parquetSpecsCompliant;
+
+  public ProtoSchemaConverter() {
+    this(false);
+  }
+
+  /**
+   * Instantiate a schema converter to get the parquet schema corresponding to protobuf classes.
+   * @param parquetSpecsCompliant   If set to false, the generated parquet schema uses the old
+   *                                schema style (prior to PARQUET-968) for backward compatibility;
+   *                                that style omits the LIST and MAP wrappers around collections that
+   *                                the parquet specification requires. If set to true, spec-compliant schemas are used.
+   */
+  public ProtoSchemaConverter(boolean parquetSpecsCompliant) {
+    this.parquetSpecsCompliant = parquetSpecsCompliant;
+  }
 
   public MessageType convert(Class<? extends Message> protobufClass) {
     LOG.debug("Converting protocol buffer class \"" + protobufClass + "\" to parquet schema.");
@@ -60,8 +73,8 @@ public class ProtoSchemaConverter {
   }
 
   /* Iterates over list of fields. **/
-  private <T> GroupBuilder<T> convertFields(GroupBuilder<T> groupBuilder, List<Descriptors.FieldDescriptor> fieldDescriptors) {
-    for (Descriptors.FieldDescriptor fieldDescriptor : fieldDescriptors) {
+  private <T> GroupBuilder<T> convertFields(GroupBuilder<T> groupBuilder, List<FieldDescriptor> fieldDescriptors) {
+    for (FieldDescriptor fieldDescriptor : fieldDescriptors) {
       groupBuilder =
           addField(fieldDescriptor, groupBuilder)
           .id(fieldDescriptor.getNumber())
@@ -70,7 +83,7 @@ public class ProtoSchemaConverter {
     return groupBuilder;
   }
 
-  private Type.Repetition getRepetition(Descriptors.FieldDescriptor descriptor) {
+  private Type.Repetition getRepetition(FieldDescriptor descriptor) {
     if (descriptor.isRequired()) {
       return Type.Repetition.REQUIRED;
     } else if (descriptor.isRepeated()) {
@@ -80,26 +93,110 @@ public class ProtoSchemaConverter {
     }
   }
 
-  private <T> Builder<? extends Builder<?, GroupBuilder<T>>, GroupBuilder<T>> addField(Descriptors.FieldDescriptor descriptor, GroupBuilder<T> builder) {
-    Type.Repetition repetition = getRepetition(descriptor);
-    JavaType javaType = descriptor.getJavaType();
+  private <T> Builder<? extends Builder<?, GroupBuilder<T>>, GroupBuilder<T>> addField(FieldDescriptor descriptor, final GroupBuilder<T> builder) {
+    if (descriptor.getJavaType() == JavaType.MESSAGE) {
+      return addMessageField(descriptor, builder);
+    }
+
+    ParquetType parquetType = getParquetType(descriptor);
+    if (descriptor.isRepeated() && parquetSpecsCompliant) {
+      // the old schema style did not include the LIST wrapper around repeated fields
+      return addRepeatedPrimitive(descriptor, parquetType.primitiveType, parquetType.originalType, builder);
+    }
+
+    return builder.primitive(parquetType.primitiveType, getRepetition(descriptor)).as(parquetType.originalType);
+  }
+
+  private <T> Builder<? extends Builder<?, GroupBuilder<T>>, GroupBuilder<T>> addRepeatedPrimitive(FieldDescriptor descriptor,
+                                                                                                   PrimitiveTypeName primitiveType,
+                                                                                                   OriginalType originalType,
+                                                                                                   final GroupBuilder<T> builder) {
+    return builder
+        .group(Type.Repetition.OPTIONAL).as(OriginalType.LIST)
+          .group(Type.Repetition.REPEATED)
+            .primitive(primitiveType, Type.Repetition.REQUIRED).as(originalType)
+          .named("element")
+        .named("list");
+  }
+
+  private <T> GroupBuilder<GroupBuilder<T>> addRepeatedMessage(FieldDescriptor descriptor, GroupBuilder<T> builder) {
+    GroupBuilder<GroupBuilder<GroupBuilder<GroupBuilder<T>>>> result =
+      builder
+        .group(Type.Repetition.OPTIONAL).as(OriginalType.LIST)
+        .group(Type.Repetition.REPEATED)
+        .group(Type.Repetition.OPTIONAL);
+
+    convertFields(result, descriptor.getMessageType().getFields());
+
+    return result.named("element").named("list");
+  }
+
+  private <T> GroupBuilder<GroupBuilder<T>> addMessageField(FieldDescriptor descriptor, final GroupBuilder<T> builder) {
+    if (descriptor.isMapField() && parquetSpecsCompliant) {
+      // the old schema style did not include the MAP wrapper around map groups
+      return addMapField(descriptor, builder);
+    }
+    if (descriptor.isRepeated() && parquetSpecsCompliant) {
+      // the old schema style did not include the LIST wrapper around repeated messages
+      return addRepeatedMessage(descriptor, builder);
+    }
+
+    // Plain message
+    GroupBuilder<GroupBuilder<T>> group = builder.group(getRepetition(descriptor));
+    convertFields(group, descriptor.getMessageType().getFields());
+    return group;
+  }
+
+  private <T> GroupBuilder<GroupBuilder<T>> addMapField(FieldDescriptor descriptor, final GroupBuilder<T> builder) {
+    List<FieldDescriptor> fields = descriptor.getMessageType().getFields();
+    if (fields.size() != 2) {
+      throw new UnsupportedOperationException("Expected two fields for the map (key/value), but got: " + fields);
+    }
+
+    ParquetType mapKeyParquetType = getParquetType(fields.get(0));
+
+    GroupBuilder<GroupBuilder<GroupBuilder<T>>> group = builder
+      .group(Type.Repetition.OPTIONAL).as(OriginalType.MAP) // only optional maps are allowed in Proto3
+      .group(Type.Repetition.REPEATED) // key_value wrapper
+      .primitive(mapKeyParquetType.primitiveType, Type.Repetition.REQUIRED).as(mapKeyParquetType.originalType).named("key");
+
+    return addField(fields.get(1), group).named("value")
+      .named("key_value");
+  }
+
+  private ParquetType getParquetType(FieldDescriptor fieldDescriptor) {
+
+    JavaType javaType = fieldDescriptor.getJavaType();
     switch (javaType) {
-      case BOOLEAN: return builder.primitive(BOOLEAN, repetition);
-      case INT: return builder.primitive(INT32, repetition);
-      case LONG: return builder.primitive(INT64, repetition);
-      case FLOAT: return builder.primitive(FLOAT, repetition);
-      case DOUBLE: return builder.primitive(DOUBLE, repetition);
-      case BYTE_STRING: return builder.primitive(BINARY, repetition);
-      case STRING: return builder.primitive(BINARY, repetition).as(UTF8);
-      case MESSAGE: {
-        GroupBuilder<GroupBuilder<T>> group = builder.group(repetition);
-        convertFields(group, descriptor.getMessageType().getFields());
-        return group;
-      }
-      case ENUM: return builder.primitive(BINARY, repetition).as(ENUM);
+      case INT: return ParquetType.of(INT32);
+      case LONG: return ParquetType.of(INT64);
+      case DOUBLE: return ParquetType.of(DOUBLE);
+      case BOOLEAN: return ParquetType.of(BOOLEAN);
+      case FLOAT: return ParquetType.of(FLOAT);
+      case STRING: return ParquetType.of(BINARY, UTF8);
+      case ENUM: return ParquetType.of(BINARY, ENUM);
+      case BYTE_STRING: return ParquetType.of(BINARY);
       default:
         throw new UnsupportedOperationException("Cannot convert Protocol Buffer: unknown type " + javaType);
     }
   }
 
+  private static class ParquetType {
+    PrimitiveTypeName primitiveType;
+    OriginalType originalType;
+
+    private ParquetType(PrimitiveTypeName primitiveType, OriginalType originalType) {
+      this.primitiveType = primitiveType;
+      this.originalType = originalType;
+    }
+
+    public static ParquetType of(PrimitiveTypeName primitiveType, OriginalType originalType) {
+      return new ParquetType(primitiveType, originalType);
+    }
+
+    public static ParquetType of(PrimitiveTypeName primitiveType) {
+      return of(primitiveType, null);
+    }
+  }
+
 }
diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
index 19b0706..59c236f 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
@@ -18,12 +18,9 @@
  */
 package org.apache.parquet.proto;
 
-import com.google.protobuf.ByteString;
-import com.google.protobuf.DescriptorProtos;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.MessageOrBuilder;
-import com.google.protobuf.TextFormat;
+import com.google.protobuf.*;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
 import com.twitter.elephantbird.util.Protobufs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.hadoop.BadConfigurationException;
@@ -31,14 +28,13 @@ import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.io.InvalidRecordException;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.RecordConsumer;
-import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.IncompatibleSchemaModificationException;
-import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.*;
 import org.apache.parquet.schema.Type;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Array;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -50,7 +46,13 @@ public class ProtoWriteSupport<T extends MessageOrBuilder> extends WriteSupport<
 
   private static final Logger LOG = LoggerFactory.getLogger(ProtoWriteSupport.class);
   public static final String PB_CLASS_WRITE = "parquet.proto.writeClass";
+  // PARQUET-968 introduces changes to allow writing specs compliant schemas with parquet-protobuf.
+  // In the past, collections were not written using the LIST and MAP wrappers and thus were not compliant
+  // with the parquet specs. This flag, if set to true, enables writing with spec-compliant schemas,
+  // but it is set to false by default to keep backward compatibility.
+  public static final String PB_SPECS_COMPLIANT_WRITE = "parquet.proto.writeSpecsCompliant";
 
+  private boolean writeSpecsCompliant = false;
   private RecordConsumer recordConsumer;
   private Class<? extends Message> protoMessage;
   private MessageWriter messageWriter;
@@ -72,6 +74,16 @@ public class ProtoWriteSupport<T extends MessageOrBuilder> extends WriteSupport<
   }
 
   /**
+   * Make parquet-protobuf use the LIST and MAP wrappers for collections. Set to false if you need backward
+   * compatibility with parquet before PARQUET-968 (1.9.0 and older).
+   * @param configuration           The hadoop configuration
+   * @param writeSpecsCompliant     If set to false, the old schema style will be used (without wrappers).
+   */
+  public static void setWriteSpecsCompliant(Configuration configuration, boolean writeSpecsCompliant) {
+    configuration.setBoolean(PB_SPECS_COMPLIANT_WRITE, writeSpecsCompliant);
+  }
+
+  /**
    * Writes Protocol buffer to parquet file.
    * @param record instance of Message.Builder or Message.
    * */
@@ -108,8 +120,9 @@ public class ProtoWriteSupport<T extends MessageOrBuilder> extends WriteSupport<
       }
     }
 
-    MessageType rootSchema = new ProtoSchemaConverter().convert(protoMessage);
-    Descriptors.Descriptor messageDescriptor = Protobufs.getMessageDescriptor(protoMessage);
+    writeSpecsCompliant = configuration.getBoolean(PB_SPECS_COMPLIANT_WRITE, writeSpecsCompliant);
+    MessageType rootSchema = new ProtoSchemaConverter(writeSpecsCompliant).convert(protoMessage);
+    Descriptor messageDescriptor = Protobufs.getMessageDescriptor(protoMessage);
     validatedMapping(messageDescriptor, rootSchema);
 
     this.messageWriter = new MessageWriter(messageDescriptor, rootSchema);
@@ -117,6 +130,7 @@ public class ProtoWriteSupport<T extends MessageOrBuilder> extends WriteSupport<
     Map<String, String> extraMetaData = new HashMap<String, String>();
     extraMetaData.put(ProtoReadSupport.PB_CLASS, protoMessage.getName());
     extraMetaData.put(ProtoReadSupport.PB_DESCRIPTOR, serializeDescriptor(protoMessage));
+    extraMetaData.put(PB_SPECS_COMPLIANT_WRITE, String.valueOf(writeSpecsCompliant));
     return new WriteContext(rootSchema, extraMetaData);
   }
 
@@ -152,17 +166,21 @@ public class ProtoWriteSupport<T extends MessageOrBuilder> extends WriteSupport<
     final FieldWriter[] fieldWriters;
 
     @SuppressWarnings("unchecked")
-    MessageWriter(Descriptors.Descriptor descriptor, GroupType schema) {
-      List<Descriptors.FieldDescriptor> fields = descriptor.getFields();
+    MessageWriter(Descriptor descriptor, GroupType schema) {
+      List<FieldDescriptor> fields = descriptor.getFields();
       fieldWriters = (FieldWriter[]) Array.newInstance(FieldWriter.class, fields.size());
 
-      for (Descriptors.FieldDescriptor fieldDescriptor: fields) {
+      for (FieldDescriptor fieldDescriptor: fields) {
         String name = fieldDescriptor.getName();
         Type type = schema.getType(name);
         FieldWriter writer = createWriter(fieldDescriptor, type);
 
-        if(fieldDescriptor.isRepeated()) {
-         writer = new ArrayWriter(writer);
+        if(writeSpecsCompliant && fieldDescriptor.isRepeated() && !fieldDescriptor.isMapField()) {
+          writer = new ArrayWriter(writer);
+        }
+        else if (!writeSpecsCompliant && fieldDescriptor.isRepeated()) {
+          // the old schema style wrote collections (lists and maps) as plain repeated fields, without LIST or MAP wrappers
+          writer = new RepeatedWriter(writer);
         }
 
         writer.setFieldName(name);
@@ -172,11 +190,11 @@ public class ProtoWriteSupport<T extends MessageOrBuilder> extends WriteSupport<
       }
     }
 
-    private FieldWriter createWriter(Descriptors.FieldDescriptor fieldDescriptor, Type type) {
+    private FieldWriter createWriter(FieldDescriptor fieldDescriptor, Type type) {
 
       switch (fieldDescriptor.getJavaType()) {
         case STRING: return new StringWriter() ;
-        case MESSAGE: return new MessageWriter(fieldDescriptor.getMessageType(), type.asGroupType());
+        case MESSAGE: return createMessageWriter(fieldDescriptor, type);
         case INT: return new IntWriter();
         case LONG: return new LongWriter();
         case FLOAT: return new FloatWriter();
@@ -189,6 +207,47 @@ public class ProtoWriteSupport<T extends MessageOrBuilder> extends WriteSupport<
       return unknownType(fieldDescriptor);//should not be executed, always throws exception.
     }
 
+    private FieldWriter createMessageWriter(FieldDescriptor fieldDescriptor, Type type) {
+      if (fieldDescriptor.isMapField() && writeSpecsCompliant) {
+        return createMapWriter(fieldDescriptor, type);
+      }
+
+      return new MessageWriter(fieldDescriptor.getMessageType(), getGroupType(type));
+    }
+
+    private GroupType getGroupType(Type type) {
+      if (type.getOriginalType() == OriginalType.LIST) {
+        return type.asGroupType().getType("list").asGroupType().getType("element").asGroupType();
+      }
+
+      if (type.getOriginalType() == OriginalType.MAP) {
+        return type.asGroupType().getType("key_value").asGroupType().getType("value").asGroupType();
+      }
+
+      return type.asGroupType();
+    }
+
+    private MapWriter createMapWriter(FieldDescriptor fieldDescriptor, Type type) {
+      List<FieldDescriptor> fields = fieldDescriptor.getMessageType().getFields();
+      if (fields.size() != 2) {
+        throw new UnsupportedOperationException("Expected two fields for the map (key/value), but got: " + fields);
+      }
+
+      // KeyFieldWriter
+      FieldDescriptor keyProtoField = fields.get(0);
+      FieldWriter keyWriter = createWriter(keyProtoField, type);
+      keyWriter.setFieldName(keyProtoField.getName());
+      keyWriter.setIndex(0);
+
+      // ValueFieldWriter
+      FieldDescriptor valueProtoField = fields.get(1);
+      FieldWriter valueWriter = createWriter(valueProtoField, type);
+      valueWriter.setFieldName(valueProtoField.getName());
+      valueWriter.setIndex(1);
+
+      return new MapWriter(keyWriter, valueWriter);
+    }
+
     /** Writes top level message. It cannot call startGroup() */
     void writeTopLevelMessage(Object value) {
       writeAllFields((MessageOrBuilder) value);
@@ -206,18 +265,16 @@ public class ProtoWriteSupport<T extends MessageOrBuilder> extends WriteSupport<
     @Override
     final void writeField(Object value) {
       recordConsumer.startField(fieldName, index);
-      recordConsumer.startGroup();
-      writeAllFields((MessageOrBuilder) value);
-      recordConsumer.endGroup();
+      writeRawValue(value);
       recordConsumer.endField(fieldName, index);
     }
 
     private void writeAllFields(MessageOrBuilder pb) {
       //returns changed fields with values. Map is ordered by id.
-      Map<Descriptors.FieldDescriptor, Object> changedPbFields = pb.getAllFields();
+      Map<FieldDescriptor, Object> changedPbFields = pb.getAllFields();
 
-      for (Map.Entry<Descriptors.FieldDescriptor, Object> entry : changedPbFields.entrySet()) {
-        Descriptors.FieldDescriptor fieldDescriptor = entry.getKey();
+      for (Map.Entry<FieldDescriptor, Object> entry : changedPbFields.entrySet()) {
+        FieldDescriptor fieldDescriptor = entry.getKey();
 
         if(fieldDescriptor.isExtension()) {
           // Field index of an extension field might overlap with a base field.
@@ -246,6 +303,45 @@ public class ProtoWriteSupport<T extends MessageOrBuilder> extends WriteSupport<
     @Override
     final void writeField(Object value) {
       recordConsumer.startField(fieldName, index);
+      recordConsumer.startGroup();
+      List<?> list = (List<?>) value;
+
+      recordConsumer.startField("list", 0); // This is the wrapper group for the array field
+      for (Object listEntry: list) {
+        recordConsumer.startGroup();
+        recordConsumer.startField("element", 0); // This is the mandatory inner field
+
+        fieldWriter.writeRawValue(listEntry);
+
+        recordConsumer.endField("element", 0);
+        recordConsumer.endGroup();
+      }
+      recordConsumer.endField("list", 0);
+
+      recordConsumer.endGroup();
+      recordConsumer.endField(fieldName, index);
+    }
+  }
+
+  /**
+   * The RepeatedWriter is used to write collections (lists and maps) using the old style (without LIST and MAP
+   * wrappers).
+   */
+  class RepeatedWriter extends FieldWriter {
+    final FieldWriter fieldWriter;
+
+    RepeatedWriter(FieldWriter fieldWriter) {
+      this.fieldWriter = fieldWriter;
+    }
+
+    @Override
+    final void writeRawValue(Object value) {
+      throw new UnsupportedOperationException("Array has no raw value");
+    }
+
+    @Override
+    final void writeField(Object value) {
+      recordConsumer.startField(fieldName, index);
       List<?> list = (List<?>) value;
 
       for (Object listEntry: list) {
@@ -257,10 +353,10 @@ public class ProtoWriteSupport<T extends MessageOrBuilder> extends WriteSupport<
   }
 
   /** validates mapping between protobuffer fields and parquet fields.*/
-  private void validatedMapping(Descriptors.Descriptor descriptor, GroupType parquetSchema) {
-    List<Descriptors.FieldDescriptor> allFields = descriptor.getFields();
+  private void validatedMapping(Descriptor descriptor, GroupType parquetSchema) {
+    List<FieldDescriptor> allFields = descriptor.getFields();
 
-    for (Descriptors.FieldDescriptor fieldDescriptor: allFields) {
+    for (FieldDescriptor fieldDescriptor: allFields) {
       String fieldName = fieldDescriptor.getName();
       int fieldIndex = fieldDescriptor.getIndex();
       int parquetIndex = parquetSchema.getFieldIndex(fieldName);
@@ -295,6 +391,41 @@ public class ProtoWriteSupport<T extends MessageOrBuilder> extends WriteSupport<
     }
   }
 
+  class MapWriter extends FieldWriter {
+
+    private final FieldWriter keyWriter;
+    private final FieldWriter valueWriter;
+
+    public MapWriter(FieldWriter keyWriter, FieldWriter valueWriter) {
+      super();
+      this.keyWriter = keyWriter;
+      this.valueWriter = valueWriter;
+    }
+
+    @Override
+    final void writeRawValue(Object value) {
+      recordConsumer.startGroup();
+
+      recordConsumer.startField("key_value", 0); // This is the wrapper group for the map field
+      for (Message msg : (Collection<Message>) value) {
+        recordConsumer.startGroup();
+
+        final Descriptor descriptorForType = msg.getDescriptorForType();
+        final FieldDescriptor keyDesc = descriptorForType.findFieldByName("key");
+        final FieldDescriptor valueDesc = descriptorForType.findFieldByName("value");
+
+        keyWriter.writeField(msg.getField(keyDesc));
+        valueWriter.writeField(msg.getField(valueDesc));
+
+        recordConsumer.endGroup();
+      }
+
+      recordConsumer.endField("key_value", 0);
+
+      recordConsumer.endGroup();
+    }
+  }
+
   class FloatWriter extends FieldWriter {
     @Override
     final void writeRawValue(Object value) {
@@ -333,7 +464,7 @@ public class ProtoWriteSupport<T extends MessageOrBuilder> extends WriteSupport<
     }
   }
 
-  private FieldWriter unknownType(Descriptors.FieldDescriptor fieldDescriptor) {
+  private FieldWriter unknownType(FieldDescriptor fieldDescriptor) {
     String exceptionMsg = "Unknown type with descriptor \"" + fieldDescriptor
             + "\" and type \"" + fieldDescriptor.getJavaType() + "\".";
     throw new InvalidRecordException(exceptionMsg);
@@ -341,9 +472,8 @@ public class ProtoWriteSupport<T extends MessageOrBuilder> extends WriteSupport<
 
   /** Returns message descriptor as JSON String*/
   private String serializeDescriptor(Class<? extends Message> protoClass) {
-    Descriptors.Descriptor descriptor = Protobufs.getMessageDescriptor(protoClass);
+    Descriptor descriptor = Protobufs.getMessageDescriptor(protoClass);
     DescriptorProtos.DescriptorProto asProto = descriptor.toProto();
     return TextFormat.printToString(asProto);
   }
-
 }
diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java
index 6c01d7b..5544dc6 100644
--- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java
+++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoInputOutputFormatTest.java
@@ -19,6 +19,7 @@
 package org.apache.parquet.proto;
 
 import com.google.protobuf.Message;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.proto.test.TestProto3;
 import org.apache.parquet.proto.test.TestProtobuf;
@@ -193,6 +194,125 @@ public class ProtoInputOutputFormatTest {
     assertEquals("writtenString", stringValue);
   }
 
+  @Test
+  public void testRepeatedIntMessageClass() throws Exception {
+    TestProtobuf.RepeatedIntMessage msgEmpty = TestProtobuf.RepeatedIntMessage.newBuilder().build();
+    TestProtobuf.RepeatedIntMessage msgNonEmpty = TestProtobuf.RepeatedIntMessage.newBuilder()
+      .addRepeatedInt(1).addRepeatedInt(2)
+      .build();
+
+    Path outputPath = new WriteUsingMR().write(msgEmpty, msgNonEmpty);
+    ReadUsingMR readUsingMR = new ReadUsingMR();
+    String customClass = TestProtobuf.RepeatedIntMessage.class.getName();
+    ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass);
+    List<Message> result = readUsingMR.read(outputPath);
+
+    assertEquals(2, result.size());
+    assertEquals(msgEmpty, result.get(0));
+    assertEquals(msgNonEmpty, result.get(1));
+  }
+
+  @Test
+  public void testRepeatedIntMessageClassSchemaCompliant() throws Exception {
+    TestProtobuf.RepeatedIntMessage msgEmpty = TestProtobuf.RepeatedIntMessage.newBuilder().build();
+    TestProtobuf.RepeatedIntMessage msgNonEmpty = TestProtobuf.RepeatedIntMessage.newBuilder()
+      .addRepeatedInt(1).addRepeatedInt(2)
+      .build();
+
+    Configuration conf = new Configuration();
+    ProtoWriteSupport.setWriteSpecsCompliant(conf, true);
+
+    Path outputPath = new WriteUsingMR(conf).write(msgEmpty, msgNonEmpty);
+    ReadUsingMR readUsingMR = new ReadUsingMR();
+    String customClass = TestProtobuf.RepeatedIntMessage.class.getName();
+    ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass);
+    List<Message> result = readUsingMR.read(outputPath);
+
+    assertEquals(2, result.size());
+    assertEquals(msgEmpty, result.get(0));
+    assertEquals(msgNonEmpty, result.get(1));
+  }
+
+  @Test
+  public void testMapIntMessageClass() throws Exception {
+    TestProtobuf.MapIntMessage msgEmpty = TestProtobuf.MapIntMessage.newBuilder().build();
+    TestProtobuf.MapIntMessage msgNonEmpty = TestProtobuf.MapIntMessage.newBuilder()
+      .putMapInt(1, 123).putMapInt(2, 234)
+      .build();
+
+    Path outputPath = new WriteUsingMR().write(msgEmpty, msgNonEmpty);
+    ReadUsingMR readUsingMR = new ReadUsingMR();
+    String customClass = TestProtobuf.MapIntMessage.class.getName();
+    ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass);
+    List<Message> result = readUsingMR.read(outputPath);
+
+    assertEquals(2, result.size());
+    assertEquals(msgEmpty, result.get(0));
+    assertEquals(msgNonEmpty, result.get(1));
+  }
+
+  @Test
+  public void testMapIntMessageClassSchemaCompliant() throws Exception {
+    TestProtobuf.MapIntMessage msgEmpty = TestProtobuf.MapIntMessage.newBuilder().build();
+    TestProtobuf.MapIntMessage msgNonEmpty = TestProtobuf.MapIntMessage.newBuilder()
+      .putMapInt(1, 123).putMapInt(2, 234)
+      .build();
+
+    Configuration conf = new Configuration();
+    ProtoWriteSupport.setWriteSpecsCompliant(conf, true);
+
+    Path outputPath = new WriteUsingMR(conf).write(msgEmpty, msgNonEmpty);
+    ReadUsingMR readUsingMR = new ReadUsingMR();
+    String customClass = TestProtobuf.MapIntMessage.class.getName();
+    ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass);
+    List<Message> result = readUsingMR.read(outputPath);
+
+    assertEquals(2, result.size());
+    assertEquals(msgEmpty, result.get(0));
+    assertEquals(msgNonEmpty, result.get(1));
+  }
+
+  @Test
+  public void testRepeatedInnerMessageClass() throws Exception {
+    TestProtobuf.RepeatedInnerMessage msgEmpty = TestProtobuf.RepeatedInnerMessage.newBuilder().build();
+    TestProtobuf.RepeatedInnerMessage msgNonEmpty = TestProtobuf.RepeatedInnerMessage.newBuilder()
+      .addRepeatedInnerMessage(TestProtobuf.InnerMessage.newBuilder().setOne("one").build())
+      .addRepeatedInnerMessage(TestProtobuf.InnerMessage.newBuilder().setTwo("two").build())
+      .build();
+
+    Path outputPath = new WriteUsingMR().write(msgEmpty, msgNonEmpty);
+    ReadUsingMR readUsingMR = new ReadUsingMR();
+    String customClass = TestProtobuf.RepeatedInnerMessage.class.getName();
+    ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass);
+    List<Message> result = readUsingMR.read(outputPath);
+
+    assertEquals(2, result.size());
+    assertEquals(msgEmpty, result.get(0));
+    assertEquals(msgNonEmpty, result.get(1));
+  }
+
+  @Test
+  public void testRepeatedInnerMessageClassSchemaCompliant() throws Exception {
+    TestProtobuf.RepeatedInnerMessage msgEmpty = TestProtobuf.RepeatedInnerMessage.newBuilder().build();
+    TestProtobuf.RepeatedInnerMessage msgNonEmpty = TestProtobuf.RepeatedInnerMessage.newBuilder()
+      .addRepeatedInnerMessage(TestProtobuf.InnerMessage.newBuilder().setOne("one").build())
+      .addRepeatedInnerMessage(TestProtobuf.InnerMessage.newBuilder().setTwo("two").build())
+      .build();
+
+    Configuration conf = new Configuration();
+    ProtoWriteSupport.setWriteSpecsCompliant(conf, true);
+
+    Path outputPath = new WriteUsingMR(conf).write(msgEmpty, msgNonEmpty);
+    ReadUsingMR readUsingMR = new ReadUsingMR();
+    String customClass = TestProtobuf.RepeatedInnerMessage.class.getName();
+    ProtoReadSupport.setProtobufClass(readUsingMR.getConfiguration(), customClass);
+    List<Message> result = readUsingMR.read(outputPath);
+
+    assertEquals(2, result.size());
+    assertEquals(msgEmpty, result.get(0));
+    assertEquals(msgNonEmpty, result.get(1));
+  }
+
   /**
    * Runs job that writes input to file and then job reading data back.
    */
diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java
index 6f5ff53..4ca82ac 100644
--- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java
+++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaConverterTest.java
@@ -32,14 +32,17 @@ public class ProtoSchemaConverterTest {
   /**
    * Converts given pbClass to parquet schema and compares it with expected parquet schema.
    */
-  private void testConversion(Class<? extends Message> pbClass, String parquetSchemaString) throws
+  private void testConversion(Class<? extends Message> pbClass, String parquetSchemaString, boolean parquetSpecsCompliant) throws
           Exception {
-    ProtoSchemaConverter protoSchemaConverter = new ProtoSchemaConverter();
+    ProtoSchemaConverter protoSchemaConverter = new ProtoSchemaConverter(parquetSpecsCompliant);
     MessageType schema = protoSchemaConverter.convert(pbClass);
     MessageType expectedMT = MessageTypeParser.parseMessageType(parquetSchemaString);
     assertEquals(expectedMT.toString(), schema.toString());
   }
 
+  private void testConversion(Class<? extends Message> pbClass, String parquetSchemaString) throws Exception {
+    testConversion(pbClass, parquetSchemaString, true);
+  }
 
   /**
    * Tests that all protocol buffer datatypes are converted to correct parquet datatypes.
@@ -103,10 +106,12 @@ public class ProtoSchemaConverterTest {
         "  optional binary optionalEnum (ENUM) = 18;" +
         "  optional int32 someInt32 = 19;" +
         "  optional binary someString (UTF8) = 20;" +
-        "  repeated group optionalMap = 21 {\n" +
-        "    optional int64 key = 1;\n" +
-        "    optional group value = 2 {\n" +
-        "      optional int32 someId = 3;\n" +
+        "  optional group optionalMap (MAP) = 21 {\n" +
+        "    repeated group key_value {\n" +
+        "      required int64 key;\n" +
+        "      optional group value {\n" +
+        "        optional int32 someId = 3;\n" +
+        "      }\n" +
         "    }\n" +
         "  }\n" +
         "}";
@@ -120,16 +125,24 @@ public class ProtoSchemaConverterTest {
       "message TestProtobuf.SchemaConverterRepetition {\n" +
         "  optional int32 optionalPrimitive = 1;\n" +
         "  required int32 requiredPrimitive = 2;\n" +
-        "  repeated int32 repeatedPrimitive = 3;\n" +
+        "  optional group repeatedPrimitive (LIST) = 3 {\n" +
+        "    repeated group list {\n" +
+        "      required int32 element;\n" +
+        "    }\n" +
+        "  }\n" +
         "  optional group optionalMessage = 7 {\n" +
         "    optional int32 someId = 3;\n" +
         "  }\n" +
-        "  required group requiredMessage = 8 {" +
+        "  required group requiredMessage = 8 {\n" +
         "    optional int32 someId= 3;\n" +
         "  }\n" +
-        "  repeated group repeatedMessage = 9 {" +
-        "    optional int32 someId = 3;\n" +
-        "  }\n" +
+        "  optional group repeatedMessage (LIST) = 9 {\n" +
+        "    repeated group list {\n" +
+        "      optional group element {\n" +
+        "        optional int32 someId = 3;\n" +
+        "      }\n" +
+        "    }\n" +
+        "  }\n" +
         "}";
 
     testConversion(TestProtobuf.SchemaConverterRepetition.class, expectedSchema);
@@ -140,15 +153,193 @@ public class ProtoSchemaConverterTest {
     String expectedSchema =
       "message TestProto3.SchemaConverterRepetition {\n" +
         "  optional int32 optionalPrimitive = 1;\n" +
-        "  repeated int32 repeatedPrimitive = 3;\n" +
+        "  optional group repeatedPrimitive (LIST) = 3 {\n" +
+        "    repeated group list {\n" +
+        "      required int32 element;\n" +
+        "    }\n" +
+        "  }\n" +
         "  optional group optionalMessage = 7 {\n" +
         "    optional int32 someId = 3;\n" +
         "  }\n" +
-        "  repeated group repeatedMessage = 9 {" +
-        "    optional int32 someId = 3;\n" +
+        "  optional group repeatedMessage (LIST) = 9 {\n" +
+        "    repeated group list {\n" +
+        "      optional group element {\n" +
+        "        optional int32 someId = 3;\n" +
+        "      }\n" +
+        "    }\n" +
         "  }\n" +
         "}";
 
     testConversion(TestProto3.SchemaConverterRepetition.class, expectedSchema);
   }
+
+  @Test
+  public void testConvertRepeatedIntMessage() throws Exception {
+    // Specs-compliant form: the repeated field is wrapped in a LIST group.
+    String expectedSchema =
+      "message TestProtobuf.RepeatedIntMessage {\n" +
+        "  optional group repeatedInt (LIST) = 1 {\n" +
+        "    repeated group list {\n" +
+        "      required int32 element;\n" +
+        "    }\n" +
+        "  }\n" +
+        "}";
+
+    testConversion(TestProtobuf.RepeatedIntMessage.class, expectedSchema);
+  }
+
+  @Test
+  public void testConvertRepeatedIntMessageNonSpecsCompliant() throws Exception {
+    String expectedSchema =
+      "message TestProtobuf.RepeatedIntMessage {\n" +
+        "  repeated int32 repeatedInt = 1;\n" +
+        "}";
+
+    testConversion(TestProtobuf.RepeatedIntMessage.class, expectedSchema, false);
+  }
+
+  @Test
+  public void testProto3ConvertRepeatedIntMessage() throws Exception {
+    // Specs-compliant form: the repeated field is wrapped in a LIST group.
+    String expectedSchema =
+      "message TestProto3.RepeatedIntMessage {\n" +
+        "  optional group repeatedInt (LIST) = 1 {\n" +
+        "    repeated group list {\n" +
+        "      required int32 element;\n" +
+        "    }\n" +
+        "  }\n" +
+        "}";
+
+    testConversion(TestProto3.RepeatedIntMessage.class, expectedSchema);
+  }
+
+  @Test
+  public void testProto3ConvertRepeatedIntMessageNonSpecsCompliant() throws Exception {
+    String expectedSchema =
+      "message TestProto3.RepeatedIntMessage {\n" +
+        "  repeated int32 repeatedInt = 1;\n" +
+        "}";
+
+    testConversion(TestProto3.RepeatedIntMessage.class, expectedSchema, false);
+  }
+
+  @Test
+  public void testConvertRepeatedInnerMessage() throws Exception {
+    String expectedSchema =
+      "message TestProtobuf.RepeatedInnerMessage {\n" +
+        "  optional group repeatedInnerMessage (LIST) = 1 {\n" +
+        "    repeated group list {\n" +
+        "      optional group element {\n" +
+        "        optional binary one (UTF8) = 1;\n" +
+        "        optional binary two (UTF8) = 2;\n" +
+        "        optional binary three (UTF8) = 3;\n" +
+        "      }\n" +
+        "    }\n" +
+        "  }\n" +
+        "}";
+
+    testConversion(TestProtobuf.RepeatedInnerMessage.class, expectedSchema);
+  }
+
+  @Test
+  public void testConvertRepeatedInnerMessageNonSpecsCompliant() throws Exception {
+    String expectedSchema =
+      "message TestProtobuf.RepeatedInnerMessage {\n" +
+        "  repeated group repeatedInnerMessage = 1 {\n" +
+        "    optional binary one (UTF8) = 1;\n" +
+        "    optional binary two (UTF8) = 2;\n" +
+        "    optional binary three (UTF8) = 3;\n" +
+        "  }\n" +
+        "}";
+
+    testConversion(TestProtobuf.RepeatedInnerMessage.class, expectedSchema, false);
+  }
+
+  @Test
+  public void testProto3ConvertRepeatedInnerMessage() throws Exception {
+    String expectedSchema =
+      "message TestProto3.RepeatedInnerMessage {\n" +
+        "  optional group repeatedInnerMessage (LIST) = 1 {\n" +
+        "    repeated group list {\n" +
+        "      optional group element {\n" +
+        "        optional binary one (UTF8) = 1;\n" +
+        "        optional binary two (UTF8) = 2;\n" +
+        "        optional binary three (UTF8) = 3;\n" +
+        "      }\n" +
+        "    }\n" +
+        "  }\n" +
+        "}";
+
+    testConversion(TestProto3.RepeatedInnerMessage.class, expectedSchema);
+  }
+
+  @Test
+  public void testProto3ConvertRepeatedInnerMessageNonSpecsCompliant() throws Exception {
+    String expectedSchema =
+      "message TestProto3.RepeatedInnerMessage {\n" +
+        "  repeated group repeatedInnerMessage = 1 {\n" +
+        "    optional binary one (UTF8) = 1;\n" +
+        "    optional binary two (UTF8) = 2;\n" +
+        "    optional binary three (UTF8) = 3;\n" +
+        "  }\n" +
+        "}";
+
+    testConversion(TestProto3.RepeatedInnerMessage.class, expectedSchema, false);
+  }
+
+  @Test
+  public void testConvertMapIntMessage() throws Exception {
+    String expectedSchema =
+      "message TestProtobuf.MapIntMessage {\n" +
+        "  optional group mapInt (MAP) = 1 {\n" +
+        "    repeated group key_value {\n" +
+        "      required int32 key;\n" +
+        "      optional int32 value;\n" +
+        "    }\n" +
+        "  }\n" +
+        "}";
+
+    testConversion(TestProtobuf.MapIntMessage.class, expectedSchema);
+  }
+
+  @Test
+  public void testConvertMapIntMessageNonSpecsCompliant() throws Exception {
+    String expectedSchema =
+      "message TestProtobuf.MapIntMessage {\n" +
+        "  repeated group mapInt = 1 {\n" +
+        "    optional int32 key = 1;\n" +
+        "    optional int32 value = 2;\n" +
+        "  }\n" +
+        "}";
+
+    testConversion(TestProtobuf.MapIntMessage.class, expectedSchema, false);
+  }
+
+  @Test
+  public void testProto3ConvertMapIntMessage() throws Exception {
+    String expectedSchema =
+      "message TestProto3.MapIntMessage {\n" +
+        "  optional group mapInt (MAP) = 1 {\n" +
+        "    repeated group key_value {\n" +
+        "      required int32 key;\n" +
+        "      optional int32 value;\n" +
+        "    }\n" +
+        "  }\n" +
+        "}";
+
+    testConversion(TestProto3.MapIntMessage.class, expectedSchema);
+  }
+
+  @Test
+  public void testProto3ConvertMapIntMessageNonSpecsCompliant() throws Exception {
+    String expectedSchema =
+      "message TestProto3.MapIntMessage {\n" +
+        "  repeated group mapInt = 1 {\n" +
+        "    optional int32 key = 1;\n" +
+        "    optional int32 value = 2;\n" +
+        "  }\n" +
+        "}";
+
+    testConversion(TestProto3.MapIntMessage.class, expectedSchema, false);
+  }
 }
diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java
index b937618..f71229c 100644
--- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java
+++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java
@@ -31,8 +31,12 @@ import org.apache.parquet.proto.test.TestProtobuf;
 public class ProtoWriteSupportTest {
 
   private <T extends Message> ProtoWriteSupport<T> createReadConsumerInstance(Class<T> cls, RecordConsumer readConsumerMock) {
+    return createReadConsumerInstance(cls, readConsumerMock, new Configuration());
+  }
+
+  private <T extends Message> ProtoWriteSupport<T> createReadConsumerInstance(Class<T> cls, RecordConsumer readConsumerMock, Configuration conf) {
     ProtoWriteSupport support = new ProtoWriteSupport(cls);
-    support.init(new Configuration());
+    support.init(conf);
     support.prepareForWrite(readConsumerMock);
     return support;
   }
@@ -80,6 +84,45 @@ public class ProtoWriteSupportTest {
   }
 
   @Test
+  public void testRepeatedIntMessageSpecsCompliant() throws Exception {
+    RecordConsumer readConsumerMock =  Mockito.mock(RecordConsumer.class);
+    Configuration conf = new Configuration();
+    ProtoWriteSupport.setWriteSpecsCompliant(conf, true);
+    ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.RepeatedIntMessage.class, readConsumerMock, conf);
+
+    TestProtobuf.RepeatedIntMessage.Builder msg = TestProtobuf.RepeatedIntMessage.newBuilder();
+    msg.addRepeatedInt(1323);
+    msg.addRepeatedInt(54469);
+
+    instance.write(msg.build());
+
+    InOrder inOrder = Mockito.inOrder(readConsumerMock);
+
+    inOrder.verify(readConsumerMock).startMessage();
+    inOrder.verify(readConsumerMock).startField("repeatedInt", 0);
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("list", 0);
+
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("element", 0);
+    inOrder.verify(readConsumerMock).addInteger(1323);
+    inOrder.verify(readConsumerMock).endField("element", 0);
+    inOrder.verify(readConsumerMock).endGroup();
+
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("element", 0);
+    inOrder.verify(readConsumerMock).addInteger(54469);
+    inOrder.verify(readConsumerMock).endField("element", 0);
+    inOrder.verify(readConsumerMock).endGroup();
+
+    inOrder.verify(readConsumerMock).endField("list", 0);
+    inOrder.verify(readConsumerMock).endGroup();
+    inOrder.verify(readConsumerMock).endField("repeatedInt", 0);
+    inOrder.verify(readConsumerMock).endMessage();
+    Mockito.verifyNoMoreInteractions(readConsumerMock);
+  }
+
+  @Test
   public void testRepeatedIntMessage() throws Exception {
     RecordConsumer readConsumerMock =  Mockito.mock(RecordConsumer.class);
     ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.RepeatedIntMessage.class, readConsumerMock);
@@ -102,6 +145,79 @@ public class ProtoWriteSupportTest {
   }
 
   @Test
+  public void testRepeatedIntMessageEmptySpecsCompliant() throws Exception {
+    RecordConsumer readConsumerMock =  Mockito.mock(RecordConsumer.class);
+    Configuration conf = new Configuration();
+    ProtoWriteSupport.setWriteSpecsCompliant(conf, true);
+    ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.RepeatedIntMessage.class, readConsumerMock, conf);
+
+    TestProtobuf.RepeatedIntMessage.Builder msg = TestProtobuf.RepeatedIntMessage.newBuilder();
+
+    instance.write(msg.build());
+
+    InOrder inOrder = Mockito.inOrder(readConsumerMock);
+
+    inOrder.verify(readConsumerMock).startMessage();
+    inOrder.verify(readConsumerMock).endMessage();
+    Mockito.verifyNoMoreInteractions(readConsumerMock);
+  }
+
+  @Test
+  public void testRepeatedIntMessageEmpty() throws Exception {
+    RecordConsumer readConsumerMock =  Mockito.mock(RecordConsumer.class);
+    ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.RepeatedIntMessage.class, readConsumerMock);
+
+    TestProtobuf.RepeatedIntMessage.Builder msg = TestProtobuf.RepeatedIntMessage.newBuilder();
+
+    instance.write(msg.build());
+
+    InOrder inOrder = Mockito.inOrder(readConsumerMock);
+
+    inOrder.verify(readConsumerMock).startMessage();
+    inOrder.verify(readConsumerMock).endMessage();
+    Mockito.verifyNoMoreInteractions(readConsumerMock);
+  }
+
+  @Test
+  public void testProto3RepeatedIntMessageSpecsCompliant() throws Exception {
+    RecordConsumer readConsumerMock =  Mockito.mock(RecordConsumer.class);
+    Configuration conf = new Configuration();
+    ProtoWriteSupport.setWriteSpecsCompliant(conf, true);
+    ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.RepeatedIntMessage.class, readConsumerMock, conf);
+
+    TestProto3.RepeatedIntMessage.Builder msg = TestProto3.RepeatedIntMessage.newBuilder();
+    msg.addRepeatedInt(1323);
+    msg.addRepeatedInt(54469);
+
+    instance.write(msg.build());
+
+    InOrder inOrder = Mockito.inOrder(readConsumerMock);
+
+    inOrder.verify(readConsumerMock).startMessage();
+    inOrder.verify(readConsumerMock).startField("repeatedInt", 0);
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("list", 0);
+
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("element", 0);
+    inOrder.verify(readConsumerMock).addInteger(1323);
+    inOrder.verify(readConsumerMock).endField("element", 0);
+    inOrder.verify(readConsumerMock).endGroup();
+
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("element", 0);
+    inOrder.verify(readConsumerMock).addInteger(54469);
+    inOrder.verify(readConsumerMock).endField("element", 0);
+    inOrder.verify(readConsumerMock).endGroup();
+
+    inOrder.verify(readConsumerMock).endField("list", 0);
+    inOrder.verify(readConsumerMock).endGroup();
+    inOrder.verify(readConsumerMock).endField("repeatedInt", 0);
+    inOrder.verify(readConsumerMock).endMessage();
+    Mockito.verifyNoMoreInteractions(readConsumerMock);
+  }
+
+  @Test
   public void testProto3RepeatedIntMessage() throws Exception {
     RecordConsumer readConsumerMock =  Mockito.mock(RecordConsumer.class);
     ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.RepeatedIntMessage.class, readConsumerMock);
@@ -124,6 +240,268 @@ public class ProtoWriteSupportTest {
   }
 
   @Test
+  public void testProto3RepeatedIntMessageEmptySpecsCompliant() throws Exception {
+    RecordConsumer readConsumerMock =  Mockito.mock(RecordConsumer.class);
+    Configuration conf = new Configuration();
+    ProtoWriteSupport.setWriteSpecsCompliant(conf, true);
+    ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.RepeatedIntMessage.class, readConsumerMock, conf);
+
+    TestProto3.RepeatedIntMessage.Builder msg = TestProto3.RepeatedIntMessage.newBuilder();
+
+    instance.write(msg.build());
+
+    InOrder inOrder = Mockito.inOrder(readConsumerMock);
+
+    inOrder.verify(readConsumerMock).startMessage();
+    inOrder.verify(readConsumerMock).endMessage();
+    Mockito.verifyNoMoreInteractions(readConsumerMock);
+  }
+
+  @Test
+  public void testProto3RepeatedIntMessageEmpty() throws Exception {
+    RecordConsumer readConsumerMock =  Mockito.mock(RecordConsumer.class);
+    ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.RepeatedIntMessage.class, readConsumerMock);
+
+    TestProto3.RepeatedIntMessage.Builder msg = TestProto3.RepeatedIntMessage.newBuilder();
+
+    instance.write(msg.build());
+
+    InOrder inOrder = Mockito.inOrder(readConsumerMock);
+
+    inOrder.verify(readConsumerMock).startMessage();
+    inOrder.verify(readConsumerMock).endMessage();
+    Mockito.verifyNoMoreInteractions(readConsumerMock);
+  }
+
+  @Test
+  public void testMapIntMessageSpecsCompliant() throws Exception {
+    RecordConsumer readConsumerMock =  Mockito.mock(RecordConsumer.class);
+    Configuration conf = new Configuration();
+    ProtoWriteSupport.setWriteSpecsCompliant(conf, true);
+    ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.MapIntMessage.class, readConsumerMock, conf);
+
+    TestProtobuf.MapIntMessage.Builder msg = TestProtobuf.MapIntMessage.newBuilder();
+    msg.putMapInt(123, 1);
+    msg.putMapInt(234, 2);
+    instance.write(msg.build());
+
+    InOrder inOrder = Mockito.inOrder(readConsumerMock);
+
+    inOrder.verify(readConsumerMock).startMessage();
+    inOrder.verify(readConsumerMock).startField("mapInt", 0);
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("key_value", 0);
+
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("key", 0);
+    inOrder.verify(readConsumerMock).addInteger(123);
+    inOrder.verify(readConsumerMock).endField("key", 0);
+    inOrder.verify(readConsumerMock).startField("value", 1);
+    inOrder.verify(readConsumerMock).addInteger(1);
+    inOrder.verify(readConsumerMock).endField("value", 1);
+    inOrder.verify(readConsumerMock).endGroup();
+
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("key", 0);
+    inOrder.verify(readConsumerMock).addInteger(234);
+    inOrder.verify(readConsumerMock).endField("key", 0);
+    inOrder.verify(readConsumerMock).startField("value", 1);
+    inOrder.verify(readConsumerMock).addInteger(2);
+    inOrder.verify(readConsumerMock).endField("value", 1);
+    inOrder.verify(readConsumerMock).endGroup();
+
+    inOrder.verify(readConsumerMock).endField("key_value", 0);
+    inOrder.verify(readConsumerMock).endGroup();
+    inOrder.verify(readConsumerMock).endField("mapInt", 0);
+    inOrder.verify(readConsumerMock).endMessage();
+    Mockito.verifyNoMoreInteractions(readConsumerMock);
+  }
+
+  @Test
+  public void testMapIntMessage() throws Exception {
+    RecordConsumer readConsumerMock =  Mockito.mock(RecordConsumer.class);
+    ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.MapIntMessage.class, readConsumerMock);
+
+    TestProtobuf.MapIntMessage.Builder msg = TestProtobuf.MapIntMessage.newBuilder();
+    msg.putMapInt(123, 1);
+    msg.putMapInt(234, 2);
+    instance.write(msg.build());
+
+    InOrder inOrder = Mockito.inOrder(readConsumerMock);
+
+    inOrder.verify(readConsumerMock).startMessage();
+    inOrder.verify(readConsumerMock).startField("mapInt", 0);
+
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("key", 0);
+    inOrder.verify(readConsumerMock).addInteger(123);
+    inOrder.verify(readConsumerMock).endField("key", 0);
+    inOrder.verify(readConsumerMock).startField("value", 1);
+    inOrder.verify(readConsumerMock).addInteger(1);
+    inOrder.verify(readConsumerMock).endField("value", 1);
+    inOrder.verify(readConsumerMock).endGroup();
+
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("key", 0);
+    inOrder.verify(readConsumerMock).addInteger(234);
+    inOrder.verify(readConsumerMock).endField("key", 0);
+    inOrder.verify(readConsumerMock).startField("value", 1);
+    inOrder.verify(readConsumerMock).addInteger(2);
+    inOrder.verify(readConsumerMock).endField("value", 1);
+    inOrder.verify(readConsumerMock).endGroup();
+
+    inOrder.verify(readConsumerMock).endField("mapInt", 0);
+    inOrder.verify(readConsumerMock).endMessage();
+    Mockito.verifyNoMoreInteractions(readConsumerMock);
+  }
+
+  @Test
+  public void testMapIntMessageEmptySpecsCompliant() throws Exception {
+    RecordConsumer readConsumerMock =  Mockito.mock(RecordConsumer.class);
+    Configuration conf = new Configuration();
+    ProtoWriteSupport.setWriteSpecsCompliant(conf, true);
+    ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.MapIntMessage.class, readConsumerMock, conf);
+
+    TestProtobuf.MapIntMessage.Builder msg = TestProtobuf.MapIntMessage.newBuilder();
+    instance.write(msg.build());
+
+    InOrder inOrder = Mockito.inOrder(readConsumerMock);
+
+    inOrder.verify(readConsumerMock).startMessage();
+    inOrder.verify(readConsumerMock).endMessage();
+    Mockito.verifyNoMoreInteractions(readConsumerMock);
+  }
+
+  @Test
+  public void testMapIntMessageEmpty() throws Exception {
+    RecordConsumer readConsumerMock =  Mockito.mock(RecordConsumer.class);
+    ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.MapIntMessage.class, readConsumerMock);
+
+    TestProtobuf.MapIntMessage.Builder msg = TestProtobuf.MapIntMessage.newBuilder();
+    instance.write(msg.build());
+
+    InOrder inOrder = Mockito.inOrder(readConsumerMock);
+
+    inOrder.verify(readConsumerMock).startMessage();
+    inOrder.verify(readConsumerMock).endMessage();
+    Mockito.verifyNoMoreInteractions(readConsumerMock);
+  }
+
+  @Test
+  public void testProto3MapIntMessageSpecsCompliant() throws Exception {
+    RecordConsumer readConsumerMock =  Mockito.mock(RecordConsumer.class);
+    Configuration conf = new Configuration();
+    ProtoWriteSupport.setWriteSpecsCompliant(conf, true);
+    ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.MapIntMessage.class, readConsumerMock, conf);
+
+    TestProto3.MapIntMessage.Builder msg = TestProto3.MapIntMessage.newBuilder();
+    msg.putMapInt(123, 1);
+    msg.putMapInt(234, 2);
+    instance.write(msg.build());
+
+    InOrder inOrder = Mockito.inOrder(readConsumerMock);
+
+    inOrder.verify(readConsumerMock).startMessage();
+    inOrder.verify(readConsumerMock).startField("mapInt", 0);
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("key_value", 0);
+
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("key", 0);
+    inOrder.verify(readConsumerMock).addInteger(123);
+    inOrder.verify(readConsumerMock).endField("key", 0);
+    inOrder.verify(readConsumerMock).startField("value", 1);
+    inOrder.verify(readConsumerMock).addInteger(1);
+    inOrder.verify(readConsumerMock).endField("value", 1);
+    inOrder.verify(readConsumerMock).endGroup();
+
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("key", 0);
+    inOrder.verify(readConsumerMock).addInteger(234);
+    inOrder.verify(readConsumerMock).endField("key", 0);
+    inOrder.verify(readConsumerMock).startField("value", 1);
+    inOrder.verify(readConsumerMock).addInteger(2);
+    inOrder.verify(readConsumerMock).endField("value", 1);
+    inOrder.verify(readConsumerMock).endGroup();
+
+    inOrder.verify(readConsumerMock).endField("key_value", 0);
+    inOrder.verify(readConsumerMock).endGroup();
+    inOrder.verify(readConsumerMock).endField("mapInt", 0);
+    inOrder.verify(readConsumerMock).endMessage();
+    Mockito.verifyNoMoreInteractions(readConsumerMock);
+  }
+
+  @Test
+  public void testProto3MapIntMessage() throws Exception {
+    RecordConsumer readConsumerMock =  Mockito.mock(RecordConsumer.class);
+    ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.MapIntMessage.class, readConsumerMock);
+
+    TestProto3.MapIntMessage.Builder msg = TestProto3.MapIntMessage.newBuilder();
+    msg.putMapInt(123, 1);
+    msg.putMapInt(234, 2);
+    instance.write(msg.build());
+
+    InOrder inOrder = Mockito.inOrder(readConsumerMock);
+
+    inOrder.verify(readConsumerMock).startMessage();
+    inOrder.verify(readConsumerMock).startField("mapInt", 0);
+
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("key", 0);
+    inOrder.verify(readConsumerMock).addInteger(123);
+    inOrder.verify(readConsumerMock).endField("key", 0);
+    inOrder.verify(readConsumerMock).startField("value", 1);
+    inOrder.verify(readConsumerMock).addInteger(1);
+    inOrder.verify(readConsumerMock).endField("value", 1);
+    inOrder.verify(readConsumerMock).endGroup();
+
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("key", 0);
+    inOrder.verify(readConsumerMock).addInteger(234);
+    inOrder.verify(readConsumerMock).endField("key", 0);
+    inOrder.verify(readConsumerMock).startField("value", 1);
+    inOrder.verify(readConsumerMock).addInteger(2);
+    inOrder.verify(readConsumerMock).endField("value", 1);
+    inOrder.verify(readConsumerMock).endGroup();
+
+    inOrder.verify(readConsumerMock).endField("mapInt", 0);
+    inOrder.verify(readConsumerMock).endMessage();
+    Mockito.verifyNoMoreInteractions(readConsumerMock);
+  }
+
+  @Test
+  public void testProto3MapIntMessageEmptySpecsCompliant() throws Exception {
+    RecordConsumer readConsumerMock =  Mockito.mock(RecordConsumer.class);
+    Configuration conf = new Configuration();
+    ProtoWriteSupport.setWriteSpecsCompliant(conf, true);
+    ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.MapIntMessage.class, readConsumerMock, conf);
+
+    TestProto3.MapIntMessage.Builder msg = TestProto3.MapIntMessage.newBuilder();
+    instance.write(msg.build());
+
+    InOrder inOrder = Mockito.inOrder(readConsumerMock);
+
+    inOrder.verify(readConsumerMock).startMessage();
+    inOrder.verify(readConsumerMock).endMessage();
+    Mockito.verifyNoMoreInteractions(readConsumerMock);
+  }
+
+  @Test
+  public void testProto3MapIntMessageEmpty() throws Exception {
+    RecordConsumer readConsumerMock =  Mockito.mock(RecordConsumer.class);
+    ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.MapIntMessage.class, readConsumerMock);
+
+    TestProto3.MapIntMessage.Builder msg = TestProto3.MapIntMessage.newBuilder();
+    instance.write(msg.build());
+
+    InOrder inOrder = Mockito.inOrder(readConsumerMock);
+
+    inOrder.verify(readConsumerMock).startMessage();
+    inOrder.verify(readConsumerMock).endMessage();
+    Mockito.verifyNoMoreInteractions(readConsumerMock);
+  }
+
+  @Test
   public void testRepeatedInnerMessageMessage_message() throws Exception {
     RecordConsumer readConsumerMock =  Mockito.mock(RecordConsumer.class);
     ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.TopMessage.class, readConsumerMock);
@@ -137,6 +515,7 @@ public class ProtoWriteSupportTest {
 
     inOrder.verify(readConsumerMock).startMessage();
     inOrder.verify(readConsumerMock).startField("inner", 0);
+
     inOrder.verify(readConsumerMock).startGroup();
     inOrder.verify(readConsumerMock).startField("one", 0);
     inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes()));
@@ -145,14 +524,54 @@ public class ProtoWriteSupportTest {
     inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("two".getBytes()));
     inOrder.verify(readConsumerMock).endField("two", 1);
     inOrder.verify(readConsumerMock).endGroup();
+
     inOrder.verify(readConsumerMock).endField("inner", 0);
     inOrder.verify(readConsumerMock).endMessage();
     Mockito.verifyNoMoreInteractions(readConsumerMock);
   }
 
   @Test
-  public void testProto3RepeatedInnerMessageMessage_message() throws Exception {
+  public void testRepeatedInnerMessageSpecsCompliantMessage_message() throws Exception {
     RecordConsumer readConsumerMock =  Mockito.mock(RecordConsumer.class);
+    Configuration conf = new Configuration();
+    ProtoWriteSupport.setWriteSpecsCompliant(conf, true);
+    ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.TopMessage.class, readConsumerMock, conf);
+
+    TestProtobuf.TopMessage.Builder msg = TestProtobuf.TopMessage.newBuilder();
+    msg.addInnerBuilder().setOne("one").setTwo("two");
+
+    instance.write(msg.build());
+
+    InOrder inOrder = Mockito.inOrder(readConsumerMock);
+
+    inOrder.verify(readConsumerMock).startMessage();
+    inOrder.verify(readConsumerMock).startField("inner", 0);
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("list", 0);
+
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("element", 0);
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("one", 0);
+    inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes()));
+    inOrder.verify(readConsumerMock).endField("one", 0);
+    inOrder.verify(readConsumerMock).startField("two", 1);
+    inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("two".getBytes()));
+    inOrder.verify(readConsumerMock).endField("two", 1);
+    inOrder.verify(readConsumerMock).endGroup();
+    inOrder.verify(readConsumerMock).endField("element", 0);
+    inOrder.verify(readConsumerMock).endGroup();
+
+    inOrder.verify(readConsumerMock).endField("list", 0);
+    inOrder.verify(readConsumerMock).endGroup();
+    inOrder.verify(readConsumerMock).endField("inner", 0);
+    inOrder.verify(readConsumerMock).endMessage();
+    Mockito.verifyNoMoreInteractions(readConsumerMock);
+  }
+
+  @Test
+  public void testProto3RepeatedInnerMessageMessage_message() throws Exception {
+    RecordConsumer readConsumerMock =  Mockito.mock(RecordConsumer.class);
     ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.TopMessage.class, readConsumerMock);
 
     TestProto3.TopMessage.Builder msg = TestProto3.TopMessage.newBuilder();
@@ -164,6 +583,7 @@ public class ProtoWriteSupportTest {
 
     inOrder.verify(readConsumerMock).startMessage();
     inOrder.verify(readConsumerMock).startField("inner", 0);
+
     inOrder.verify(readConsumerMock).startGroup();
     inOrder.verify(readConsumerMock).startField("one", 0);
     inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes()));
@@ -172,6 +592,96 @@ public class ProtoWriteSupportTest {
     inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("two".getBytes()));
     inOrder.verify(readConsumerMock).endField("two", 1);
     inOrder.verify(readConsumerMock).endGroup();
+
+    inOrder.verify(readConsumerMock).endField("inner", 0);
+    inOrder.verify(readConsumerMock).endMessage();
+    Mockito.verifyNoMoreInteractions(readConsumerMock);
+  }
+
+  @Test
+  public void testProto3RepeatedInnerMessageSpecsCompliantMessage_message() throws Exception {
+    RecordConsumer readConsumerMock =  Mockito.mock(RecordConsumer.class);
+    Configuration conf = new Configuration();
+    ProtoWriteSupport.setWriteSpecsCompliant(conf, true);
+    ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.TopMessage.class, readConsumerMock, conf);
+
+    TestProto3.TopMessage.Builder msg = TestProto3.TopMessage.newBuilder();
+    msg.addInnerBuilder().setOne("one").setTwo("two");
+
+    instance.write(msg.build());
+
+    InOrder inOrder = Mockito.inOrder(readConsumerMock);
+
+    inOrder.verify(readConsumerMock).startMessage();
+    inOrder.verify(readConsumerMock).startField("inner", 0);
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("list", 0);
+    inOrder.verify(readConsumerMock).startGroup();
+
+    inOrder.verify(readConsumerMock).startField("element", 0);
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("one", 0);
+    inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes()));
+    inOrder.verify(readConsumerMock).endField("one", 0);
+    inOrder.verify(readConsumerMock).startField("two", 1);
+    inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("two".getBytes()));
+    inOrder.verify(readConsumerMock).endField("two", 1);
+    inOrder.verify(readConsumerMock).endGroup();
+    inOrder.verify(readConsumerMock).endField("element", 0);
+
+    inOrder.verify(readConsumerMock).endGroup();
+    inOrder.verify(readConsumerMock).endField("list", 0);
+    inOrder.verify(readConsumerMock).endGroup();
+    inOrder.verify(readConsumerMock).endField("inner", 0);
+    inOrder.verify(readConsumerMock).endMessage();
+    Mockito.verifyNoMoreInteractions(readConsumerMock);
+  }
+
+
+  @Test
+  public void testRepeatedInnerMessageSpecsCompliantMessage_scalar() throws Exception {
+    RecordConsumer readConsumerMock =  Mockito.mock(RecordConsumer.class);
+    Configuration conf = new Configuration();
+    ProtoWriteSupport.setWriteSpecsCompliant(conf, true);
+    ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.TopMessage.class, readConsumerMock, conf);
+
+    TestProtobuf.TopMessage.Builder msg = TestProtobuf.TopMessage.newBuilder();
+    msg.addInnerBuilder().setOne("one");
+    msg.addInnerBuilder().setTwo("two");
+
+    instance.write(msg.build());
+
+    InOrder inOrder = Mockito.inOrder(readConsumerMock);
+
+    inOrder.verify(readConsumerMock).startMessage();
+    inOrder.verify(readConsumerMock).startField("inner", 0);
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("list", 0);
+
+    //first inner message
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("element", 0);
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("one", 0);
+    inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes()));
+    inOrder.verify(readConsumerMock).endField("one", 0);
+    inOrder.verify(readConsumerMock).endGroup();
+    inOrder.verify(readConsumerMock).endField("element", 0);
+    inOrder.verify(readConsumerMock).endGroup();
+
+    //second inner message
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("element", 0);
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("two", 1);
+    inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("two".getBytes()));
+    inOrder.verify(readConsumerMock).endField("two", 1);
+    inOrder.verify(readConsumerMock).endGroup();
+    inOrder.verify(readConsumerMock).endField("element", 0);
+    inOrder.verify(readConsumerMock).endGroup();
+
+    inOrder.verify(readConsumerMock).endField("list", 0);
+    inOrder.verify(readConsumerMock).endGroup();
     inOrder.verify(readConsumerMock).endField("inner", 0);
     inOrder.verify(readConsumerMock).endMessage();
     Mockito.verifyNoMoreInteractions(readConsumerMock);
@@ -192,6 +702,7 @@ public class ProtoWriteSupportTest {
 
     inOrder.verify(readConsumerMock).startMessage();
     inOrder.verify(readConsumerMock).startField("inner", 0);
+
     //first inner message
     inOrder.verify(readConsumerMock).startGroup();
     inOrder.verify(readConsumerMock).startField("one", 0);
@@ -226,6 +737,7 @@ public class ProtoWriteSupportTest {
 
     inOrder.verify(readConsumerMock).startMessage();
     inOrder.verify(readConsumerMock).startField("inner", 0);
+
     //first inner message
     inOrder.verify(readConsumerMock).startGroup();
     inOrder.verify(readConsumerMock).startField("one", 0);
@@ -246,6 +758,55 @@ public class ProtoWriteSupportTest {
   }
 
   @Test
+  public void testProto3RepeatedInnerMessageSpecsCompliantMessage_scalar() throws Exception {
+    RecordConsumer readConsumerMock =  Mockito.mock(RecordConsumer.class);
+    Configuration conf = new Configuration();
+    ProtoWriteSupport.setWriteSpecsCompliant(conf, true);
+    ProtoWriteSupport instance = createReadConsumerInstance(TestProto3.TopMessage.class, readConsumerMock, conf);
+
+    TestProto3.TopMessage.Builder msg = TestProto3.TopMessage.newBuilder();
+    msg.addInnerBuilder().setOne("one");
+    msg.addInnerBuilder().setTwo("two");
+
+    instance.write(msg.build());
+
+    InOrder inOrder = Mockito.inOrder(readConsumerMock);
+
+    inOrder.verify(readConsumerMock).startMessage();
+    inOrder.verify(readConsumerMock).startField("inner", 0);
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("list", 0);
+
+    //first inner message
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("element", 0);
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("one", 0);
+    inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes()));
+    inOrder.verify(readConsumerMock).endField("one", 0);
+    inOrder.verify(readConsumerMock).endGroup();
+    inOrder.verify(readConsumerMock).endField("element", 0);
+    inOrder.verify(readConsumerMock).endGroup();
+
+    //second inner message
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("element", 0);
+    inOrder.verify(readConsumerMock).startGroup();
+    inOrder.verify(readConsumerMock).startField("two", 1);
+    inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("two".getBytes()));
+    inOrder.verify(readConsumerMock).endField("two", 1);
+    inOrder.verify(readConsumerMock).endGroup();
+    inOrder.verify(readConsumerMock).endField("element", 0);
+    inOrder.verify(readConsumerMock).endGroup();
+
+    inOrder.verify(readConsumerMock).endField("list", 0);
+    inOrder.verify(readConsumerMock).endGroup();
+    inOrder.verify(readConsumerMock).endField("inner", 0);
+    inOrder.verify(readConsumerMock).endMessage();
+    Mockito.verifyNoMoreInteractions(readConsumerMock);
+  }
+
+  @Test
   public void testOptionalInnerMessage() throws Exception {
     RecordConsumer readConsumerMock =  Mockito.mock(RecordConsumer.class);
     ProtoWriteSupport instance = createReadConsumerInstance(TestProtobuf.MessageA.class, readConsumerMock);
diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/WriteUsingMR.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/WriteUsingMR.java
index d18076a..55f9237 100644
--- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/WriteUsingMR.java
+++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/utils/WriteUsingMR.java
@@ -46,10 +46,18 @@ import static java.lang.Thread.sleep;
 public class WriteUsingMR {
 
   private static final Logger LOG = LoggerFactory.getLogger(WriteUsingMR.class);
-  Configuration conf = new Configuration();
+  private final Configuration conf;
   private static List<Message> inputMessages;
   Path outputPath;
 
+  public WriteUsingMR() {
+    this(new Configuration());
+  }
+
+  public WriteUsingMR(Configuration conf) {
+    this.conf = new Configuration(conf);
+  }
+
   public Configuration getConfiguration() {
     return conf;
   }
diff --git a/parquet-protobuf/src/test/resources/TestProto3.proto b/parquet-protobuf/src/test/resources/TestProto3.proto
index 1896445..e49eef5 100644
--- a/parquet-protobuf/src/test/resources/TestProto3.proto
+++ b/parquet-protobuf/src/test/resources/TestProto3.proto
@@ -124,6 +124,14 @@ message RepeatedIntMessage {
     repeated int32 repeatedInt = 1;
 }
 
+message RepeatedInnerMessage {
+    repeated InnerMessage repeatedInnerMessage = 1;
+}
+
+message MapIntMessage {
+    map<int32, int32> mapInt = 1;
+}
+
 message HighIndexMessage {
     repeated int32 repeatedInt = 50000;
 }
diff --git a/parquet-protobuf/src/test/resources/TestProtobuf.proto b/parquet-protobuf/src/test/resources/TestProtobuf.proto
index d7cdf03..d4ab4c7 100644
--- a/parquet-protobuf/src/test/resources/TestProtobuf.proto
+++ b/parquet-protobuf/src/test/resources/TestProtobuf.proto
@@ -122,6 +122,14 @@ message RepeatedIntMessage {
     repeated int32 repeatedInt = 1;
 }
 
+message RepeatedInnerMessage {
+    repeated InnerMessage repeatedInnerMessage = 1;
+}
+
+message MapIntMessage {
+    map<int32, int32> mapInt = 1;
+}
+
 message HighIndexMessage {
     repeated int32 repeatedInt = 50000;
 }
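
The map tests above check a fixed event order for each write mode. As a reading aid, the following is a minimal sketch that rebuilds that order as plain strings. It does not use the Parquet or Mockito APIs: the class MapEventSketch and the method mapEvents are hypothetical names introduced here for illustration only, and the event strings simply mirror the calls that the tests verify between startMessage() and endMessage().

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: not part of parquet-mr.
public class MapEventSketch {

  // Builds the event sequence for one map field, in the order the tests verify it.
  static List<String> mapEvents(String field, int index,
                                Map<Integer, Integer> entries, boolean specsCompliant) {
    List<String> events = new ArrayList<>();
    if (entries.isEmpty()) {
      // The "Empty" tests expect no field events at all, in either mode.
      return events;
    }
    events.add("startField(" + field + ", " + index + ")");
    if (specsCompliant) {
      // Specs-compliant mode adds one wrapper group holding a repeated key_value field.
      events.add("startGroup");
      events.add("startField(key_value, 0)");
    }
    for (Map.Entry<Integer, Integer> e : entries.entrySet()) {
      events.add("startGroup");
      events.add("startField(key, 0)");
      events.add("addInteger(" + e.getKey() + ")");
      events.add("endField(key, 0)");
      events.add("startField(value, 1)");
      events.add("addInteger(" + e.getValue() + ")");
      events.add("endField(value, 1)");
      events.add("endGroup");
    }
    if (specsCompliant) {
      events.add("endField(key_value, 0)");
      events.add("endGroup");
    }
    events.add("endField(" + field + ", " + index + ")");
    return events;
  }

  public static void main(String[] args) {
    Map<Integer, Integer> entries = new LinkedHashMap<>();
    entries.put(123, 1);
    entries.put(234, 2);
    // Print the specs-compliant sequence, one event per line.
    mapEvents("mapInt", 0, entries, true).forEach(System.out::println);
  }
}
```

With specsCompliant set to false the same entries yield the flat sequence checked by testMapIntMessage; with true, the four extra wrapper events match testProto3MapIntMessageSpecsCompliant.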
