You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ga...@apache.org on 2020/10/05 09:40:00 UTC

[parquet-mr] branch master updated: PARQUET-313: Implement 3 level list writing rule for Parquet-Thrift (#222)

This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a4e3ee  PARQUET-313: Implement 3 level list writing rule for Parquet-Thrift (#222)
0a4e3ee is described below

commit 0a4e3eea991f7588c9c5e056e9d7b32a76eed5da
Author: Ashish Singh <as...@pinterest.com>
AuthorDate: Mon Oct 5 02:39:52 2020 -0700

    PARQUET-313: Implement 3 level list writing rule for Parquet-Thrift (#222)
---
 .../apache/parquet/scrooge/ScroogeReadSupport.java |  16 +-
 parquet-thrift/README.md                           |  33 ++++
 .../hadoop/thrift/AbstractThriftWriteSupport.java  |   9 +-
 .../thrift/ParquetThriftBytesOutputFormat.java     |  41 ++++-
 .../parquet/hadoop/thrift/TBaseWriteSupport.java   |   6 +-
 .../hadoop/thrift/ThriftBytesWriteSupport.java     |  37 ++++-
 .../parquet/hadoop/thrift/ThriftReadSupport.java   |  22 ++-
 .../hadoop/thrift/ThriftToParquetFileWriter.java   |   4 +-
 .../parquet/thrift/ParquetWriteProtocol.java       | 137 +++++++++++++---
 .../parquet/thrift/ThriftSchemaConvertVisitor.java |  44 +++++-
 .../parquet/thrift/ThriftSchemaConverter.java      |  25 ++-
 .../thrift/TestThriftToParquetFileWriter.java      |  83 ++++++++--
 .../parquet/thrift/TestParquetWriteProtocol.java   | 176 ++++++++++++++++++++-
 .../thrift/TestThriftParquetReaderWriter.java      |  16 +-
 14 files changed, 567 insertions(+), 82 deletions(-)

diff --git a/parquet-scrooge/src/main/java/org/apache/parquet/scrooge/ScroogeReadSupport.java b/parquet-scrooge/src/main/java/org/apache/parquet/scrooge/ScroogeReadSupport.java
index 63b9897..286ca0a 100644
--- a/parquet-scrooge/src/main/java/org/apache/parquet/scrooge/ScroogeReadSupport.java
+++ b/parquet-scrooge/src/main/java/org/apache/parquet/scrooge/ScroogeReadSupport.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet.scrooge;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.hadoop.thrift.ThriftReadSupport;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.thrift.ThriftSchemaConverter;
@@ -37,8 +38,13 @@ public class ScroogeReadSupport extends ThriftReadSupport{
   }
 
   @Override
-  protected MessageType getProjectedSchema(FieldProjectionFilter fieldProjectionFilter) {
+  protected MessageType getProjectedSchema(Configuration configuration, FieldProjectionFilter fieldProjectionFilter) {
     ThriftType.StructType thriftStruct = new ScroogeStructConverter().convert(thriftClass);
-    return new ThriftSchemaConverter(fieldProjectionFilter).convert(thriftStruct);
+    return new ThriftSchemaConverter(configuration, fieldProjectionFilter).convert(thriftStruct);
+  }
+
+  @Deprecated
+  protected MessageType getProjectedSchema(FieldProjectionFilter fieldProjectionFilter) {
+    return getProjectedSchema(new Configuration(), fieldProjectionFilter);
   }
 }
diff --git a/parquet-thrift/README.md b/parquet-thrift/README.md
new file mode 100644
index 0000000..87713c8
--- /dev/null
+++ b/parquet-thrift/README.md
@@ -0,0 +1,33 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+# Apache Thrift Integration
+
+**Todo:** Add a description and examples on how to use Parquet-Thrift integration.
+
+## Available options via Hadoop Configuration
+
+### Configuration for reading
+**Todo:** Add read configs
+
+### Configuration for writing
+**Todo:** Add all write configs
+| Name                                    | Type      | Description                                                          |
+|-----------------------------------------|-----------|----------------------------------------------------------------------|
+| `parquet.thrift.write-three-level-lists`| `boolean` | Write lists with 3-level structure to allow list and list elements to be nullable. When set to `true`, lists will be written as per https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists|
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java
index d0f9d01..0c3fe44 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java
@@ -40,8 +40,10 @@ import org.slf4j.LoggerFactory;
 public abstract class AbstractThriftWriteSupport<T> extends WriteSupport<T> {
   public static final String PARQUET_THRIFT_CLASS = "parquet.thrift.class";
   private static final Logger LOG = LoggerFactory.getLogger(AbstractThriftWriteSupport.class);
+  private static Configuration conf;
 
   public static void setGenericThriftClass(Configuration configuration, Class<?> thriftClass) {
+    conf = configuration;
     configuration.set(PARQUET_THRIFT_CLASS, thriftClass.getName());
   }
 
@@ -84,7 +86,8 @@ public abstract class AbstractThriftWriteSupport<T> extends WriteSupport<T> {
     this.thriftClass = thriftClass;
     this.thriftStruct = getThriftStruct();
 
-    this.schema = ThriftSchemaConverter.convertWithoutProjection(thriftStruct);
+    ThriftSchemaConverter thriftSchemaConverter = new ThriftSchemaConverter(conf);
+    this.schema = thriftSchemaConverter.convert(thriftStruct);
 
     final Map<String, String> extraMetaData = new ThriftMetaData(thriftClass.getName(), thriftStruct).toExtraMetaData();
     // adding the Pig schema as it would have been mapped from thrift
@@ -108,6 +111,7 @@ public abstract class AbstractThriftWriteSupport<T> extends WriteSupport<T> {
 
   @Override
   public WriteContext init(Configuration configuration) {
+    conf = configuration;
     if (writeContext == null) {
       init(getGenericThriftClass(configuration));
     }
@@ -117,7 +121,8 @@ public abstract class AbstractThriftWriteSupport<T> extends WriteSupport<T> {
   @Override
   public void prepareForWrite(RecordConsumer recordConsumer) {
     final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
-    this.parquetWriteProtocol = new ParquetWriteProtocol(recordConsumer, columnIO, thriftStruct);
+    this.parquetWriteProtocol = new ParquetWriteProtocol(
+        conf, recordConsumer, columnIO, thriftStruct);
   }
 
   protected abstract StructType getThriftStruct();
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ParquetThriftBytesOutputFormat.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ParquetThriftBytesOutputFormat.java
index dfdb140..9caca5a 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ParquetThriftBytesOutputFormat.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ParquetThriftBytesOutputFormat.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet.hadoop.thrift;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.thrift.TBase;
@@ -55,15 +56,41 @@ public class ParquetThriftBytesOutputFormat extends ParquetOutputFormat<BytesWri
   /**
    *  The buffered implementation will buffer each record and deal with invalid records (more expansive).
    *  when catching an exception the record can be discarded.
-   *  The non-buffered implementation will stream field by field. Exceptions are unrecoverable and the file must be closed when an invalid record is written.
+   *  The non-buffered implementation will stream field by field. Exceptions are unrecoverable and the file
+   *  must be closed when an invalid record is written.
    *
+   * @param configuration configuration
    * @param protocolFactory the protocol factory to use to read the bytes
-   * @param thriftClass thriftClass the class to exctract the schema from
+   * @param thriftClass thriftClass the class to extract the schema from
    * @param buffered whether we should buffer each record
    * @param errorHandler handle record corruption and schema incompatible exception
    */
-  public ParquetThriftBytesOutputFormat(TProtocolFactory protocolFactory, Class<? extends TBase<?, ?>> thriftClass, boolean buffered, FieldIgnoredHandler errorHandler) {
-    super(new ThriftBytesWriteSupport(protocolFactory, thriftClass, buffered, errorHandler));
+  public ParquetThriftBytesOutputFormat(Configuration configuration,
+                                        TProtocolFactory protocolFactory,
+                                        Class<? extends TBase<?, ?>> thriftClass,
+                                        boolean buffered,
+                                        FieldIgnoredHandler errorHandler) {
+    super(new ThriftBytesWriteSupport(
+        configuration, protocolFactory, thriftClass, buffered, errorHandler));
+  }
+
+  /**
+   * @deprecated Use @link{ParquetThriftBytesOutputFormat(
+   * Configuration configuration, TProtocolFactory protocolFactory,
+   * {@literal Class<\? extends TBase<\?, ?>>} thriftClass, boolean buffered,
+   * FieldIgnoredHandler errorHandler)} instead.
+   *
+   * @param protocolFactory the protocol factory to use to read the bytes
+   * @param thriftClass thriftClass the class to extract the schema from
+   * @param buffered whether we should buffer each record
+   * @param errorHandler handle record corruption and schema incompatible exception
+   */
+  @Deprecated
+  public ParquetThriftBytesOutputFormat(TProtocolFactory protocolFactory,
+                                        Class<? extends TBase<?, ?>> thriftClass,
+                                        boolean buffered,
+                                        FieldIgnoredHandler errorHandler) {
+    this(new Configuration(), protocolFactory, thriftClass, buffered, errorHandler);
   }
 
 }
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/TBaseWriteSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/TBaseWriteSupport.java
index 56bf299..c1ece9f 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/TBaseWriteSupport.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/TBaseWriteSupport.java
@@ -25,7 +25,10 @@ import org.apache.parquet.thrift.struct.ThriftType.StructType;
 
 public class TBaseWriteSupport<T extends TBase<?, ?>> extends AbstractThriftWriteSupport<T> {
 
+  private static Configuration conf;
+
   public static <U extends TBase<?,?>> void setThriftClass(Configuration configuration, Class<U> thriftClass) {
+    conf = configuration;
     AbstractThriftWriteSupport.setGenericThriftClass(configuration, thriftClass);
   }
 
@@ -52,7 +55,8 @@ public class TBaseWriteSupport<T extends TBase<?, ?>> extends AbstractThriftWrit
 
   @Override
   protected StructType getThriftStruct() {
-    return ThriftSchemaConverter.toStructType(thriftClass);
+    ThriftSchemaConverter thriftSchemaConverter = new ThriftSchemaConverter(conf);
+    return thriftSchemaConverter.toStructType((Class<TBase<?, ?>>)thriftClass);
   }
 
   @Override
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java
index 78fe2d7..bdb6578 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -79,14 +79,34 @@ public class ThriftBytesWriteSupport extends WriteSupport<BytesWritable> {
   private StructType thriftStruct;
   private ParquetWriteProtocol parquetWriteProtocol;
   private final FieldIgnoredHandler errorHandler;
+  private Configuration configuration;
 
   public ThriftBytesWriteSupport() {
     this.buffered = true;
     this.errorHandler = null;
   }
 
-  public ThriftBytesWriteSupport(TProtocolFactory protocolFactory, Class<? extends TBase<?, ?>> thriftClass, boolean buffered, FieldIgnoredHandler errorHandler) {
+  /**
+   * @deprecated Use @link{ThriftBytesWriteSupport(Configuration configuration,
+   * TProtocolFactory protocolFactory, {@literal Class<? extends TBase<?,?>>} thriftClass,
+   * boolean buffered, FieldIgnoredHandler errorHandler)} instead
+   */
+  @Deprecated
+  public ThriftBytesWriteSupport(TProtocolFactory protocolFactory,
+                                 Class<? extends TBase<?, ?>> thriftClass,
+                                 boolean buffered,
+                                 FieldIgnoredHandler errorHandler) {
+    this(new Configuration(), protocolFactory, thriftClass, buffered, errorHandler);
+  }
+
+  public ThriftBytesWriteSupport(
+      Configuration configuration,
+      TProtocolFactory protocolFactory,
+      Class<? extends TBase<?, ?>> thriftClass,
+      boolean buffered,
+      FieldIgnoredHandler errorHandler) {
     super();
+    this.configuration = configuration;
     this.protocolFactory = protocolFactory;
     this.thriftClass = thriftClass;
     this.buffered = buffered;
@@ -103,6 +123,7 @@ public class ThriftBytesWriteSupport extends WriteSupport<BytesWritable> {
 
   @Override
   public WriteContext init(Configuration configuration) {
+    this.configuration = configuration;
     if (this.protocolFactory == null) {
       try {
         this.protocolFactory = getTProtocolFactoryClass(configuration).newInstance();
@@ -115,8 +136,9 @@ public class ThriftBytesWriteSupport extends WriteSupport<BytesWritable> {
     } else {
       thriftClass = TBaseWriteSupport.getThriftClass(configuration);
     }
-    this.thriftStruct = ThriftSchemaConverter.toStructType(thriftClass);
-    this.schema = ThriftSchemaConverter.convertWithoutProjection(thriftStruct);
+    ThriftSchemaConverter thriftSchemaConverter = new ThriftSchemaConverter(this.configuration);
+    thriftStruct = thriftSchemaConverter.toStructType(thriftClass);
+    schema = thriftSchemaConverter.convert(thriftStruct);
     if (buffered) {
       readToWrite = new BufferedProtocolReadToWrite(thriftStruct, errorHandler);
     } else {
@@ -156,7 +178,8 @@ public class ThriftBytesWriteSupport extends WriteSupport<BytesWritable> {
   @Override
   public void prepareForWrite(RecordConsumer recordConsumer) {
     final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
-    this.parquetWriteProtocol = new ParquetWriteProtocol(recordConsumer, columnIO, thriftStruct);
+    parquetWriteProtocol = new ParquetWriteProtocol(
+        configuration, recordConsumer, columnIO, thriftStruct);
     thriftWriteSupport.prepareForWrite(recordConsumer);
   }
 
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
index 9b6881e..6bad970 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -178,7 +178,7 @@ public class ThriftReadSupport<T> extends ReadSupport<T> {
     } else if (projectionFilter != null) {
       try {
         initThriftClassFromMultipleFiles(context.getKeyValueMetadata(), configuration);
-        requestedProjection =  getProjectedSchema(projectionFilter);
+        requestedProjection =  getProjectedSchema(configuration, projectionFilter);
       } catch (ClassNotFoundException e) {
         throw new ThriftProjectionException("can not find thriftClass from configuration", e);
       }
@@ -189,8 +189,18 @@ public class ThriftReadSupport<T> extends ReadSupport<T> {
   }
 
   @SuppressWarnings("unchecked")
-  protected MessageType getProjectedSchema(FieldProjectionFilter fieldProjectionFilter) {
-    return new ThriftSchemaConverter(fieldProjectionFilter).convert((Class<TBase<?, ?>>)thriftClass);
+  protected MessageType getProjectedSchema(Configuration configuration, FieldProjectionFilter
+      fieldProjectionFilter) {
+    return new ThriftSchemaConverter(configuration, fieldProjectionFilter)
+        .convert((Class<TBase<?, ?>>)thriftClass);
+  }
+
+  @Deprecated
+  @SuppressWarnings("unchecked")
+  protected MessageType getProjectedSchema(FieldProjectionFilter
+    fieldProjectionFilter) {
+    return new ThriftSchemaConverter(new Configuration(), fieldProjectionFilter)
+      .convert((Class<TBase<?, ?>>)thriftClass);
   }
 
   @SuppressWarnings("unchecked")
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftToParquetFileWriter.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftToParquetFileWriter.java
index b0ace04..a638715 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftToParquetFileWriter.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftToParquetFileWriter.java
@@ -93,7 +93,9 @@ public class ThriftToParquetFileWriter implements Closeable {
       boolean buffered,
       FieldIgnoredHandler errorHandler) throws IOException, InterruptedException {
     this.taskAttemptContext = taskAttemptContext;
-    this.recordWriter = new ParquetThriftBytesOutputFormat(protocolFactory, thriftClass, buffered, errorHandler).getRecordWriter(taskAttemptContext, fileToCreate);
+    this.recordWriter = new ParquetThriftBytesOutputFormat(
+        taskAttemptContext.getConfiguration(), protocolFactory, thriftClass, buffered, errorHandler)
+        .getRecordWriter(taskAttemptContext, fileToCreate);
   }
 
   /**
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java
index 8755ee4..ba48b37 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,6 +21,7 @@ package org.apache.parquet.thrift;
 
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TField;
 import org.apache.thrift.protocol.TList;
@@ -51,6 +52,11 @@ import org.slf4j.LoggerFactory;
 
 public class ParquetWriteProtocol extends ParquetProtocol {
 
+  public static final String WRITE_THREE_LEVEL_LISTS = "parquet.thrift.write-three-level-lists";
+  public static final boolean WRITE_THREE_LEVEL_LISTS_DEFAULT = false;
+
+  private boolean writeThreeLevelList = WRITE_THREE_LEVEL_LISTS_DEFAULT;
+
   interface Events {
 
     public void start();
@@ -98,13 +104,46 @@ public class ParquetWriteProtocol extends ParquetProtocol {
 
   }
 
-  class ListWriteProtocol extends FieldBaseWriteProtocol {
+  abstract class ListWriteProtocol extends FieldBaseWriteProtocol {
+    protected int size;
+
+    public ListWriteProtocol(Events returnClause) {
+      super(returnClause);
+    }
+
+    abstract protected void startListWrapper();
 
+    abstract protected void endListWrapper();
+
+    @Override
+    public void writeListBegin(TList list) throws TException {
+      size = list.size;
+      startListWrapper();
+    }
+
+    @Override
+    public void writeListEnd() throws TException {
+      endListWrapper();
+    }
+
+    @Override
+    public void writeSetBegin(TSet set) throws TException {
+      size = set.size;
+      startListWrapper();
+    }
+
+    @Override
+    public void writeSetEnd() throws TException {
+      endListWrapper();
+    }
+
+  }
+
+  class TwoLevelListWriteProtocol extends ListWriteProtocol {
     private ColumnIO listContent;
     private TProtocol contentProtocol;
-    private int size;
 
-    public ListWriteProtocol(GroupColumnIO columnIO, ThriftField values, Events returnClause) {
+    public TwoLevelListWriteProtocol(GroupColumnIO columnIO, ThriftField values, Events returnClause) {
       super(returnClause);
       this.listContent = columnIO.getChild(0);
       this.contentProtocol = getProtocol(values, listContent, new Events() {
@@ -117,14 +156,15 @@ public class ParquetWriteProtocol extends ParquetProtocol {
         public void end() {
           ++ consumedRecords;
           if (consumedRecords == size) {
-            currentProtocol = ListWriteProtocol.this;
+            currentProtocol = TwoLevelListWriteProtocol.this;
             consumedRecords = 0;
           }
         }
       });
     }
 
-    private void startListWrapper() {
+    @Override
+    protected void startListWrapper() {
       start();
       recordConsumer.startGroup();
       if (size > 0) {
@@ -133,36 +173,67 @@ public class ParquetWriteProtocol extends ParquetProtocol {
       }
     }
 
-    private void endListWrapper() {
+    @Override
+    protected void endListWrapper() {
       if (size > 0) {
         recordConsumer.endField(listContent.getType().getName(), 0);
       }
       recordConsumer.endGroup();
       end();
     }
+  }
 
-    @Override
-    public void writeListBegin(TList list) throws TException {
-      size = list.size;
-      startListWrapper();
-    }
+  class ThreeLevelListWriteProtocol extends ListWriteProtocol {
+    private GroupColumnIO listContent;
+    private ColumnIO elementContent;
+    private TProtocol contentProtocol;
 
-    @Override
-    public void writeListEnd() throws TException {
-      endListWrapper();
+    public ThreeLevelListWriteProtocol(GroupColumnIO columnIO, ThriftField values,
+                                    Events returnClause) {
+      super(returnClause);
+      this.listContent = (GroupColumnIO) columnIO.getChild(0);
+      this.elementContent = listContent.getChild(0);
+      this.contentProtocol = getProtocol(values, elementContent, new Events() {
+        int consumedRecords = 0;
+        @Override
+        public void start() {
+          recordConsumer.startGroup();
+          recordConsumer.startField("element", 0);
+        }
+
+        @Override
+        public void end() {
+          recordConsumer.endField("element", 0);
+          recordConsumer.endGroup();
+          ++ consumedRecords;
+          if (consumedRecords == size) {
+            currentProtocol = ThreeLevelListWriteProtocol.this;
+            consumedRecords = 0;
+          } else {
+            currentProtocol = contentProtocol;
+          }
+        }
+      });
     }
 
     @Override
-    public void writeSetBegin(TSet set) throws TException {
-      size = set.size;
-      startListWrapper();
+    protected void startListWrapper() {
+      start();
+      recordConsumer.startGroup();
+      if (size > 0) {
+        recordConsumer.startField("list", 0);
+        currentProtocol = contentProtocol;
+      }
     }
 
     @Override
-    public void writeSetEnd() throws TException {
-      endListWrapper();
+    protected void endListWrapper() {
+      if (size > 0) {
+        recordConsumer.endField("list", 0);
+      }
+      recordConsumer.endGroup();
+      end();
     }
-
   }
 
   class MapWriteProtocol extends FieldBaseWriteProtocol {
@@ -427,6 +498,16 @@ public class ParquetWriteProtocol extends ParquetProtocol {
     return "<TMap keyType:" + map.keyType + " valueType:" + map.valueType + " size:" + map.size + ">";
   }
 
+  public ParquetWriteProtocol(
+      Configuration configuration, RecordConsumer recordConsumer, MessageColumnIO schema, StructType thriftType) {
+    this.recordConsumer = recordConsumer;
+    if (configuration != null) {
+      this.writeThreeLevelList = configuration.getBoolean(
+          WRITE_THREE_LEVEL_LISTS, WRITE_THREE_LEVEL_LISTS_DEFAULT);
+    }
+    this.currentProtocol = new MessageWriteProtocol(schema, thriftType);
+  }
+
   public ParquetWriteProtocol(RecordConsumer recordConsumer, MessageColumnIO schema, StructType thriftType) {
     this.currentProtocol = new MessageWriteProtocol(schema, thriftType);
     this.recordConsumer = recordConsumer;
@@ -677,10 +758,10 @@ public class ParquetWriteProtocol extends ParquetProtocol {
       p = new MapWriteProtocol((GroupColumnIO)columnIO, (MapType)type, returnClause);
       break;
     case SET:
-      p = new ListWriteProtocol((GroupColumnIO)columnIO, ((SetType)type).getValues(), returnClause);
+      p = getListWriteProtocol((GroupColumnIO) columnIO, ((SetType) type).getValues(), returnClause);
       break;
     case LIST:
-      p = new ListWriteProtocol((GroupColumnIO)columnIO, ((ListType)type).getValues(), returnClause);
+      p = getListWriteProtocol((GroupColumnIO) columnIO, ((ListType) type).getValues(), returnClause);
       break;
     case ENUM:
       p = new EnumWriteProtocol((PrimitiveColumnIO)columnIO, (EnumType)type, returnClause);
@@ -688,4 +769,10 @@ public class ParquetWriteProtocol extends ParquetProtocol {
     }
     return p;
   }
+
+  private TProtocol getListWriteProtocol(GroupColumnIO columnIO, ThriftField values, Events returnClause) {
+    return writeThreeLevelList ?
+      new ThreeLevelListWriteProtocol(columnIO, values, returnClause):
+      new TwoLevelListWriteProtocol(columnIO, values, returnClause);
+  }
 }
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
index c256880..83d6dd5 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.ShouldNeverHappenException;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
@@ -53,6 +54,7 @@ import org.apache.parquet.thrift.struct.ThriftType.StringType;
 import org.apache.parquet.thrift.struct.ThriftType.StructType;
 import org.apache.parquet.thrift.struct.ThriftType.StructType.StructOrUnionType;
 
+import static org.apache.parquet.schema.ConversionPatterns.listOfElements;
 import static org.apache.parquet.schema.ConversionPatterns.listType;
 import static org.apache.parquet.schema.ConversionPatterns.mapType;
 import static org.apache.parquet.schema.LogicalTypeAnnotation.enumType;
@@ -76,22 +78,39 @@ class ThriftSchemaConvertVisitor implements ThriftType.StateVisitor<ConvertedFie
   private final boolean doProjection;
   private final boolean keepOneOfEachUnion;
 
-  private ThriftSchemaConvertVisitor(FieldProjectionFilter fieldProjectionFilter, boolean doProjection, boolean keepOneOfEachUnion) {
+  private final boolean writeThreeLevelList;
+
+  private ThriftSchemaConvertVisitor(FieldProjectionFilter fieldProjectionFilter, boolean doProjection,
+                                     boolean keepOneOfEachUnion, Configuration configuration) {
     this.fieldProjectionFilter = Objects.requireNonNull(fieldProjectionFilter,
-        "fieldProjectionFilter cannot be null");
+      "fieldProjectionFilter cannot be null");
     this.doProjection = doProjection;
     this.keepOneOfEachUnion = keepOneOfEachUnion;
+    if (configuration != null) {
+      this.writeThreeLevelList = configuration.getBoolean(
+        ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS,
+        ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS_DEFAULT);
+    } else {
+      writeThreeLevelList = ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS_DEFAULT;
+    }
+  }
+
+  private ThriftSchemaConvertVisitor(FieldProjectionFilter fieldProjectionFilter, boolean doProjection,
+                                     boolean keepOneOfEachUnion) {
+    this(fieldProjectionFilter, doProjection, keepOneOfEachUnion, new Configuration());
   }
 
   @Deprecated
   public static MessageType convert(StructType struct, FieldProjectionFilter filter) {
-    return convert(struct, filter, true);
+    return convert(struct, filter, true, new Configuration());
   }
 
-  public static MessageType convert(StructType struct, FieldProjectionFilter filter, boolean keepOneOfEachUnion) {
+  public static MessageType convert(StructType struct, FieldProjectionFilter filter, boolean keepOneOfEachUnion,
+                                    Configuration conf) {
     State state = new State(new FieldsPath(), REPEATED, "ParquetSchema");
 
-    ConvertedField converted = struct.accept(new ThriftSchemaConvertVisitor(filter, true, keepOneOfEachUnion), state);
+    ConvertedField converted = struct.accept(
+      new ThriftSchemaConvertVisitor(filter, true, keepOneOfEachUnion, conf), state);
 
     if (!converted.isKeep()) {
       throw new ThriftProjectionException("No columns have been selected");
@@ -178,7 +197,12 @@ class ThriftSchemaConvertVisitor implements ThriftType.StateVisitor<ConvertedFie
   }
 
   private ConvertedField visitListLike(ThriftField listLike, State state, boolean isSet) {
-    State childState = new State(state.path, REPEATED, state.name + "_tuple");
+    State childState;
+    if (writeThreeLevelList) {
+      childState = new State(state.path, REQUIRED, "element");
+    } else {
+      childState = new State(state.path, REPEATED, state.name + "_tuple");
+    }
 
     ConvertedField converted = listLike.getType().accept(this, childState);
 
@@ -194,7 +218,13 @@ class ThriftSchemaConvertVisitor implements ThriftType.StateVisitor<ConvertedFie
         }
       }
 
-      return new Keep(state.path, listType(state.repetition, state.name, converted.asKeep().getType()));
+      if (writeThreeLevelList) {
+        return new Keep(
+            state.path, listOfElements(state.repetition, state.name, converted.asKeep().getType()));
+      } else {
+        return new Keep(
+            state.path, listType(state.repetition, state.name, converted.asKeep().getType()));
+      }
     }
 
     return new Drop(state.path);
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConverter.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConverter.java
index 2cb061b..61cfd4c 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConverter.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConverter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,6 +22,8 @@ import com.twitter.elephantbird.thrift.TStructDescriptor;
 import com.twitter.elephantbird.thrift.TStructDescriptor.Field;
 import java.util.HashSet;
 import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TEnum;
 import org.apache.thrift.TUnion;
@@ -50,10 +52,23 @@ import static org.apache.parquet.schema.Type.Repetition.REPEATED;
 public class ThriftSchemaConverter {
   private final FieldProjectionFilter fieldProjectionFilter;
 
+  private Configuration conf;
+
   public ThriftSchemaConverter() {
     this(FieldProjectionFilter.ALL_COLUMNS);
   }
 
+  public ThriftSchemaConverter(Configuration configuration) {
+    this();
+    conf = configuration;
+  }
+
+  public ThriftSchemaConverter(
+      Configuration configuration, FieldProjectionFilter fieldProjectionFilter) {
+    this(fieldProjectionFilter);
+    conf = configuration;
+  }
+
   public ThriftSchemaConverter(FieldProjectionFilter fieldProjectionFilter) {
     this.fieldProjectionFilter = fieldProjectionFilter;
   }
@@ -72,7 +87,7 @@ public class ThriftSchemaConverter {
    * @return the struct as a Parquet message type
    */
   public MessageType convert(StructType struct) {
-    MessageType messageType = ThriftSchemaConvertVisitor.convert(struct, fieldProjectionFilter, true);
+    MessageType messageType = ThriftSchemaConvertVisitor.convert(struct, fieldProjectionFilter, true, conf);
     fieldProjectionFilter.assertNoUnmatchedPatterns();
     return messageType;
   }
@@ -85,7 +100,7 @@ public class ThriftSchemaConverter {
    * @return the struct as a Parquet message type
    */
   public static MessageType convertWithoutProjection(StructType struct) {
-    return ThriftSchemaConvertVisitor.convert(struct, FieldProjectionFilter.ALL_COLUMNS, false);
+    return ThriftSchemaConvertVisitor.convert(struct, FieldProjectionFilter.ALL_COLUMNS, false, new Configuration());
   }
 
   public static <T extends TBase<?,?>> StructOrUnionType structOrUnionType(Class<T> klass) {
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestThriftToParquetFileWriter.java b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestThriftToParquetFileWriter.java
index b8ff23c..e96b226 100644
--- a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestThriftToParquetFileWriter.java
+++ b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestThriftToParquetFileWriter.java
@@ -32,6 +32,7 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.util.ContextUtil;
 import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.thrift.ParquetWriteProtocol;
 import org.apache.parquet.thrift.test.RequiredPrimitiveFixture;
 import org.apache.parquet.thrift.test.TestListsInMap;
 
@@ -79,7 +80,7 @@ public class TestThriftToParquetFileWriter {
                 "bob.roberts@example.com",
                 Arrays.asList(new PhoneNumber("1234567890")))));
 
-    final Path fileToCreate = createFile(a);
+    final Path fileToCreate = createFile(new Configuration(), a);
 
     ParquetReader<Group> reader = createRecordReader(fileToCreate);
 
@@ -109,7 +110,8 @@ public class TestThriftToParquetFileWriter {
       boolStats.setMinMax(false, true);
 
       //write rows to a file
-      Path p = createFile(new RequiredPrimitiveFixture(false, (byte)32, (short)32, 2, 90l, -15.55d, "as"),
+      Path p = createFile(new Configuration(),
+                          new RequiredPrimitiveFixture(false, (byte)32, (short)32, 2, 90l, -15.55d, "as"),
                           new RequiredPrimitiveFixture(false, (byte)100, (short)100, 100, 287l, -9.0d, "world"),
                           new RequiredPrimitiveFixture(true, (byte)2, (short)2, 9, -17l, 9.63d, "hello"));
       final Configuration configuration = new Configuration();
@@ -151,7 +153,8 @@ public class TestThriftToParquetFileWriter {
       binaryStatsLarge.setMinMax(Binary.fromString("some small string"),
                                  Binary.fromString("some very large string here to test in this function"));
       //write rows to a file
-      Path p_large = createFile(new RequiredPrimitiveFixture(false, (byte)2, (short)32, -Integer.MAX_VALUE,
+      Path p_large = createFile(new Configuration(),
+                                new RequiredPrimitiveFixture(false, (byte)2, (short)32, -Integer.MAX_VALUE,
                                                             -Long.MAX_VALUE, -Double.MAX_VALUE, "some small string"),
                                 new RequiredPrimitiveFixture(false, (byte)100, (short)100, Integer.MAX_VALUE,
                                                              Long.MAX_VALUE, Double.MAX_VALUE,
@@ -201,7 +204,7 @@ public class TestThriftToParquetFileWriter {
     final TestMapInList listMap = new TestMapInList("listmap",
         Arrays.asList(map1, map2));
 
-    final Path fileToCreate = createFile(listMap);
+    final Path fileToCreate = createFile(new Configuration(), listMap);
 
     ParquetReader<Group> reader = createRecordReader(fileToCreate);
 
@@ -221,7 +224,7 @@ public class TestThriftToParquetFileWriter {
     Map<String, List<String>> map = new HashMap<String,List<String>>();
     map.put("key", Arrays.asList("val1","val2"));
     final TestListInMap mapList = new TestListInMap("maplist", map);
-    final Path fileToCreate = createFile(mapList);
+    final Path fileToCreate = createFile(new Configuration(), mapList);
 
     ParquetReader<Group> reader = createRecordReader(fileToCreate);
 
@@ -239,7 +242,7 @@ public class TestThriftToParquetFileWriter {
     Map<List<String>, List<String>> map = new HashMap<List<String>,List<String>>();
     map.put(Arrays.asList("key1","key2"), Arrays.asList("val1","val2"));
     final TestListsInMap mapList = new TestListsInMap("maplists", map);
-    final Path fileToCreate = createFile(mapList);
+    final Path fileToCreate = createFile(new Configuration(), mapList);
 
     ParquetReader<Group> reader = createRecordReader(fileToCreate);
 
@@ -256,6 +259,68 @@ public class TestThriftToParquetFileWriter {
     }
   }
 
+  @Test
+  public void testWriteFileWithThreeLevelsList()
+      throws IOException, InterruptedException, TException {
+    final AddressBook a = new AddressBook(
+        Arrays.asList(
+            new Person(
+                new Name("Bob", "Roberts"),
+                0,
+                "bob.roberts@example.com",
+                Arrays.asList(new PhoneNumber("1234567890")))));
+
+    Configuration conf = new Configuration();
+    conf.set(ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS, "true");
+
+    final Path fileToCreate = createFile(conf, a);
+
+    ParquetReader<Group> reader = createRecordReader(fileToCreate);
+
+    Group g = null;
+    int i = 0;
+    while((g = reader.read()) != null) {
+      assertEquals(a.persons.size(), g.getFieldRepetitionCount("persons"));
+      assertEquals(
+          a.persons.get(0).email,
+          g.getGroup("persons", 0).getGroup(0, 0).getGroup(0, 0).getString("email", 0));
+      // just some sanity check, we're testing the various layers somewhere else
+      ++i;
+    }
+    assertEquals("read 1 record", 1, i);
+  }
+
+  @Test
+  public void testWriteFileListOfMapWithThreeLevelLists()
+      throws IOException, InterruptedException, TException {
+    Map<String, String> map1 = new HashMap<String,String>();
+    map1.put("key11", "value11");
+    map1.put("key12", "value12");
+    Map<String, String> map2 = new HashMap<String,String>();
+    map2.put("key21", "value21");
+    final TestMapInList listMap = new TestMapInList("listmap",
+        Arrays.asList(map1, map2));
+
+    Configuration conf = new Configuration();
+    conf.set(ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS, "true");
+
+    final Path fileToCreate = createFile(conf, listMap);
+
+    ParquetReader<Group> reader = createRecordReader(fileToCreate);
+
+    Group g = null;
+    while((g = reader.read()) != null) {
+      assertEquals(listMap.names.size(),
+          g.getGroup("names", 0).getFieldRepetitionCount("list"));
+      assertEquals(listMap.names.get(0).size(),
+          g.getGroup("names", 0).getGroup("list", 0).
+              getGroup("element", 0).getFieldRepetitionCount("key_value"));
+      assertEquals(listMap.names.get(1).size(),
+          g.getGroup("names", 0).getGroup("list", 1).
+              getGroup("element", 0).getFieldRepetitionCount("key_value"));
+    }
+  }
+
   private ParquetReader<Group> createRecordReader(Path parquetFilePath) throws IOException {
     Configuration configuration = new Configuration(true);
 
@@ -267,10 +332,10 @@ public class TestThriftToParquetFileWriter {
     return new ParquetReader<Group>(parquetFilePath, readSupport);
   }
 
-  private <T extends TBase<?,?>> Path createFile(T... tObjs) throws IOException, InterruptedException, TException  {
+  private <T extends TBase<?,?>> Path createFile(Configuration conf, T... tObjs)
+      throws IOException, InterruptedException, TException  {
     final Path fileToCreate = new Path("target/test/TestThriftToParquetFileWriter/"+tObjs[0].getClass()+".parquet");
-    LOG.info("File created: {}", fileToCreate.toString());
-    Configuration conf = new Configuration();
+    LOG.info("File created: " + fileToCreate.toString());
     final FileSystem fs = fileToCreate.getFileSystem(conf);
     if (fs.exists(fileToCreate)) {
       fs.delete(fileToCreate, true);
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java
index f9702c0..1311d76 100644
--- a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java
+++ b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java
@@ -30,6 +30,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
+import com.twitter.elephantbird.thrift.test.TestMapInList;
+import com.twitter.elephantbird.thrift.test.TestNameSet;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.ComparisonFailure;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,8 +51,6 @@ import org.apache.parquet.io.RecordConsumerLoggingWrapper;
 import org.apache.parquet.pig.PigSchemaConverter;
 import org.apache.parquet.pig.TupleWriteSupport;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.thrift.ParquetWriteProtocol;
-import org.apache.parquet.thrift.ThriftSchemaConverter;
 import org.apache.parquet.thrift.struct.ThriftType.StructType;
 
 import com.twitter.data.proto.tutorial.thrift.AddressBook;
@@ -518,9 +519,175 @@ public class TestParquetWriteProtocol {
    validateThrift(thriftExpectations, a);
   }
 
+  @Test
+  public void testSetWithTwoLevelList() throws TException {
+    final Set<String> names = new HashSet<String>();
+    names.add("John");
+    names.add("Jack");
+    final TestNameSet o = new TestNameSet("name", names);
+
+    String[] expectations = {
+      "startMessage()",
+        "startField(name, 0)",
+          "addBinary(name)",
+        "endField(name, 0)",
+        "startField(names, 1)",
+          "startGroup()",
+            "startField(names_tuple, 0)",
+              "addBinary(John)",
+              "addBinary(Jack)",
+            "endField(names_tuple, 0)",
+          "endGroup()",
+        "endField(names, 1)",
+      "endMessage()"};
+    Configuration conf = new Configuration();
+    conf.set(ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS, "false");
+    validateThrift(conf, expectations, o);
+  }
+
+  @Test
+  public void testSetWithThreeLevelList() throws TException {
+    final Set<String> names = new HashSet<String>();
+    names.add("John");
+    names.add("Jack");
+    final TestNameSet o = new TestNameSet("name", names);
+
+    String[] expectations = {
+      "startMessage()",
+        "startField(name, 0)",
+          "addBinary(name)",
+        "endField(name, 0)",
+        "startField(names, 1)",
+          "startGroup()",
+            "startField(list, 0)",
+              "startGroup()",
+                "startField(element, 0)",
+                  "addBinary(John)",
+                "endField(element, 0)",
+              "endGroup()",
+              "startGroup()",
+                "startField(element, 0)",
+                  "addBinary(Jack)",
+                "endField(element, 0)",
+              "endGroup()",
+            "endField(list, 0)",
+          "endGroup()",
+        "endField(names, 1)",
+      "endMessage()"};
+    Configuration conf = new Configuration();
+    conf.set(ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS, "true");
+    validateThrift(conf, expectations, o);
+  }
+
+  @Test
+  public void testNameThreeLevelList() throws TException {
+    final List<String> names = new ArrayList<String>();
+    names.add("John");
+    names.add("Jack");
+    final TestNameList o = new TestNameList("name", names);
+
+    String[] expectations = {
+        "startMessage()",
+          "startField(name, 0)",
+            "addBinary(name)",
+          "endField(name, 0)",
+          "startField(names, 1)",
+            "startGroup()",
+              "startField(list, 0)",
+                "startGroup()",
+                  "startField(element, 0)",
+                    "addBinary(John)",
+                  "endField(element, 0)",
+                "endGroup()",
+                "startGroup()",
+                  "startField(element, 0)",
+                    "addBinary(Jack)",
+                  "endField(element, 0)",
+                "endGroup()",
+              "endField(list, 0)",
+            "endGroup()",
+          "endField(names, 1)",
+        "endMessage()"};
+    Configuration conf = new Configuration();
+    conf.set(ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS, "true");
+    validateThrift(conf, expectations, o);
+  }
+
+  @Test
+  public void testListOfMapThreeLevelList() throws TException {
+    Map<String, String> map1 = new HashMap<String,String>();
+    map1.put("key11", "value11");
+    map1.put("key12", "value12");
+    Map<String, String> map2 = new HashMap<String,String>();
+    map2.put("key21", "value21");
+    final TestMapInList listMap = new TestMapInList("listmap",
+        Arrays.asList(map1, map2));
+
+    String[] expectations = {
+        "startMessage()",
+          "startField(name, 0)",
+            "addBinary(listmap)",
+          "endField(name, 0)",
+          "startField(names, 1)",
+            "startGroup()",
+              "startField(list, 0)",
+                "startGroup()",
+                  "startField(element, 0)",
+                    "startGroup()",
+                      "startField(key_value, 0)",
+                        "startGroup()",
+                          "startField(key, 0)",
+                            "addBinary(key12)",
+                          "endField(key, 0)",
+                          "startField(value, 1)",
+                            "addBinary(value12)",
+                          "endField(value, 1)",
+                        "endGroup()",
+                        "startGroup()",
+                          "startField(key, 0)",
+                            "addBinary(key11)",
+                          "endField(key, 0)",
+                          "startField(value, 1)",
+                            "addBinary(value11)",
+                          "endField(value, 1)",
+                        "endGroup()",
+                      "endField(key_value, 0)",
+                    "endGroup()",
+                  "endField(element, 0)",
+                "endGroup()",
+                "startGroup()",
+                  "startField(element, 0)",
+                    "startGroup()",
+                      "startField(key_value, 0)",
+                        "startGroup()",
+                          "startField(key, 0)",
+                            "addBinary(key21)",
+                          "endField(key, 0)",
+                          "startField(value, 1)",
+                            "addBinary(value21)",
+                          "endField(value, 1)",
+                        "endGroup()",
+                      "endField(key_value, 0)",
+                    "endGroup()",
+                  "endField(element, 0)",
+                "endGroup()",
+              "endField(list, 0)",
+            "endGroup()",
+          "endField(names, 1)",
+        "endMessage()"};
+    Configuration conf = new Configuration();
+    conf.set(ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS, "true");
+    validateThrift(conf, expectations, listMap);
+  }
+
   private void validateThrift(String[] expectations, TBase<?, ?> a)
       throws TException {
-    final ThriftSchemaConverter thriftSchemaConverter = new ThriftSchemaConverter();
+    validateThrift(new Configuration(), expectations, a);
+  }
+
+  private void validateThrift(Configuration configuration, String[] expectations, TBase<?, ?> a)
+      throws TException {
+    final ThriftSchemaConverter thriftSchemaConverter = new ThriftSchemaConverter(configuration);
 //      System.out.println(a);
     final Class<TBase<?,?>> class1 = (Class<TBase<?,?>>)a.getClass();
     final MessageType schema = thriftSchemaConverter.convert(class1);
@@ -528,7 +695,8 @@ public class TestParquetWriteProtocol {
     final StructType structType = thriftSchemaConverter.toStructType(class1);
     ExpectationValidatingRecordConsumer recordConsumer = new ExpectationValidatingRecordConsumer(new ArrayDeque<String>(Arrays.asList(expectations)));
     final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
-    ParquetWriteProtocol p = new ParquetWriteProtocol(new RecordConsumerLoggingWrapper(recordConsumer), columnIO, structType);
+    ParquetWriteProtocol p = new ParquetWriteProtocol(
+        configuration, new RecordConsumerLoggingWrapper(recordConsumer), columnIO, structType);
     a.write(p);
   }
 
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftParquetReaderWriter.java b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftParquetReaderWriter.java
index e23aad7..cfd1a5c 100644
--- a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftParquetReaderWriter.java
+++ b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftParquetReaderWriter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,7 +39,17 @@ public class TestThriftParquetReaderWriter {
 
   @Test
   public void testWriteRead() throws IOException {
+    readWriteTest(true);
+  }
+
+  @Test
+  public void testWriteReadTwoLevelList() throws IOException {
+    readWriteTest(false);
+  }
+
+  private void readWriteTest(Boolean useThreeLevelLists) throws IOException {
     Configuration configuration = new Configuration();
+    configuration.set(ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS, useThreeLevelLists.toString());
     Path f = new Path("target/test/TestThriftParquetReaderWriter");
     FileSystem fs = f.getFileSystem(configuration);
     if (fs.exists(f)) {