Posted to commits@drill.apache.org by vi...@apache.org on 2021/04/23 09:58:44 UTC

[drill] branch master updated: DRILL-7825: Unknown logical type in Parquet (#2143)

This is an automated email from the ASF dual-hosted git repository.

vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d21637  DRILL-7825: Unknown logical type <LogicalType UUID:UUIDType()> in Parquet (#2143)
5d21637 is described below

commit 5d216375271fe98b3733549bd85b93248d31075e
Author: Vitalii Diravka <vi...@apache.org>
AuthorDate: Fri Apr 23 12:58:33 2021 +0300

    DRILL-7825: Unknown logical type <LogicalType UUID:UUIDType()> in Parquet (#2143)
    
    * DRILL-7825: Error: SYSTEM ERROR: RuntimeException: Unknown logical type <LogicalType UUID:UUIDType()>
---
 exec/java-exec/pom.xml                             |    4 -
 .../exec/store/parquet/ParquetRecordWriter.java    |   14 +-
 .../parquet/columnreaders/ColumnReaderFactory.java |    4 +-
 .../store/parquet2/DrillParquetGroupConverter.java |   27 +-
 .../hadoop/ParquetColumnChunkPageWriteStore.java   |  419 +++--
 .../apache/parquet/hadoop/ParquetFileWriter.java   | 1634 ++++++++++++++++++++
 .../parquet/ParquetSimpleTestFileGenerator.java    |   61 +-
 .../exec/store/parquet/TestParquetComplex.java     |   14 +
 .../store/parquet/TestParquetLogicalTypes.java     |   31 +
 .../parquet/parquet_test_file_simple.parquet       |  Bin 0 -> 5048 bytes
 .../parquet/uuid-simple-fixed-length-array.parquet |  Bin 0 -> 1328 bytes
 .../resources/store/parquet/complex/uuid.parquet   |  Bin 0 -> 38974 bytes
 exec/jdbc-all/pom.xml                              |   35 +-
 pom.xml                                            |    2 +-
 14 files changed, 2039 insertions(+), 206 deletions(-)
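
For context, the failing case is a Parquet file whose schema carries the UUID logical type on a 16-byte FIXED_LEN_BYTE_ARRAY column. A minimal sketch of such a schema, built with the parquet-mr Types API (not part of the commit; field and message names are examples only):

    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Types;

    public class UuidSchemaExample {
      public static void main(String[] args) {
        // Schema of the kind that previously failed in Drill with
        // "Unknown logical type <LogicalType UUID:UUIDType()>".
        MessageType schema = Types.buildMessage()
            .optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(16) // UUIDs are 16 bytes
            .as(LogicalTypeAnnotation.uuidType())                        // UUID logical type annotation
            .named("uuid_field")                                         // example column name
            .named("uuid_example_schema");                               // example message name
        System.out.println(schema);
      }
    }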

diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index f30700c..a08e172 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -259,10 +259,6 @@
     </dependency>
     <dependency>
       <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-format</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-common</artifactId>
       <version>${parquet.version}</version>
     </dependency>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 4fd5064..45a2c7f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -57,7 +57,6 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
 import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
@@ -251,10 +250,6 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     // We don't want this number to be too small either. Ideally, slightly bigger than the page size,
     // but not bigger than the block buffer
     int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
-    // TODO: Use initialSlabSize from ParquetProperties once drill will be updated to the latest version of Parquet library
-    int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(64, pageSize, 10);
-    // TODO: Replace ParquetColumnChunkPageWriteStore with ColumnChunkPageWriteStore from parquet library
-    // once PARQUET-1006 will be resolved
     ParquetProperties parquetProperties = ParquetProperties.builder()
         .withPageSize(pageSize)
         .withDictionaryEncoding(enableDictionary)
@@ -263,10 +258,11 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
         .withAllocator(new ParquetDirectByteBufferAllocator(oContext))
         .withValuesWriterFactory(new DefaultV1ValuesWriterFactory())
         .build();
-    pageStore = new ParquetColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema, initialSlabSize,
-        pageSize, parquetProperties.getAllocator(), parquetProperties.getPageWriteChecksumEnabled(),
-        parquetProperties.getColumnIndexTruncateLength()
-    );
+    // TODO: Replace ParquetColumnChunkPageWriteStore with ColumnChunkPageWriteStore from parquet library
+    //   once DRILL-7906 (PARQUET-1006) is resolved
+    pageStore = new ParquetColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema,
+            parquetProperties.getInitialSlabSize(), pageSize, parquetProperties.getAllocator(),
+            parquetProperties.getColumnIndexTruncateLength(), parquetProperties.getPageWriteChecksumEnabled());
     store = new ColumnWriteStoreV1(pageStore, parquetProperties);
     MessageColumnIO columnIO = new ColumnIOFactory(false).getColumnIO(this.schema);
     consumer = columnIO.getRecordWriter(store);
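
To make the buffer-sizing comment in the hunk above concrete, the same heuristic evaluated with illustrative numbers (the constants below are stand-ins, not Drill's actual defaults) yields a buffer slightly larger than one page:

    public class PageBufferSizeExample {
      public static void main(String[] args) {
        int minimumBufferSize = 64 * 1024;            // stand-in for MINIMUM_BUFFER_SIZE
        int pageSize = 1024 * 1024;                   // 1 MiB page size (example)
        int initialBlockBufferSize = 8 * 1024 * 1024; // example block buffer size
        // Slightly bigger than the page size, but never bigger than the block buffer.
        int initialPageBufferSize =
            Math.max(minimumBufferSize, Math.min(pageSize + pageSize / 10, initialBlockBufferSize));
        System.out.println(initialPageBufferSize);    // prints 1153433 (~1.1 MiB)
      }
    }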
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index fcb61f6..25c9904 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -201,13 +201,15 @@ public class ColumnReaderFactory {
         } else if (convertedType == ConvertedType.INTERVAL) {
           return new NullableFixedByteAlignedReaders.NullableIntervalReader(recordReader, descriptor,
               columnChunkMetaData, fixedLength, (NullableIntervalVector) v, schemaElement);
+        } else {
+          return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(recordReader, descriptor,
+                  columnChunkMetaData, fixedLength, (NullableVarBinaryVector) v, schemaElement);
         }
       } else {
         return getNullableColumnReader(recordReader, descriptor,
             columnChunkMetaData, fixedLength, v, schemaElement);
       }
     }
-    throw new Exception("Unexpected parquet metadata configuration.");
   }
 
   static VarLengthValuesColumn<?> getReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
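
With the fallback added above, FIXED_LEN_BYTE_ARRAY columns whose converted type Drill does not handle specially (such as UUID) are read through the generic fixed-binary reader and surface as VARBINARY values. A hypothetical JDBC sketch of reading such a column back (connection string, file path and column name are examples only):

    import java.nio.ByteBuffer;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.UUID;

    public class ReadUuidColumnExample {
      public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                 "SELECT uuid_field FROM dfs.`/tmp/uuid-simple-fixed-length-array.parquet` LIMIT 1")) {
          if (rs.next()) {
            byte[] raw = rs.getBytes(1);                 // 16 raw bytes of the UUID value
            ByteBuffer bb = ByteBuffer.wrap(raw);
            System.out.println(new UUID(bb.getLong(), bb.getLong()));
          }
        }
      }
    }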
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
index fbe3ae3..fdb6e79 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 
@@ -71,6 +72,7 @@ import org.apache.parquet.io.api.Converter;
 import org.apache.parquet.io.api.GroupConverter;
 import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Type.Repetition;
@@ -328,23 +330,30 @@ public class DrillParquetGroupConverter extends GroupConverter {
         }
       }
       case FIXED_LEN_BYTE_ARRAY:
-        switch (type.getOriginalType()) {
-          case DECIMAL: {
+        LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<PrimitiveConverter> typeAnnotationVisitor = new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<PrimitiveConverter>() {
+          @Override
+          public Optional<PrimitiveConverter> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
             ParquetReaderUtility.checkDecimalTypeEnabled(options);
-            return getVarDecimalConverter(name, type);
+            return Optional.of(getVarDecimalConverter(name, type));
           }
-          case INTERVAL: {
+
+          @Override
+          public Optional<PrimitiveConverter> visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalLogicalType) {
             IntervalWriter writer = type.isRepetition(Repetition.REPEATED)
                 ? getWriter(name, (m, f) -> m.list(f).interval(), l -> l.list().interval())
-                : getWriter(name, (m, f) -> m.interval(f), l -> l.interval());
-            return new DrillFixedLengthByteArrayToInterval(writer);
+                : getWriter(name, MapWriter::interval, ListWriter::interval);
+            return Optional.of(new DrillFixedLengthByteArrayToInterval(writer));
           }
-          default: {
+        };
+
+        LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation();
+        if (logicalTypeAnnotation != null) {
+          return logicalTypeAnnotation.accept(typeAnnotationVisitor).orElseGet(() -> {
             VarBinaryWriter writer = type.isRepetition(Repetition.REPEATED)
                 ? getWriter(name, (m, f) -> m.list(f).varBinary(), l -> l.list().varBinary())
-                : getWriter(name, (m, f) -> m.varBinary(f), l -> l.varBinary());
+                : getWriter(name, MapWriter::varBinary, ListWriter::varBinary);
             return new DrillFixedBinaryToVarbinaryConverter(writer, type.getTypeLength(), mutator.getManagedBuffer());
-          }
+          });
         }
       default:
         throw new UnsupportedOperationException("Unsupported type: " + type.getPrimitiveTypeName());
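
The converter change above replaces the switch on OriginalType with Parquet's LogicalTypeAnnotation visitor: annotations without an explicit visit overload return Optional.empty(), so orElseGet(...) falls back to the plain VARBINARY converter instead of throwing for an "unknown" type such as UUID. A standalone, simplified sketch of that dispatch pattern (class and method names are illustrative, not Drill code):

    import java.util.Optional;
    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.PrimitiveType;

    public class LogicalTypeDispatchExample {
      static String describe(PrimitiveType type) {
        LogicalTypeAnnotation annotation = type.getLogicalTypeAnnotation();
        if (annotation == null) {
          return "plain " + type.getPrimitiveTypeName();
        }
        return annotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<String>() {
          @Override
          public Optional<String> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal) {
            return Optional.of("decimal(" + decimal.getPrecision() + "," + decimal.getScale() + ")");
          }
          @Override
          public Optional<String> visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation interval) {
            return Optional.of("interval");
          }
          // No visit(UUIDLogicalTypeAnnotation) override: UUID falls through to the default below.
        }).orElseGet(() -> "varbinary fallback for " + annotation);
      }
    }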
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
index b8f707d..790e3c3 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
@@ -31,77 +31,43 @@ import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.page.PageWriter;
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.format.BlockCipher;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
 import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
 import org.apache.parquet.io.ParquetEncodingException;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * This is a copy of ColumnChunkPageWriteStore from parquet library except of OutputStream that is used here.
- * Using of CapacityByteArrayOutputStream allows to use different ByteBuffer allocators.
- * It will be no need in this class once PARQUET-1006 is resolved.
- */
-public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeable {
-
-  private static final Logger logger = LoggerFactory.getLogger(ParquetColumnChunkPageWriteStore.class);
+@InterfaceAudience.Private
+public class ParquetColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore, Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(ParquetColumnChunkPageWriteStore.class);
 
   private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
 
-  private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<>();
-  private final MessageType schema;
-
-  public ParquetColumnChunkPageWriteStore(BytesCompressor compressor,
-                                          MessageType schema,
-                                          int initialSlabSize,
-                                          int maxCapacityHint,
-                                          ByteBufferAllocator allocator,
-                                          boolean pageWriteChecksumEnabled,
-                                          int columnIndexTruncateLength) {
-    this.schema = schema;
-    for (ColumnDescriptor path : schema.getColumns()) {
-      writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSlabSize,
-          maxCapacityHint, allocator, pageWriteChecksumEnabled, columnIndexTruncateLength));
-    }
-  }
-
-  @Override
-  public PageWriter getPageWriter(ColumnDescriptor path) {
-    return writers.get(path);
-  }
-
-  /**
-   * Writes the column chunks in the corresponding row group
-   * @param writer the parquet file writer
-   * @throws IOException if the file can not be created
-   */
-  public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
-    for (ColumnDescriptor path : schema.getColumns()) {
-      ColumnChunkPageWriter pageWriter = writers.get(path);
-      pageWriter.writeToFileWriter(writer);
-    }
-  }
-
-  @Override
-  public void close() {
-    for (ColumnChunkPageWriter pageWriter : writers.values()) {
-      pageWriter.close();
-    }
-  }
-
-  private static final class ColumnChunkPageWriter implements PageWriter, Closeable {
+  private static final class ColumnChunkPageWriter implements PageWriter, BloomFilterWriter, Closeable {
 
     private final ColumnDescriptor path;
     private final BytesCompressor compressor;
 
+    private final CapacityByteArrayOutputStream tempOutputStream;
     private final CapacityByteArrayOutputStream buf;
     private DictionaryPage dictionaryPage;
 
@@ -111,37 +77,74 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab
     private int pageCount;
 
     // repetition and definition level encodings are used only for v1 pages and don't change
-    private Set<Encoding> rlEncodings = new HashSet<>();
-    private Set<Encoding> dlEncodings = new HashSet<>();
-    private List<Encoding> dataEncodings = new ArrayList<>();
+    private Set<Encoding> rlEncodings = new HashSet<Encoding>();
+    private Set<Encoding> dlEncodings = new HashSet<Encoding>();
+    private List<Encoding> dataEncodings = new ArrayList<Encoding>();
 
+    private BloomFilter bloomFilter;
     private ColumnIndexBuilder columnIndexBuilder;
     private OffsetIndexBuilder offsetIndexBuilder;
     private Statistics totalStatistics;
+    private final ByteBufferAllocator allocator;
 
     private final CRC32 crc;
     boolean pageWriteChecksumEnabled;
 
+    private final BlockCipher.Encryptor headerBlockEncryptor;
+    private final BlockCipher.Encryptor pageBlockEncryptor;
+    private final int rowGroupOrdinal;
+    private final int columnOrdinal;
+    private int pageOrdinal;
+    private final byte[] dataPageAAD;
+    private final byte[] dataPageHeaderAAD;
+    private final byte[] fileAAD;
+
     private ColumnChunkPageWriter(ColumnDescriptor path,
                                   BytesCompressor compressor,
                                   int initialSlabSize,
                                   int maxCapacityHint,
                                   ByteBufferAllocator allocator,
+                                  int columnIndexTruncateLength,
                                   boolean pageWriteChecksumEnabled,
-                                  int columnIndexTruncateLength) {
+                                  BlockCipher.Encryptor headerBlockEncryptor,
+                                  BlockCipher.Encryptor pageBlockEncryptor,
+                                  byte[] fileAAD,
+                                  int rowGroupOrdinal,
+                                  int columnOrdinal) {
       this.path = path;
       this.compressor = compressor;
+      this.allocator = allocator;
+      this.tempOutputStream = new CapacityByteArrayOutputStream(initialSlabSize, maxCapacityHint, allocator);
       this.buf = new CapacityByteArrayOutputStream(initialSlabSize, maxCapacityHint, allocator);
-      this.totalStatistics = Statistics.createStats(this.path.getPrimitiveType());
       this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType(), columnIndexTruncateLength);
       this.offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
       this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
       this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
+
+      this.headerBlockEncryptor = headerBlockEncryptor;
+      this.pageBlockEncryptor = pageBlockEncryptor;
+      this.fileAAD = fileAAD;
+      this.rowGroupOrdinal = rowGroupOrdinal;
+      this.columnOrdinal = columnOrdinal;
+      this.pageOrdinal = -1;
+      if (null != headerBlockEncryptor) {
+        dataPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPageHeader,
+                rowGroupOrdinal, columnOrdinal, 0);
+      } else {
+        dataPageHeaderAAD = null;
+      }
+      if (null != pageBlockEncryptor) {
+        dataPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPage,
+                rowGroupOrdinal, columnOrdinal, 0);
+      } else {
+        dataPageAAD = null;
+      }
     }
 
     @Override
+    @Deprecated
     public void writePage(BytesInput bytesInput, int valueCount, Statistics<?> statistics, Encoding rlEncoding,
-          Encoding dlEncoding, Encoding valuesEncoding) throws IOException {
+                          Encoding dlEncoding, Encoding valuesEncoding) throws IOException {
       // Setting the builders to the no-op ones so no column/offset indexes will be written for this column chunk
       columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
       offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder();
@@ -150,49 +153,66 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab
     }
 
     @Override
-    public void writePage(BytesInput bytes, int valueCount, int rowCount, Statistics statistics,
-                          Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException {
+    public void writePage(BytesInput bytes,
+                          int valueCount,
+                          int rowCount,
+                          Statistics statistics,
+                          Encoding rlEncoding,
+                          Encoding dlEncoding,
+                          Encoding valuesEncoding) throws IOException {
+      pageOrdinal++;
       long uncompressedSize = bytes.size();
       if (uncompressedSize > Integer.MAX_VALUE) {
         throw new ParquetEncodingException(
-            "Cannot write page larger than Integer.MAX_VALUE bytes: " +
-                uncompressedSize);
+                "Cannot write page larger than Integer.MAX_VALUE bytes: " +
+                        uncompressedSize);
       }
-
       BytesInput compressedBytes = compressor.compress(bytes);
+      if (null != pageBlockEncryptor) {
+        AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
+        compressedBytes = BytesInput.from(pageBlockEncryptor.encrypt(compressedBytes.toByteArray(), dataPageAAD));
+      }
       long compressedSize = compressedBytes.size();
       if (compressedSize > Integer.MAX_VALUE) {
         throw new ParquetEncodingException(
-            "Cannot write compressed page larger than Integer.MAX_VALUE bytes: "
-                + compressedSize);
+                "Cannot write compressed page larger than Integer.MAX_VALUE bytes: "
+                        + compressedSize);
+      }
+      tempOutputStream.reset();
+      if (null != headerBlockEncryptor) {
+        AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
       }
-
       if (pageWriteChecksumEnabled) {
         crc.reset();
         crc.update(compressedBytes.toByteArray());
-        parquetMetadataConverter.writeDataPageV1Header((int) uncompressedSize, (int) compressedSize,
-            valueCount, rlEncoding, dlEncoding, valuesEncoding, (int) crc.getValue(), buf);
+        parquetMetadataConverter.writeDataPageV1Header(
+                (int)uncompressedSize,
+                (int)compressedSize,
+                valueCount,
+                rlEncoding,
+                dlEncoding,
+                valuesEncoding,
+                (int) crc.getValue(),
+                tempOutputStream,
+                headerBlockEncryptor,
+                dataPageHeaderAAD);
       } else {
-        parquetMetadataConverter.writeDataPageV1Header((int) uncompressedSize, (int) compressedSize,
-            valueCount, rlEncoding, dlEncoding, valuesEncoding, buf);
+        parquetMetadataConverter.writeDataPageV1Header(
+                (int)uncompressedSize,
+                (int)compressedSize,
+                valueCount,
+                rlEncoding,
+                dlEncoding,
+                valuesEncoding,
+                tempOutputStream,
+                headerBlockEncryptor,
+                dataPageHeaderAAD);
       }
-
       this.uncompressedLength += uncompressedSize;
       this.compressedLength += compressedSize;
       this.totalValueCount += valueCount;
       this.pageCount += 1;
 
-      addStatistics(statistics);
-
-      offsetIndexBuilder.add(toIntWithCheck(buf.size() + compressedSize), rowCount);
-
-      compressedBytes.writeAllTo(buf);
-      rlEncodings.add(rlEncoding);
-      dlEncodings.add(dlEncoding);
-      dataEncodings.add(valuesEncoding);
-    }
-
-    private void addStatistics(Statistics statistics) {
       // Copying the statistics if it is not initialized yet so we have the correct typed one
       if (totalStatistics == null) {
         totalStatistics = statistics.copy();
@@ -201,55 +221,81 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab
       }
 
       columnIndexBuilder.add(statistics);
+      offsetIndexBuilder.add(toIntWithCheck(tempOutputStream.size() + compressedSize), rowCount);
+
+      // by concatenating before writing instead of writing twice,
+      // we only allocate one buffer to copy into instead of multiple.
+      BytesInput.concat(BytesInput.from(tempOutputStream), compressedBytes).writeAllTo(buf); // is used instead of above
+      rlEncodings.add(rlEncoding);
+      dlEncodings.add(dlEncoding);
+      dataEncodings.add(valuesEncoding);
     }
 
     @Override
-    public void writePageV2(int rowCount,
-                            int nullCount,
-                            int valueCount,
-                            BytesInput repetitionLevels,
-                            BytesInput definitionLevels,
-                            Encoding dataEncoding,
-                            BytesInput data,
-                            Statistics<?> statistics) throws IOException {
+    public void writePageV2(
+            int rowCount, int nullCount, int valueCount,
+            BytesInput repetitionLevels, BytesInput definitionLevels,
+            Encoding dataEncoding, BytesInput data,
+            Statistics<?> statistics) throws IOException {
+      pageOrdinal++;
+
       int rlByteLength = toIntWithCheck(repetitionLevels.size());
       int dlByteLength = toIntWithCheck(definitionLevels.size());
       int uncompressedSize = toIntWithCheck(
-          data.size() + repetitionLevels.size() + definitionLevels.size()
+              data.size() + repetitionLevels.size() + definitionLevels.size()
       );
+      // TODO: decide if we compress
       BytesInput compressedData = compressor.compress(data);
+      if (null != pageBlockEncryptor) {
+        AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
+        compressedData = BytesInput.from(pageBlockEncryptor.encrypt(compressedData.toByteArray(), dataPageAAD));
+      }
       int compressedSize = toIntWithCheck(
-          compressedData.size() + repetitionLevels.size() + definitionLevels.size()
+              compressedData.size() + repetitionLevels.size() + definitionLevels.size()
       );
+      tempOutputStream.reset();
+      if (null != headerBlockEncryptor) {
+        AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+      }
       parquetMetadataConverter.writeDataPageV2Header(
-          uncompressedSize, compressedSize,
-          valueCount, nullCount, rowCount,
-          statistics,
-          dataEncoding,
-          rlByteLength,
-          dlByteLength,
-          buf);
+              uncompressedSize, compressedSize,
+              valueCount, nullCount, rowCount,
+              dataEncoding,
+              rlByteLength,
+              dlByteLength,
+              tempOutputStream,
+              headerBlockEncryptor,
+              dataPageHeaderAAD);
       this.uncompressedLength += uncompressedSize;
       this.compressedLength += compressedSize;
       this.totalValueCount += valueCount;
       this.pageCount += 1;
 
-      addStatistics(statistics);
-
-      offsetIndexBuilder.add(toIntWithCheck(buf.size() + compressedSize), rowCount);
-
-      repetitionLevels.writeAllTo(buf);
-      definitionLevels.writeAllTo(buf);
-      compressedData.writeAllTo(buf);
+      // Copying the statistics if it is not initialized yet so we have the correct typed one
+      if (totalStatistics == null) {
+        totalStatistics = statistics.copy();
+      } else {
+        totalStatistics.mergeStatistics(statistics);
+      }
 
+      columnIndexBuilder.add(statistics);
+      offsetIndexBuilder.add(toIntWithCheck((long) tempOutputStream.size() + compressedSize), rowCount);
+
+      // by concatenating before writing instead of writing twice,
+      // we only allocate one buffer to copy into instead of multiple.
+      BytesInput.concat(
+              BytesInput.from(tempOutputStream),
+              repetitionLevels,
+              definitionLevels,
+              compressedData).writeAllTo(buf);
       dataEncodings.add(dataEncoding);
     }
 
     private int toIntWithCheck(long size) {
       if (size > Integer.MAX_VALUE) {
         throw new ParquetEncodingException(
-            "Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " +
-                size);
+                "Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " +
+                        size);
       }
       return (int)size;
     }
@@ -259,35 +305,64 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab
       return buf.size();
     }
 
-    /**
-     * Writes a number of pages within corresponding column chunk
-     * @param writer the parquet file writer
-     * @throws IOException if the file can not be created
-     */
     public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
-      writer.writeColumnChunk(path, totalValueCount, compressor.getCodecName(),
-          dictionaryPage, BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics,
-          columnIndexBuilder, offsetIndexBuilder, rlEncodings, dlEncodings, dataEncodings);
-      if (logger.isDebugEnabled()) {
-        logger.debug(
-            String.format(
-                "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
-                buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, new HashSet<>(dataEncodings))
-                + (dictionaryPage != null ? String.format(
-                ", dic { %,d entries, %,dB raw, %,dB comp}",
-                dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize())
-                : "")
-        );
+      if (null == headerBlockEncryptor) {
+        writer.writeColumnChunk(
+                path,
+                totalValueCount,
+                compressor.getCodecName(),
+                dictionaryPage,
+                BytesInput.from(buf),
+                uncompressedLength,
+                compressedLength,
+                totalStatistics,
+                columnIndexBuilder,
+                offsetIndexBuilder,
+                bloomFilter,
+                rlEncodings,
+                dlEncodings,
+                dataEncodings);
+      } else {
+        writer.writeColumnChunk(
+                path,
+                totalValueCount,
+                compressor.getCodecName(),
+                dictionaryPage,
+                BytesInput.from(buf),
+                uncompressedLength,
+                compressedLength,
+                totalStatistics,
+                columnIndexBuilder,
+                offsetIndexBuilder,
+                bloomFilter,
+                rlEncodings,
+                dlEncodings,
+                dataEncodings,
+                headerBlockEncryptor,
+                rowGroupOrdinal,
+                columnOrdinal,
+                fileAAD);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+                String.format(
+                        "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
+                        buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, new HashSet<Encoding>(dataEncodings))
+                        + (dictionaryPage != null ? String.format(
+                        ", dic { %,d entries, %,dB raw, %,dB comp}",
+                        dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize())
+                        : ""));
       }
       rlEncodings.clear();
       dlEncodings.clear();
       dataEncodings.clear();
       pageCount = 0;
+      pageOrdinal = -1;
     }
 
     @Override
     public long allocatedSize() {
-      return buf.getCapacity();
+      return buf.size();
     }
 
     @Override
@@ -298,8 +373,13 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab
       BytesInput dictionaryBytes = dictionaryPage.getBytes();
       int uncompressedSize = (int)dictionaryBytes.size();
       BytesInput compressedBytes = compressor.compress(dictionaryBytes);
+      if (null != pageBlockEncryptor) {
+        byte[] dictonaryPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPage,
+                rowGroupOrdinal, columnOrdinal, -1);
+        compressedBytes = BytesInput.from(pageBlockEncryptor.encrypt(compressedBytes.toByteArray(), dictonaryPageAAD));
+      }
       this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize,
-          dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding());
+              dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding());
     }
 
     @Override
@@ -308,9 +388,96 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab
     }
 
     @Override
+    public void writeBloomFilter(BloomFilter bloomFilter) {
+      this.bloomFilter = bloomFilter;
+    }
+
+    @Override
     public void close() {
+      tempOutputStream.close();
       buf.close();
     }
   }
 
+  private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
+  private final MessageType schema;
+
+  public ParquetColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSlabSize,
+                                          int maxCapacityHint, ByteBufferAllocator allocator,
+                                          int columnIndexTruncateLength) {
+    this(compressor, schema, initialSlabSize, maxCapacityHint, allocator, columnIndexTruncateLength,
+            ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
+  }
+
+  public ParquetColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSlabSize,
+                                          int maxCapacityHint, ByteBufferAllocator allocator,
+                                          int columnIndexTruncateLength, boolean pageWriteChecksumEnabled) {
+    this.schema = schema;
+    for (ColumnDescriptor path : schema.getColumns()) {
+      writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSlabSize, maxCapacityHint, allocator, columnIndexTruncateLength,
+              pageWriteChecksumEnabled, null, null, null, -1, -1));
+    }
+  }
+
+  public ParquetColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSlabSize,
+                                          int maxCapacityHint, ByteBufferAllocator allocator,
+                                          int columnIndexTruncateLength, boolean pageWriteChecksumEnabled,
+                                          InternalFileEncryptor fileEncryptor, int rowGroupOrdinal) {
+    this.schema = schema;
+    if (null == fileEncryptor) {
+      for (ColumnDescriptor path : schema.getColumns()) {
+        writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSlabSize, maxCapacityHint, allocator,
+                columnIndexTruncateLength, pageWriteChecksumEnabled, null, null,
+                null, -1, -1));
+      }
+      return;
+    }
+
+    // Encrypted file
+    int columnOrdinal = -1;
+    byte[] fileAAD = fileEncryptor.getFileAAD();
+    for (ColumnDescriptor path : schema.getColumns()) {
+      columnOrdinal++;
+      BlockCipher.Encryptor headerBlockEncryptor = null;
+      BlockCipher.Encryptor pageBlockEncryptor = null;
+      ColumnPath columnPath = ColumnPath.get(path.getPath());
+
+      InternalColumnEncryptionSetup columnSetup = fileEncryptor.getColumnSetup(columnPath, true, columnOrdinal);
+      if (columnSetup.isEncrypted()) {
+        headerBlockEncryptor = columnSetup.getMetaDataEncryptor();
+        pageBlockEncryptor = columnSetup.getDataEncryptor();
+      }
+
+      writers.put(path,  new ColumnChunkPageWriter(path, compressor, initialSlabSize, maxCapacityHint, allocator,
+              columnIndexTruncateLength, pageWriteChecksumEnabled, headerBlockEncryptor, pageBlockEncryptor, fileAAD,
+              rowGroupOrdinal, columnOrdinal));
+    }
+  }
+
+  @Override
+  public PageWriter getPageWriter(ColumnDescriptor path) {
+    return writers.get(path);
+  }
+
+  @Override
+  public BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path) {
+    return writers.get(path);
+  }
+
+
+
+  public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
+    for (ColumnDescriptor path : schema.getColumns()) {
+      ColumnChunkPageWriter pageWriter = writers.get(path);
+      pageWriter.writeToFileWriter(writer);
+    }
+  }
+
+  @Override
+  public void close() {
+    for (ColumnChunkPageWriter pageWriter : writers.values()) {
+      pageWriter.close();
+    }
+  }
+
 }
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
new file mode 100644
index 0000000..f90f4c8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -0,0 +1,1634 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.format.Util.writeFileCryptoMetaData;
+import static org.apache.parquet.format.Util.writeFileMetaData;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.MAX_STATS_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.zip.CRC32;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.Version;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.EncodingStats;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.crypto.ModuleCipherFactory;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.StrictKeyValueMetadataMergeStrategy;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.GlobalMetaData;
+import org.apache.parquet.hadoop.metadata.KeyValueMetadataMergeStrategy;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.hadoop.util.HadoopStreams;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
+import org.apache.parquet.internal.hadoop.metadata.IndexReference;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.PositionOutputStream;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.TypeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Internal implementation of the Parquet file writer as a block container<br>
+ * Note: this is temporary Drill-Parquet class needed to write empty parquet files. Details in
+ * <a href="https://issues.apache.org/jira/browse/PARQUET-2026">PARQUET-2026</a> and
+ * <a href="https://issues.apache.org/jira/browse/DRILL-7907">DRILL-7907</a>
+ */
+public class ParquetFileWriter {
+    private static final Logger LOG = LoggerFactory.getLogger(ParquetFileWriter.class);
+
+    private final ParquetMetadataConverter metadataConverter;
+
+    public static final String PARQUET_METADATA_FILE = "_metadata";
+    public static final String MAGIC_STR = "PAR1";
+    public static final byte[] MAGIC = MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
+    public static final String EF_MAGIC_STR = "PARE";
+    public static final byte[] EFMAGIC = EF_MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
+    public static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata";
+    public static final int CURRENT_VERSION = 1;
+
+    // File creation modes
+    public static enum Mode {
+        CREATE,
+        OVERWRITE
+    }
+
+    protected final PositionOutputStream out;
+
+    private final MessageType schema;
+    private final AlignmentStrategy alignment;
+    private final int columnIndexTruncateLength;
+
+    // file data
+    private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+
+    // The column/offset indexes per blocks per column chunks
+    private final List<List<ColumnIndex>> columnIndexes = new ArrayList<>();
+    private final List<List<OffsetIndex>> offsetIndexes = new ArrayList<>();
+
+    // The Bloom filters
+    private final List<Map<String, BloomFilter>> bloomFilters = new ArrayList<>();
+
+    // The file encryptor
+    private final InternalFileEncryptor fileEncryptor;
+
+    // row group data
+    private BlockMetaData currentBlock; // appended to by endColumn
+
+    // The column/offset indexes for the actual block
+    private List<ColumnIndex> currentColumnIndexes;
+    private List<OffsetIndex> currentOffsetIndexes;
+
+    // The Bloom filter for the actual block
+    private Map<String, BloomFilter> currentBloomFilters;
+
+    // row group data set at the start of a row group
+    private long currentRecordCount; // set in startBlock
+
+    // column chunk data accumulated as pages are written
+    private EncodingStats.Builder encodingStatsBuilder;
+    private Set<Encoding> currentEncodings;
+    private long uncompressedLength;
+    private long compressedLength;
+    private Statistics currentStatistics; // accumulated in writePage(s)
+    private ColumnIndexBuilder columnIndexBuilder;
+    private OffsetIndexBuilder offsetIndexBuilder;
+
+    // column chunk data set at the start of a column
+    private CompressionCodecName currentChunkCodec; // set in startColumn
+    private ColumnPath currentChunkPath;            // set in startColumn
+    private PrimitiveType currentChunkType;         // set in startColumn
+    private long currentChunkValueCount;            // set in startColumn
+    private long currentChunkFirstDataPage;         // set in startColumn & page writes
+    private long currentChunkDictionaryPageOffset;  // set in writeDictionaryPage
+
+    // set when end is called
+    private ParquetMetadata footer = null;
+
+    private final CRC32 crc;
+    private boolean pageWriteChecksumEnabled;
+
+    /**
+     * Captures the order in which methods should be called
+     */
+    private enum STATE {
+        NOT_STARTED {
+            STATE start() {
+                return STARTED;
+            }
+        },
+        STARTED {
+            STATE startBlock() {
+                return BLOCK;
+            }
+            STATE end() {
+                return ENDED;
+            }
+        },
+        BLOCK  {
+            STATE startColumn() {
+                return COLUMN;
+            }
+            STATE endBlock() {
+                return STARTED;
+            }
+        },
+        COLUMN {
+            STATE endColumn() {
+                return BLOCK;
+            };
+            STATE write() {
+                return this;
+            }
+        },
+        ENDED;
+
+        STATE start() throws IOException { return error(); }
+        STATE startBlock() throws IOException { return error(); }
+        STATE startColumn() throws IOException { return error(); }
+        STATE write() throws IOException { return error(); }
+        STATE endColumn() throws IOException { return error(); }
+        STATE endBlock() throws IOException { return error(); }
+        STATE end() throws IOException { return error(); }
+
+        private final STATE error() throws IOException {
+            throw new IOException("The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: " + this.name());
+        }
+    }
+
+    private STATE state = STATE.NOT_STARTED;
+
+    /**
+     * @param configuration Hadoop configuration
+     * @param schema the schema of the data
+     * @param file the file to write to
+     * @throws IOException if the file can not be created
+     * @deprecated will be removed in 2.0.0
+     */
+    @Deprecated
+    public ParquetFileWriter(Configuration configuration, MessageType schema,
+                             Path file) throws IOException {
+        this(HadoopOutputFile.fromPath(file, configuration),
+                schema, Mode.CREATE, DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT);
+    }
+
+    /**
+     * @param configuration Hadoop configuration
+     * @param schema the schema of the data
+     * @param file the file to write to
+     * @param mode file creation mode
+     * @throws IOException if the file can not be created
+     * @deprecated will be removed in 2.0.0
+     */
+    @Deprecated
+    public ParquetFileWriter(Configuration configuration, MessageType schema,
+                             Path file, Mode mode) throws IOException {
+        this(HadoopOutputFile.fromPath(file, configuration),
+                schema, mode, DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT);
+    }
+
+    /**
+     * @param configuration Hadoop configuration
+     * @param schema the schema of the data
+     * @param file the file to write to
+     * @param mode file creation mode
+     * @param rowGroupSize the row group size
+     * @param maxPaddingSize the maximum padding
+     * @throws IOException if the file can not be created
+     * @deprecated will be removed in 2.0.0
+     */
+    @Deprecated
+    public ParquetFileWriter(Configuration configuration, MessageType schema,
+                             Path file, Mode mode, long rowGroupSize,
+                             int maxPaddingSize)
+            throws IOException {
+        this(HadoopOutputFile.fromPath(file, configuration),
+                schema, mode, rowGroupSize, maxPaddingSize);
+    }
+
+    /**
+     * @param file OutputFile to create or overwrite
+     * @param schema the schema of the data
+     * @param mode file creation mode
+     * @param rowGroupSize the row group size
+     * @param maxPaddingSize the maximum padding
+     * @throws IOException if the file can not be created
+     * @deprecated will be removed in 2.0.0
+     */
+    @Deprecated
+    public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
+                             long rowGroupSize, int maxPaddingSize)
+            throws IOException {
+        this(file, schema, mode, rowGroupSize, maxPaddingSize,
+                ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
+                ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH,
+                ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
+    }
+
+    /**
+     * @param file OutputFile to create or overwrite
+     * @param schema the schema of the data
+     * @param mode file creation mode
+     * @param rowGroupSize the row group size
+     * @param maxPaddingSize the maximum padding
+     * @param columnIndexTruncateLength the length which the min/max values in column indexes tried to be truncated to
+     * @param statisticsTruncateLength the length which the min/max values in row groups tried to be truncated to
+     * @param pageWriteChecksumEnabled whether to write out page level checksums
+     * @throws IOException if the file can not be created
+     */
+    public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
+                             long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength,
+                             int statisticsTruncateLength, boolean pageWriteChecksumEnabled)
+            throws IOException{
+        this(file, schema, mode, rowGroupSize, maxPaddingSize, columnIndexTruncateLength,
+                statisticsTruncateLength, pageWriteChecksumEnabled, null);
+    }
+
+    public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
+                             long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength,
+                             int statisticsTruncateLength, boolean pageWriteChecksumEnabled,
+                             FileEncryptionProperties encryptionProperties)
+            throws IOException {
+        TypeUtil.checkValidWriteSchema(schema);
+
+        this.schema = schema;
+
+        long blockSize = rowGroupSize;
+        if (file.supportsBlockSize()) {
+            blockSize = Math.max(file.defaultBlockSize(), rowGroupSize);
+            this.alignment = PaddingAlignment.get(blockSize, rowGroupSize, maxPaddingSize);
+        } else {
+            this.alignment = NoAlignment.get(rowGroupSize);
+        }
+
+        if (mode == Mode.OVERWRITE) {
+            this.out = file.createOrOverwrite(blockSize);
+        } else {
+            this.out = file.create(blockSize);
+        }
+
+        this.encodingStatsBuilder = new EncodingStats.Builder();
+        this.columnIndexTruncateLength = columnIndexTruncateLength;
+        this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
+        this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
+
+        this.metadataConverter = new ParquetMetadataConverter(statisticsTruncateLength);
+
+        if (null == encryptionProperties) {
+            this.fileEncryptor = null;
+        } else {
+            // Verify that every encrypted column is in file schema
+            Map<ColumnPath, ColumnEncryptionProperties> columnEncryptionProperties = encryptionProperties.getEncryptedColumns();
+            if (null != columnEncryptionProperties) { // if null, every column in file schema will be encrypted with footer key
+                for (Map.Entry<ColumnPath, ColumnEncryptionProperties> entry : columnEncryptionProperties.entrySet()) {
+                    String[] path = entry.getKey().toArray();
+                    if(!schema.containsPath(path)) {
+                        throw new ParquetCryptoRuntimeException("Encrypted column " + Arrays.toString(path) + " not in file schema");
+                    }
+                }
+            }
+            this.fileEncryptor = new InternalFileEncryptor(encryptionProperties);
+        }
+    }
+
+    /**
+     * FOR TESTING ONLY. This supports testing block padding behavior on the local FS.
+     *
+     * @param configuration Hadoop configuration
+     * @param schema the schema of the data
+     * @param file the file to write to
+     * @param rowAndBlockSize the row group size
+     * @param maxPaddingSize the maximum padding
+     * @throws IOException if the file can not be created
+     */
+    ParquetFileWriter(Configuration configuration, MessageType schema,
+                      Path file, long rowAndBlockSize, int maxPaddingSize)
+            throws IOException {
+        FileSystem fs = file.getFileSystem(configuration);
+        this.schema = schema;
+        this.alignment = PaddingAlignment.get(
+                rowAndBlockSize, rowAndBlockSize, maxPaddingSize);
+        this.out = HadoopStreams.wrap(
+                fs.create(file, true, 8192, fs.getDefaultReplication(file), rowAndBlockSize));
+        this.encodingStatsBuilder = new EncodingStats.Builder();
+        // no truncation is needed for testing
+        this.columnIndexTruncateLength = Integer.MAX_VALUE;
+        this.pageWriteChecksumEnabled = ParquetOutputFormat.getPageWriteChecksumEnabled(configuration);
+        this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
+        this.metadataConverter = new ParquetMetadataConverter(ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH);
+        this.fileEncryptor = null;
+    }
+    /**
+     * start the file
+     * @throws IOException if there is an error while writing
+     */
+    public void start() throws IOException {
+        state = state.start();
+        LOG.debug("{}: start", out.getPos());
+        byte[] magic = MAGIC;
+        if (null != fileEncryptor && fileEncryptor.isFooterEncrypted()) {
+            magic = EFMAGIC;
+        }
+        out.write(magic);
+    }
+
+    InternalFileEncryptor getEncryptor() {
+        return fileEncryptor;
+    }
+
+    /**
+     * start a block
+     * @param recordCount the record count in this block
+     * @throws IOException if there is an error while writing
+     */
+    public void startBlock(long recordCount) throws IOException {
+        state = state.startBlock();
+        LOG.debug("{}: start block", out.getPos());
+//    out.write(MAGIC); // TODO: add a magic delimiter
+
+        alignment.alignForRowGroup(out);
+
+        currentBlock = new BlockMetaData();
+        currentRecordCount = recordCount;
+
+        currentColumnIndexes = new ArrayList<>();
+        currentOffsetIndexes = new ArrayList<>();
+
+        currentBloomFilters = new HashMap<>();
+    }
+
+    /**
+     * start a column inside a block
+     * @param descriptor the column descriptor
+     * @param valueCount the value count in this column
+     * @param compressionCodecName a compression codec name
+     * @throws IOException if there is an error while writing
+     */
+    public void startColumn(ColumnDescriptor descriptor,
+                            long valueCount,
+                            CompressionCodecName compressionCodecName) throws IOException {
+        state = state.startColumn();
+        encodingStatsBuilder.clear();
+        currentEncodings = new HashSet<Encoding>();
+        currentChunkPath = ColumnPath.get(descriptor.getPath());
+        currentChunkType = descriptor.getPrimitiveType();
+        currentChunkCodec = compressionCodecName;
+        currentChunkValueCount = valueCount;
+        currentChunkFirstDataPage = -1;
+        compressedLength = 0;
+        uncompressedLength = 0;
+        // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one
+        currentStatistics = null;
+
+        columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength);
+        offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
+    }
+
+    /**
+     * writes a dictionary page
+     * @param dictionaryPage the dictionary page
+     * @throws IOException if there is an error while writing
+     */
+    public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
+        writeDictionaryPage(dictionaryPage, null, null);
+    }
+
+    public void writeDictionaryPage(DictionaryPage dictionaryPage,
+                                    BlockCipher.Encryptor headerBlockEncryptor, byte[] AAD) throws IOException {
+        state = state.write();
+        LOG.debug("{}: write dictionary page: {} values", out.getPos(), dictionaryPage.getDictionarySize());
+        currentChunkDictionaryPageOffset = out.getPos();
+        int uncompressedSize = dictionaryPage.getUncompressedSize();
+        int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: fix casts
+        if (pageWriteChecksumEnabled) {
+            crc.reset();
+            crc.update(dictionaryPage.getBytes().toByteArray());
+            metadataConverter.writeDictionaryPageHeader(
+                    uncompressedSize,
+                    compressedPageSize,
+                    dictionaryPage.getDictionarySize(),
+                    dictionaryPage.getEncoding(),
+                    (int) crc.getValue(),
+                    out,
+                    headerBlockEncryptor,
+                    AAD);
+        } else {
+            metadataConverter.writeDictionaryPageHeader(
+                    uncompressedSize,
+                    compressedPageSize,
+                    dictionaryPage.getDictionarySize(),
+                    dictionaryPage.getEncoding(),
+                    out,
+                    headerBlockEncryptor,
+                    AAD);
+        }
+        long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
+        this.uncompressedLength += uncompressedSize + headerSize;
+        this.compressedLength += compressedPageSize + headerSize;
+        LOG.debug("{}: write dictionary page content {}", out.getPos(), compressedPageSize);
+        dictionaryPage.getBytes().writeAllTo(out); // for encrypted column, dictionary page bytes are already encrypted
+        encodingStatsBuilder.addDictEncoding(dictionaryPage.getEncoding());
+        currentEncodings.add(dictionaryPage.getEncoding());
+    }
+
+
+    /**
+     * writes a single page
+     * @param valueCount count of values
+     * @param uncompressedPageSize the size of the data once uncompressed
+     * @param bytes the compressed data for the page without header
+     * @param rlEncoding encoding of the repetition level
+     * @param dlEncoding encoding of the definition level
+     * @param valuesEncoding encoding of values
+     * @throws IOException if there is an error while writing
+     */
+    @Deprecated
+    public void writeDataPage(
+            int valueCount, int uncompressedPageSize,
+            BytesInput bytes,
+            Encoding rlEncoding,
+            Encoding dlEncoding,
+            Encoding valuesEncoding) throws IOException {
+        state = state.write();
+        // We are unable to build indexes without rowCount, so we skip them for this column
+        offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder();
+        columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
+        long beforeHeader = out.getPos();
+        LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
+        int compressedPageSize = (int)bytes.size();
+        metadataConverter.writeDataPageV1Header(
+                uncompressedPageSize, compressedPageSize,
+                valueCount,
+                rlEncoding,
+                dlEncoding,
+                valuesEncoding,
+                out);
+        long headerSize = out.getPos() - beforeHeader;
+        this.uncompressedLength += uncompressedPageSize + headerSize;
+        this.compressedLength += compressedPageSize + headerSize;
+        LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize);
+        bytes.writeAllTo(out);
+        encodingStatsBuilder.addDataEncoding(valuesEncoding);
+        currentEncodings.add(rlEncoding);
+        currentEncodings.add(dlEncoding);
+        currentEncodings.add(valuesEncoding);
+        if (currentChunkFirstDataPage < 0) {
+            currentChunkFirstDataPage = beforeHeader;
+        }
+    }
+
+    /**
+     * writes a single page
+     * @param valueCount count of values
+     * @param uncompressedPageSize the size of the data once uncompressed
+     * @param bytes the compressed data for the page without header
+     * @param statistics statistics for the page
+     * @param rlEncoding encoding of the repetition level
+     * @param dlEncoding encoding of the definition level
+     * @param valuesEncoding encoding of values
+     * @throws IOException if there is an error while writing
+     * @deprecated this method does not support writing column indexes; Use
+     *             {@link #writeDataPage(int, int, BytesInput, Statistics, long, Encoding, Encoding, Encoding)} instead
+     */
+    @Deprecated
+    public void writeDataPage(
+            int valueCount, int uncompressedPageSize,
+            BytesInput bytes,
+            Statistics statistics,
+            Encoding rlEncoding,
+            Encoding dlEncoding,
+            Encoding valuesEncoding) throws IOException {
+        // We are unable to build indexes without rowCount, so we skip them for this column
+        offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder();
+        columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
+        innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding);
+    }
+
+    /**
+     * Writes a single page
+     * @param valueCount count of values
+     * @param uncompressedPageSize the size of the data once uncompressed
+     * @param bytes the compressed data for the page without header
+     * @param statistics the statistics of the page
+     * @param rowCount the number of rows in the page
+     * @param rlEncoding encoding of the repetition level
+     * @param dlEncoding encoding of the definition level
+     * @param valuesEncoding encoding of values
+     * @throws IOException if any I/O error occurs during writing the file
+     */
+    public void writeDataPage(
+            int valueCount, int uncompressedPageSize,
+            BytesInput bytes,
+            Statistics statistics,
+            long rowCount,
+            Encoding rlEncoding,
+            Encoding dlEncoding,
+            Encoding valuesEncoding) throws IOException {
+        long beforeHeader = out.getPos();
+        innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding);
+
+        offsetIndexBuilder.add((int) (out.getPos() - beforeHeader), rowCount);
+    }
+
+    private void innerWriteDataPage(
+            int valueCount, int uncompressedPageSize,
+            BytesInput bytes,
+            Statistics statistics,
+            Encoding rlEncoding,
+            Encoding dlEncoding,
+            Encoding valuesEncoding) throws IOException {
+        state = state.write();
+        long beforeHeader = out.getPos();
+        if (currentChunkFirstDataPage < 0) {
+            currentChunkFirstDataPage = beforeHeader;
+        }
+        LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
+        int compressedPageSize = (int) bytes.size();
+        if (pageWriteChecksumEnabled) {
+            crc.reset();
+            crc.update(bytes.toByteArray());
+            metadataConverter.writeDataPageV1Header(
+                    uncompressedPageSize, compressedPageSize,
+                    valueCount,
+                    rlEncoding,
+                    dlEncoding,
+                    valuesEncoding,
+                    (int) crc.getValue(),
+                    out);
+        } else {
+            metadataConverter.writeDataPageV1Header(
+                    uncompressedPageSize, compressedPageSize,
+                    valueCount,
+                    rlEncoding,
+                    dlEncoding,
+                    valuesEncoding,
+                    out);
+        }
+        long headerSize = out.getPos() - beforeHeader;
+        this.uncompressedLength += uncompressedPageSize + headerSize;
+        this.compressedLength += compressedPageSize + headerSize;
+        LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize);
+        bytes.writeAllTo(out);
+
+        // Copy the statistics if currentStatistics is not initialized yet, so we keep the correctly typed instance
+        if (currentStatistics == null) {
+            currentStatistics = statistics.copy();
+        } else {
+            currentStatistics.mergeStatistics(statistics);
+        }
+
+        columnIndexBuilder.add(statistics);
+
+        encodingStatsBuilder.addDataEncoding(valuesEncoding);
+        currentEncodings.add(rlEncoding);
+        currentEncodings.add(dlEncoding);
+        currentEncodings.add(valuesEncoding);
+    }
+
+    /**
+     * Add a Bloom filter that will be written out. This is only used in unit tests.
+     *
+     * @param column the column name
+     * @param bloomFilter the bloom filter of column values
+     */
+    void addBloomFilter(String column, BloomFilter bloomFilter) {
+        currentBloomFilters.put(column, bloomFilter);
+    }
+
+    /**
+     * Writes a single v2 data page
+     * @param rowCount count of rows
+     * @param nullCount count of nulls
+     * @param valueCount count of values
+     * @param repetitionLevels repetition level bytes
+     * @param definitionLevels definition level bytes
+     * @param dataEncoding encoding for data
+     * @param compressedData compressed data bytes
+     * @param uncompressedDataSize the size of uncompressed data
+     * @param statistics the statistics of the page
+     * @throws IOException if any I/O error occurs during writing the file
+     */
+    public void writeDataPageV2(int rowCount, int nullCount, int valueCount,
+                                BytesInput repetitionLevels,
+                                BytesInput definitionLevels,
+                                Encoding dataEncoding,
+                                BytesInput compressedData,
+                                int uncompressedDataSize,
+                                Statistics<?> statistics) throws IOException {
+        state = state.write();
+        int rlByteLength = toIntWithCheck(repetitionLevels.size());
+        int dlByteLength = toIntWithCheck(definitionLevels.size());
+
+        int compressedSize = toIntWithCheck(
+                compressedData.size() + repetitionLevels.size() + definitionLevels.size()
+        );
+
+        int uncompressedSize = toIntWithCheck(
+                uncompressedDataSize + repetitionLevels.size() + definitionLevels.size()
+        );
+
+        long beforeHeader = out.getPos();
+        if (currentChunkFirstDataPage < 0) {
+            currentChunkFirstDataPage = beforeHeader;
+        }
+
+        metadataConverter.writeDataPageV2Header(
+                uncompressedSize, compressedSize,
+                valueCount, nullCount, rowCount,
+                dataEncoding,
+                rlByteLength,
+                dlByteLength,
+                out);
+
+        long headersSize = out.getPos() - beforeHeader;
+        this.uncompressedLength += uncompressedSize + headersSize;
+        this.compressedLength += compressedSize + headersSize;
+
+        if (currentStatistics == null) {
+            currentStatistics = statistics.copy();
+        } else {
+            currentStatistics.mergeStatistics(statistics);
+        }
+
+        columnIndexBuilder.add(statistics);
+        currentEncodings.add(dataEncoding);
+        encodingStatsBuilder.addDataEncoding(dataEncoding);
+
+        BytesInput.concat(repetitionLevels, definitionLevels, compressedData)
+                .writeAllTo(out);
+
+        offsetIndexBuilder.add((int) (out.getPos() - beforeHeader), rowCount);
+    }
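
Note on the v2 path above: repetition and definition levels are written uncompressed ahead of the (possibly compressed) value section, which is why both size fields include the level byte lengths. As a worked example with illustrative numbers: 100-byte repetition levels, 50-byte definition levels and a value section that compresses from 1000 to 400 bytes give uncompressedSize = 1000 + 100 + 50 = 1150 and compressedSize = 400 + 100 + 50 = 550 in the page header, while only the 400 value bytes are actually compressed on disk.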
+
+    /**
+     * Writes a column chunk at once
+     * @param descriptor the descriptor of the column
+     * @param valueCount the value count in this column
+     * @param compressionCodecName the name of the compression codec used for compressing the pages
+     * @param dictionaryPage the dictionary page for this column chunk (might be null)
+     * @param bytes the encoded pages including page headers to be written as is
+     * @param uncompressedTotalPageSize total uncompressed size (without page headers)
+     * @param compressedTotalPageSize total compressed size (without page headers)
+     * @param totalStats accumulated statistics for the column chunk
+     * @param columnIndexBuilder the builder object for the column index
+     * @param offsetIndexBuilder the builder object for the offset index
+     * @param bloomFilter the bloom filter for this column
+     * @param rlEncodings the RL encodings used in this column chunk
+     * @param dlEncodings the DL encodings used in this column chunk
+     * @param dataEncodings the data encodings used in this column chunk
+     * @throws IOException if there is an error while writing
+     */
+    void writeColumnChunk(ColumnDescriptor descriptor,
+                          long valueCount,
+                          CompressionCodecName compressionCodecName,
+                          DictionaryPage dictionaryPage,
+                          BytesInput bytes,
+                          long uncompressedTotalPageSize,
+                          long compressedTotalPageSize,
+                          Statistics<?> totalStats,
+                          ColumnIndexBuilder columnIndexBuilder,
+                          OffsetIndexBuilder offsetIndexBuilder,
+                          BloomFilter bloomFilter,
+                          Set<Encoding> rlEncodings,
+                          Set<Encoding> dlEncodings,
+                          List<Encoding> dataEncodings) throws IOException {
+        writeColumnChunk(descriptor, valueCount, compressionCodecName, dictionaryPage, bytes,
+                uncompressedTotalPageSize, compressedTotalPageSize, totalStats, columnIndexBuilder, offsetIndexBuilder,
+                bloomFilter, rlEncodings, dlEncodings, dataEncodings, null, 0, 0, null);
+    }
+
+    void writeColumnChunk(ColumnDescriptor descriptor,
+                          long valueCount,
+                          CompressionCodecName compressionCodecName,
+                          DictionaryPage dictionaryPage,
+                          BytesInput bytes,
+                          long uncompressedTotalPageSize,
+                          long compressedTotalPageSize,
+                          Statistics<?> totalStats,
+                          ColumnIndexBuilder columnIndexBuilder,
+                          OffsetIndexBuilder offsetIndexBuilder,
+                          BloomFilter bloomFilter,
+                          Set<Encoding> rlEncodings,
+                          Set<Encoding> dlEncodings,
+                          List<Encoding> dataEncodings,
+                          BlockCipher.Encryptor headerBlockEncryptor,
+                          int rowGroupOrdinal,
+                          int columnOrdinal,
+                          byte[] fileAAD) throws IOException {
+        startColumn(descriptor, valueCount, compressionCodecName);
+
+        state = state.write();
+        if (dictionaryPage != null) {
+            byte[] dictionaryPageHeaderAAD = null;
+            if (null != headerBlockEncryptor) {
+                dictionaryPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPageHeader,
+                        rowGroupOrdinal, columnOrdinal, -1);
+            }
+            writeDictionaryPage(dictionaryPage, headerBlockEncryptor, dictionaryPageHeaderAAD);
+        }
+
+        if (bloomFilter != null) {
+            // write the bloom filter if at least one data page is not dictionary encoded
+            boolean isWriteBloomFilter = false;
+            for (Encoding encoding : dataEncodings) {
+                if (encoding != Encoding.RLE_DICTIONARY) {
+                    isWriteBloomFilter = true;
+                    break;
+                }
+            }
+            if (isWriteBloomFilter) {
+                currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
+            }
+        }
+        LOG.debug("{}: write data pages", out.getPos());
+        long headersSize = bytes.size() - compressedTotalPageSize;
+        this.uncompressedLength += uncompressedTotalPageSize + headersSize;
+        this.compressedLength += compressedTotalPageSize + headersSize;
+        LOG.debug("{}: write data pages content", out.getPos());
+        currentChunkFirstDataPage = out.getPos();
+        bytes.writeAllTo(out);
+        encodingStatsBuilder.addDataEncodings(dataEncodings);
+        if (rlEncodings.isEmpty()) {
+            encodingStatsBuilder.withV2Pages();
+        }
+        currentEncodings.addAll(rlEncodings);
+        currentEncodings.addAll(dlEncodings);
+        currentEncodings.addAll(dataEncodings);
+        currentStatistics = totalStats;
+
+        this.columnIndexBuilder = columnIndexBuilder;
+        this.offsetIndexBuilder = offsetIndexBuilder;
+
+        endColumn();
+    }
+
+    /**
+     * end a column (once all repetition levels, definition levels and data have been written)
+     * @throws IOException if there is an error while writing
+     */
+    public void endColumn() throws IOException {
+        state = state.endColumn();
+        LOG.debug("{}: end column", out.getPos());
+        if (columnIndexBuilder.getMinMaxSize() > columnIndexBuilder.getPageCount() * MAX_STATS_SIZE) {
+            currentColumnIndexes.add(null);
+        } else {
+            currentColumnIndexes.add(columnIndexBuilder.build());
+        }
+        currentOffsetIndexes.add(offsetIndexBuilder.build(currentChunkFirstDataPage));
+        currentBlock.addColumn(ColumnChunkMetaData.get(
+                currentChunkPath,
+                currentChunkType,
+                currentChunkCodec,
+                encodingStatsBuilder.build(),
+                currentEncodings,
+                currentStatistics,
+                currentChunkFirstDataPage,
+                currentChunkDictionaryPageOffset,
+                currentChunkValueCount,
+                compressedLength,
+                uncompressedLength));
+        this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
+        this.uncompressedLength = 0;
+        this.compressedLength = 0;
+        columnIndexBuilder = null;
+        offsetIndexBuilder = null;
+    }
+
+    /**
+     * ends a block once all column chunks have been written
+     * @throws IOException if there is an error while writing
+     */
+    public void endBlock() throws IOException {
+//        if (currentRecordCount == 0) {
+//            throw new ParquetEncodingException("End block with zero record");
+//        }
+
+        state = state.endBlock();
+        LOG.debug("{}: end block", out.getPos());
+        currentBlock.setRowCount(currentRecordCount);
+        currentBlock.setOrdinal(blocks.size());
+        blocks.add(currentBlock);
+        columnIndexes.add(currentColumnIndexes);
+        offsetIndexes.add(currentOffsetIndexes);
+        bloomFilters.add(currentBloomFilters);
+        currentColumnIndexes = null;
+        currentOffsetIndexes = null;
+        currentBloomFilters = null;
+        currentBlock = null;
+    }
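
Taken together, the methods above form a strict state machine: start(), then for each row group startBlock()/startColumn()/.../endColumn()/endBlock(), and finally end(). A minimal sketch of that call order, assuming an already constructed writer, its MessageType schema, and page payloads (BytesInput, Statistics) prepared elsewhere; the arguments are placeholders, not Drill's actual call sites:

    // Sketch only: one row group with a single column chunk and one data page.
    writer.start();                                            // writes the PAR1 magic
    writer.startBlock(recordCount);                            // begin a row group
    writer.startColumn(schema.getColumns().get(0), valueCount, CompressionCodecName.SNAPPY);
    writer.writeDataPage(valueCount, uncompressedPageSize, pageBytes, pageStatistics,
        rowCount, Encoding.RLE, Encoding.RLE, Encoding.PLAIN);
    writer.endColumn();                                        // records the ColumnChunkMetaData
    writer.endBlock();                                         // records the BlockMetaData
    writer.end(Collections.emptyMap());                        // indexes, bloom filters, footer, magic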
+
+    /**
+     * @param conf a configuration
+     * @param file a file whose contents will be appended to this file
+     * @throws IOException if there is an error while reading or writing
+     * @deprecated will be removed in 2.0.0; use {@link #appendFile(InputFile)} instead
+     */
+    @Deprecated
+    public void appendFile(Configuration conf, Path file) throws IOException {
+        ParquetFileReader.open(conf, file).appendTo(this);
+    }
+
+    public void appendFile(InputFile file) throws IOException {
+        try (ParquetFileReader reader = ParquetFileReader.open(file)) {
+            reader.appendTo(this);
+        }
+    }
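
For example, merging the row groups of an existing file into the one being written could look like the sketch below; HadoopInputFile is the usual InputFile implementation, the path and Configuration are placeholders, and the schemas must be compatible:

    // Sketch: append every row group of an existing Parquet file to this writer.
    // Call it after start() and outside of any open block.
    InputFile existing = HadoopInputFile.fromPath(new Path("/tmp/part-00000.parquet"), conf);
    writer.appendFile(existing);  // delegates to ParquetFileReader.appendTo(this)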
+
+    /**
+     * @param file a file stream to read from
+     * @param rowGroups row groups to copy
+     * @param dropColumns whether to drop columns from the file that are not in this file's schema
+     * @throws IOException if there is an error while reading or writing
+     * @deprecated will be removed in 2.0.0;
+     *             use {@link #appendRowGroups(SeekableInputStream,List,boolean)} instead
+     */
+    @Deprecated
+    public void appendRowGroups(FSDataInputStream file,
+                                List<BlockMetaData> rowGroups,
+                                boolean dropColumns) throws IOException {
+        appendRowGroups(HadoopStreams.wrap(file), rowGroups, dropColumns);
+    }
+
+    public void appendRowGroups(SeekableInputStream file,
+                                List<BlockMetaData> rowGroups,
+                                boolean dropColumns) throws IOException {
+        for (BlockMetaData block : rowGroups) {
+            appendRowGroup(file, block, dropColumns);
+        }
+    }
+
+    /**
+     * @param from a file stream to read from
+     * @param rowGroup row group to copy
+     * @param dropColumns whether to drop columns from the file that are not in this file's schema
+     * @throws IOException if there is an error while reading or writing
+     * @deprecated will be removed in 2.0.0;
+     *             use {@link #appendRowGroup(SeekableInputStream,BlockMetaData,boolean)} instead
+     */
+    @Deprecated
+    public void appendRowGroup(FSDataInputStream from, BlockMetaData rowGroup,
+                               boolean dropColumns) throws IOException {
+        appendRowGroup(HadoopStreams.wrap(from), rowGroup, dropColumns);
+    }
+
+    public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup,
+                               boolean dropColumns) throws IOException {
+        startBlock(rowGroup.getRowCount());
+
+        Map<String, ColumnChunkMetaData> columnsToCopy =
+                new HashMap<String, ColumnChunkMetaData>();
+        for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
+            columnsToCopy.put(chunk.getPath().toDotString(), chunk);
+        }
+
+        List<ColumnChunkMetaData> columnsInOrder =
+                new ArrayList<ColumnChunkMetaData>();
+
+        for (ColumnDescriptor descriptor : schema.getColumns()) {
+            String path = ColumnPath.get(descriptor.getPath()).toDotString();
+            ColumnChunkMetaData chunk = columnsToCopy.remove(path);
+            if (chunk != null) {
+                columnsInOrder.add(chunk);
+            } else {
+                throw new IllegalArgumentException(String.format(
+                        "Missing column '%s', cannot copy row group: %s", path, rowGroup));
+            }
+        }
+
+        // complain if some columns would be dropped and that's not okay
+        if (!dropColumns && !columnsToCopy.isEmpty()) {
+            throw new IllegalArgumentException(String.format(
+                    "Columns cannot be copied (missing from target schema): %s",
+                    String.join(", ", columnsToCopy.keySet())));
+        }
+
+        // copy the data for all chunks
+        long start = -1;
+        long length = 0;
+        long blockUncompressedSize = 0L;
+        for (int i = 0; i < columnsInOrder.size(); i += 1) {
+            ColumnChunkMetaData chunk = columnsInOrder.get(i);
+
+            // get this chunk's start position in the new file
+            long newChunkStart = out.getPos() + length;
+
+            // add this chunk to be copied with any previous chunks
+            if (start < 0) {
+                // no previous chunk included, start at this chunk's starting pos
+                start = chunk.getStartingPos();
+            }
+            length += chunk.getTotalSize();
+
+            if ((i + 1) == columnsInOrder.size() ||
+                    columnsInOrder.get(i + 1).getStartingPos() != (start + length)) {
+                // not contiguous. do the copy now.
+                copy(from, out, start, length);
+                // reset to start at the next column chunk
+                start = -1;
+                length = 0;
+            }
+
+            // TODO: column/offset indexes are not copied
+            // (it would require seeking to the end of the file for each row group)
+            currentColumnIndexes.add(null);
+            currentOffsetIndexes.add(null);
+
+            Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
+            currentBlock.addColumn(ColumnChunkMetaData.get(
+                    chunk.getPath(),
+                    chunk.getPrimitiveType(),
+                    chunk.getCodec(),
+                    chunk.getEncodingStats(),
+                    chunk.getEncodings(),
+                    chunk.getStatistics(),
+                    offsets.firstDataPageOffset,
+                    offsets.dictionaryPageOffset,
+                    chunk.getValueCount(),
+                    chunk.getTotalSize(),
+                    chunk.getTotalUncompressedSize()));
+
+            blockUncompressedSize += chunk.getTotalUncompressedSize();
+        }
+
+        currentBlock.setTotalByteSize(blockUncompressedSize);
+
+        endBlock();
+    }
+
+    /**
+     * @param descriptor the descriptor for the target column
+     * @param from a file stream to read from
+     * @param chunk the column chunk to be copied
+     * @param bloomFilter the bloomFilter for this chunk
+     * @param columnIndex the column index for this chunk
+     * @param offsetIndex the offset index for this chunk
+     * @throws IOException if there is an error while reading or writing
+     */
+    public void appendColumnChunk(ColumnDescriptor descriptor, SeekableInputStream from, ColumnChunkMetaData chunk,
+                                  BloomFilter bloomFilter, ColumnIndex columnIndex, OffsetIndex offsetIndex) throws IOException {
+        long start = chunk.getStartingPos();
+        long length = chunk.getTotalSize();
+        long newChunkStart = out.getPos();
+
+        copy(from, out, start, length);
+
+        currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
+        currentColumnIndexes.add(columnIndex);
+        currentOffsetIndexes.add(offsetIndex);
+
+        Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
+        currentBlock.addColumn(ColumnChunkMetaData.get(
+                chunk.getPath(),
+                chunk.getPrimitiveType(),
+                chunk.getCodec(),
+                chunk.getEncodingStats(),
+                chunk.getEncodings(),
+                chunk.getStatistics(),
+                offsets.firstDataPageOffset,
+                offsets.dictionaryPageOffset,
+                chunk.getValueCount(),
+                chunk.getTotalSize(),
+                chunk.getTotalUncompressedSize()));
+
+        currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize());
+    }
+
+    // Buffers for the copy function.
+    private static final ThreadLocal<byte[]> COPY_BUFFER = ThreadLocal.withInitial(() -> new byte[8192]);
+
+    /**
+     * Copy from a FS input stream to an output stream. Thread-safe.
+     *
+     * @param from a {@link SeekableInputStream}
+     * @param to any {@link PositionOutputStream}
+     * @param start where in the from stream to start copying
+     * @param length the number of bytes to copy
+     * @throws IOException if there is an error while reading or writing
+     */
+    private static void copy(SeekableInputStream from, PositionOutputStream to,
+                             long start, long length) throws IOException {
+        LOG.debug("Copying {} bytes at {} to {}", length, start, to.getPos());
+        from.seek(start);
+        long bytesCopied = 0;
+        byte[] buffer = COPY_BUFFER.get();
+        while (bytesCopied < length) {
+            long bytesLeft = length - bytesCopied;
+            int bytesRead = from.read(buffer, 0,
+                    (buffer.length < bytesLeft ? buffer.length : (int) bytesLeft));
+            if (bytesRead < 0) {
+                throw new IllegalArgumentException(
+                        "Unexpected end of input file at " + start + bytesCopied);
+            }
+            to.write(buffer, 0, bytesRead);
+            bytesCopied += bytesRead;
+        }
+    }
+
+    /**
+     * ends a file once all blocks have been written.
+     * closes the file.
+     * @param extraMetaData the extra meta data to write in the footer
+     * @throws IOException if there is an error while writing
+     */
+    public void end(Map<String, String> extraMetaData) throws IOException {
+        state = state.end();
+        serializeColumnIndexes(columnIndexes, blocks, out, fileEncryptor);
+        serializeOffsetIndexes(offsetIndexes, blocks, out, fileEncryptor);
+        serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor);
+        LOG.debug("{}: end", out.getPos());
+        this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
+        serializeFooter(footer, out, fileEncryptor);
+        out.close();
+    }
+
+    private static void serializeColumnIndexes(
+            List<List<ColumnIndex>> columnIndexes,
+            List<BlockMetaData> blocks,
+            PositionOutputStream out,
+            InternalFileEncryptor fileEncryptor) throws IOException {
+        LOG.debug("{}: column indexes", out.getPos());
+        for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
+            BlockMetaData block = blocks.get(bIndex);
+            List<ColumnChunkMetaData> columns = block.getColumns();
+            List<ColumnIndex> blockColumnIndexes = columnIndexes.get(bIndex);
+            for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
+                ColumnChunkMetaData column = columns.get(cIndex);
+                org.apache.parquet.format.ColumnIndex columnIndex = ParquetMetadataConverter
+                        .toParquetColumnIndex(column.getPrimitiveType(), blockColumnIndexes.get(cIndex));
+                if (columnIndex == null) {
+                    continue;
+                }
+                BlockCipher.Encryptor columnIndexEncryptor = null;
+                byte[] columnIndexAAD = null;
+                if (null != fileEncryptor) {
+                    InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), false, cIndex);
+                    if (columnEncryptionSetup.isEncrypted()) {
+                        columnIndexEncryptor = columnEncryptionSetup.getMetaDataEncryptor();
+                        columnIndexAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.ColumnIndex,
+                                block.getOrdinal(), columnEncryptionSetup.getOrdinal(), -1);
+                    }
+                }
+                long offset = out.getPos();
+                Util.writeColumnIndex(columnIndex, out, columnIndexEncryptor, columnIndexAAD);
+                column.setColumnIndexReference(new IndexReference(offset, (int) (out.getPos() - offset)));
+            }
+        }
+    }
+
+    private int toIntWithCheck(long size) {
+        if ((int)size != size) {
+            throw new ParquetEncodingException("Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " + size);
+        }
+        return (int)size;
+    }
+
+    private static void serializeOffsetIndexes(
+            List<List<OffsetIndex>> offsetIndexes,
+            List<BlockMetaData> blocks,
+            PositionOutputStream out,
+            InternalFileEncryptor fileEncryptor) throws IOException {
+        LOG.debug("{}: offset indexes", out.getPos());
+        for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
+            BlockMetaData block = blocks.get(bIndex);
+            List<ColumnChunkMetaData> columns = block.getColumns();
+            List<OffsetIndex> blockOffsetIndexes = offsetIndexes.get(bIndex);
+            for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
+                OffsetIndex offsetIndex = blockOffsetIndexes.get(cIndex);
+                if (offsetIndex == null) {
+                    continue;
+                }
+                ColumnChunkMetaData column = columns.get(cIndex);
+                BlockCipher.Encryptor offsetIndexEncryptor = null;
+                byte[] offsetIndexAAD = null;
+                if (null != fileEncryptor) {
+                    InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), false, cIndex);
+                    if (columnEncryptionSetup.isEncrypted()) {
+                        offsetIndexEncryptor = columnEncryptionSetup.getMetaDataEncryptor();
+                        offsetIndexAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.OffsetIndex,
+                                block.getOrdinal(), columnEncryptionSetup.getOrdinal(), -1);
+                    }
+                }
+                long offset = out.getPos();
+                Util.writeOffsetIndex(ParquetMetadataConverter.toParquetOffsetIndex(offsetIndex), out, offsetIndexEncryptor, offsetIndexAAD);
+                column.setOffsetIndexReference(new IndexReference(offset, (int) (out.getPos() - offset)));
+            }
+        }
+    }
+
+    private static void serializeBloomFilters(
+            List<Map<String, BloomFilter>> bloomFilters,
+            List<BlockMetaData> blocks,
+            PositionOutputStream out,
+            InternalFileEncryptor fileEncryptor) throws IOException {
+        LOG.debug("{}: bloom filters", out.getPos());
+        for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
+            BlockMetaData block = blocks.get(bIndex);
+            List<ColumnChunkMetaData> columns = block.getColumns();
+            Map<String, BloomFilter> blockBloomFilters = bloomFilters.get(bIndex);
+            if (blockBloomFilters.isEmpty()) {
+                continue;
+            }
+            for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
+                ColumnChunkMetaData column = columns.get(cIndex);
+                BloomFilter bloomFilter = blockBloomFilters.get(column.getPath().toDotString());
+                if (bloomFilter == null) {
+                    continue;
+                }
+
+                long offset = out.getPos();
+                column.setBloomFilterOffset(offset);
+
+                BlockCipher.Encryptor bloomFilterEncryptor = null;
+                byte[] bloomFilterHeaderAAD = null;
+                byte[] bloomFilterBitsetAAD = null;
+                if (null != fileEncryptor) {
+                    InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), false, cIndex);
+                    if (columnEncryptionSetup.isEncrypted()) {
+                        bloomFilterEncryptor = columnEncryptionSetup.getMetaDataEncryptor();
+                        int columnOrdinal = columnEncryptionSetup.getOrdinal();
+                        bloomFilterHeaderAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.BloomFilterHeader,
+                                block.getOrdinal(), columnOrdinal, -1);
+                        bloomFilterBitsetAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.BloomFilterBitset,
+                                block.getOrdinal(), columnOrdinal, -1);
+                    }
+                }
+
+                Util.writeBloomFilterHeader(ParquetMetadataConverter.toBloomFilterHeader(bloomFilter), out,
+                        bloomFilterEncryptor, bloomFilterHeaderAAD);
+
+                ByteArrayOutputStream tempOutStream = new ByteArrayOutputStream();
+                bloomFilter.writeTo(tempOutStream);
+                byte[] serializedBitset = tempOutStream.toByteArray();
+                if (null != bloomFilterEncryptor) {
+                    serializedBitset = bloomFilterEncryptor.encrypt(serializedBitset, bloomFilterBitsetAAD);
+                }
+                out.write(serializedBitset);
+            }
+        }
+    }
+
+    private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out,
+                                        InternalFileEncryptor fileEncryptor) throws IOException {
+
+        ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
+
+        // Unencrypted file
+        if (null == fileEncryptor) {
+            long footerIndex = out.getPos();
+            org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
+            writeFileMetaData(parquetMetadata, out);
+            LOG.debug("{}: footer length = {}", out.getPos(), (out.getPos() - footerIndex));
+            BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
+            out.write(MAGIC);
+            return;
+        }
+
+        org.apache.parquet.format.FileMetaData parquetMetadata =
+                metadataConverter.toParquetMetadata(CURRENT_VERSION, footer, fileEncryptor);
+
+        // Encrypted file with plaintext footer
+        if (!fileEncryptor.isFooterEncrypted()) {
+            long footerIndex = out.getPos();
+            parquetMetadata.setEncryption_algorithm(fileEncryptor.getEncryptionAlgorithm());
+            // create footer signature (nonce + tag of encrypted footer)
+            byte[] footerSigningKeyMetaData = fileEncryptor.getFooterSigningKeyMetaData();
+            if (null != footerSigningKeyMetaData) {
+                parquetMetadata.setFooter_signing_key_metadata(footerSigningKeyMetaData);
+            }
+            ByteArrayOutputStream tempOutStream = new ByteArrayOutputStream();
+            writeFileMetaData(parquetMetadata, tempOutStream);
+            byte[] serializedFooter = tempOutStream.toByteArray();
+            byte[] footerAAD = AesCipher.createFooterAAD(fileEncryptor.getFileAAD());
+            byte[] encryptedFooter = fileEncryptor.getSignedFooterEncryptor().encrypt(serializedFooter, footerAAD);
+            byte[] signature = new byte[AesCipher.NONCE_LENGTH + AesCipher.GCM_TAG_LENGTH];
+            System.arraycopy(encryptedFooter, ModuleCipherFactory.SIZE_LENGTH, signature, 0, AesCipher.NONCE_LENGTH); // copy Nonce
+            System.arraycopy(encryptedFooter, encryptedFooter.length - AesCipher.GCM_TAG_LENGTH,
+                    signature, AesCipher.NONCE_LENGTH, AesCipher.GCM_TAG_LENGTH); // copy GCM Tag
+            out.write(serializedFooter);
+            out.write(signature);
+            LOG.debug("{}: footer and signature length = {}", out.getPos(), (out.getPos() - footerIndex));
+            BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
+            out.write(MAGIC);
+            return;
+        }
+
+        // Encrypted file with encrypted footer
+        long cryptoFooterIndex = out.getPos();
+        writeFileCryptoMetaData(fileEncryptor.getFileCryptoMetaData(), out);
+        byte[] footerAAD = AesCipher.createFooterAAD(fileEncryptor.getFileAAD());
+        writeFileMetaData(parquetMetadata, out, fileEncryptor.getFooterEncryptor(), footerAAD);
+        int combinedMetaDataLength = (int)(out.getPos() - cryptoFooterIndex);
+        LOG.debug("{}: crypto metadata and footer length = {}", out.getPos(), combinedMetaDataLength);
+        BytesUtils.writeIntLittleEndian(out, combinedMetaDataLength);
+        out.write(EFMAGIC);
+    }
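
All three branches above finish with the same 8-byte tail: a 4-byte little-endian length of what was just serialized, followed by the 4-byte magic (PAR1, or PARE when the footer is encrypted). A reader-side sketch of how that tail is used to locate an unencrypted footer, assuming an InputFile handle (names are illustrative):

    // Sketch: locate the footer written by serializeFooter() in a plain PAR1 file.
    long fileLen = inputFile.getLength();
    try (SeekableInputStream in = inputFile.newStream()) {
      in.seek(fileLen - 8);                              // 4-byte footer length + "PAR1"
      int footerLen = BytesUtils.readIntLittleEndian(in);
      in.seek(fileLen - 8 - footerLen);                  // start of the Thrift-encoded FileMetaData
    }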
+
+    public ParquetMetadata getFooter() {
+        Preconditions.checkState(state == STATE.ENDED, "Cannot return unfinished footer.");
+        return footer;
+    }
+
+    /**
+     * Given a list of metadata files, merge them into a single ParquetMetadata.
+     * Requires that the schemas be compatible and the extraMetaData be exactly equal.
+     * @param files a list of files to merge metadata from
+     * @param conf a configuration
+     * @return merged parquet metadata for the files
+     * @throws IOException if there is an error while writing
+     * @deprecated metadata files are not recommended and will be removed in 2.0.0
+     */
+    @Deprecated
+    public static ParquetMetadata mergeMetadataFiles(List<Path> files,  Configuration conf) throws IOException {
+        return mergeMetadataFiles(files, conf, new StrictKeyValueMetadataMergeStrategy());
+    }
+
+    /**
+     * Given a list of metadata files, merge them into a single ParquetMetadata.
+     * Requires that the schemas be compatible and the extraMetaData be exactly equal.
+     * @param files a list of files to merge metadata from
+     * @param conf a configuration
+     * @param keyValueMetadataMergeStrategy strategy to merge values for the same key, if there are multiple
+     * @return merged parquet metadata for the files
+     * @throws IOException if there is an error while writing
+     * @deprecated metadata files are not recommended and will be removed in 2.0.0
+     */
+    @Deprecated
+    public static ParquetMetadata mergeMetadataFiles(List<Path> files, Configuration conf,
+                                                     KeyValueMetadataMergeStrategy keyValueMetadataMergeStrategy) throws IOException {
+        Preconditions.checkArgument(!files.isEmpty(), "Cannot merge an empty list of metadata");
+
+        GlobalMetaData globalMetaData = null;
+        List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+
+        for (Path p : files) {
+            ParquetMetadata pmd = ParquetFileReader.readFooter(conf, p, ParquetMetadataConverter.NO_FILTER);
+            FileMetaData fmd = pmd.getFileMetaData();
+            globalMetaData = mergeInto(fmd, globalMetaData, true);
+            blocks.addAll(pmd.getBlocks());
+        }
+
+        // collapse GlobalMetaData into a single FileMetaData, which will throw if they are not compatible
+        return new ParquetMetadata(globalMetaData.merge(keyValueMetadataMergeStrategy), blocks);
+    }
+
+    /**
+     * Given a list of metadata files, merge them into a single metadata file.
+     * Requires that the schemas be compatible, and the extraMetaData be exactly equal.
+     * This is useful when merging 2 directories of parquet files into a single directory, as long
+     * as both directories were written with compatible schemas and equal extraMetaData.
+     * @param files a list of files to merge metadata from
+     * @param outputPath path to write merged metadata to
+     * @param conf a configuration
+     * @throws IOException if there is an error while reading or writing
+     * @deprecated metadata files are not recommended and will be removed in 2.0.0
+     */
+    @Deprecated
+    public static void writeMergedMetadataFile(List<Path> files, Path outputPath, Configuration conf) throws IOException {
+        ParquetMetadata merged = mergeMetadataFiles(files, conf);
+        writeMetadataFile(outputPath, merged, outputPath.getFileSystem(conf));
+    }
+
+    /**
+     * writes a _metadata and _common_metadata file
+     * @param configuration the configuration to use to get the FileSystem
+     * @param outputPath the directory to write the _metadata file to
+     * @param footers the list of footers to merge
+     * @throws IOException if there is an error while writing
+     * @deprecated metadata files are not recommended and will be removed in 2.0.0
+     */
+    @Deprecated
+    public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException {
+        writeMetadataFile(configuration, outputPath, footers, JobSummaryLevel.ALL);
+    }
+
+    /**
+     * writes a _common_metadata file, and optionally a _metadata file, depending on the {@link JobSummaryLevel} provided
+     * @param configuration the configuration to use to get the FileSystem
+     * @param outputPath the directory to write the _metadata file to
+     * @param footers the list of footers to merge
+     * @param level level of summary to write
+     * @throws IOException if there is an error while writing
+     * @deprecated metadata files are not recommended and will be removed in 2.0.0
+     */
+    @Deprecated
+    public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers, JobSummaryLevel level) throws IOException {
+        Preconditions.checkArgument(level == JobSummaryLevel.ALL || level == JobSummaryLevel.COMMON_ONLY,
+                "Unsupported level: " + level);
+
+        FileSystem fs = outputPath.getFileSystem(configuration);
+        outputPath = outputPath.makeQualified(fs);
+        ParquetMetadata metadataFooter = mergeFooters(outputPath, footers);
+
+        if (level == JobSummaryLevel.ALL) {
+            writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_METADATA_FILE);
+        }
+
+        metadataFooter.getBlocks().clear();
+        writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_COMMON_METADATA_FILE);
+    }
+
+    /**
+     * @deprecated metadata files are not recommended and will be removed in 2.0.0
+     */
+    @Deprecated
+    private static void writeMetadataFile(Path outputPathRoot, ParquetMetadata metadataFooter, FileSystem fs, String parquetMetadataFile)
+            throws IOException {
+        Path metaDataPath = new Path(outputPathRoot, parquetMetadataFile);
+        writeMetadataFile(metaDataPath, metadataFooter, fs);
+    }
+
+    /**
+     * @deprecated metadata files are not recommended and will be removed in 2.0.0
+     */
+    @Deprecated
+    private static void writeMetadataFile(Path outputPath, ParquetMetadata metadataFooter, FileSystem fs)
+            throws IOException {
+        PositionOutputStream metadata = HadoopStreams.wrap(fs.create(outputPath));
+        metadata.write(MAGIC);
+        serializeFooter(metadataFooter, metadata, null);
+        metadata.close();
+    }
+
+    /**
+     * Will merge the metadata of all the footers together
+     * @param root the directory containing all footers
+     * @param footers the list of footers to merge
+     * @return the global meta data for all the footers
+     */
+    static ParquetMetadata mergeFooters(Path root, List<Footer> footers) {
+        return mergeFooters(root, footers, new StrictKeyValueMetadataMergeStrategy());
+    }
+
+    /**
+     * Will merge the metadata of all the footers together
+     * @param root the directory containing all footers
+     * @param footers the list of footers to merge
+     * @param keyValueMergeStrategy strategy to merge values for a given key (if there are multiple values)
+     * @return the global meta data for all the footers
+     */
+    static ParquetMetadata mergeFooters(Path root, List<Footer> footers, KeyValueMetadataMergeStrategy keyValueMergeStrategy) {
+        String rootPath = root.toUri().getPath();
+        GlobalMetaData fileMetaData = null;
+        List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+        for (Footer footer : footers) {
+            String footerPath = footer.getFile().toUri().getPath();
+            if (!footerPath.startsWith(rootPath)) {
+                throw new ParquetEncodingException(footerPath + " invalid: all the files must be contained in the root " + root);
+            }
+            footerPath = footerPath.substring(rootPath.length());
+            while (footerPath.startsWith("/")) {
+                footerPath = footerPath.substring(1);
+            }
+            fileMetaData = mergeInto(footer.getParquetMetadata().getFileMetaData(), fileMetaData);
+            for (BlockMetaData block : footer.getParquetMetadata().getBlocks()) {
+                block.setPath(footerPath);
+                blocks.add(block);
+            }
+        }
+        return new ParquetMetadata(fileMetaData.merge(keyValueMergeStrategy), blocks);
+    }
+
+    /**
+     * @return the current position in the underlying file
+     * @throws IOException if there is an error while getting the current stream's position
+     */
+    public long getPos() throws IOException {
+        return out.getPos();
+    }
+
+    public long getNextRowGroupSize() throws IOException {
+        return alignment.nextRowGroupSize(out);
+    }
+
+    /**
+     * Will merge the metadata of all the footers together
+     * @param footers the list of footers to merge
+     * @return the global meta data for all the footers
+     */
+    static GlobalMetaData getGlobalMetaData(List<Footer> footers) {
+        return getGlobalMetaData(footers, true);
+    }
+
+    static GlobalMetaData getGlobalMetaData(List<Footer> footers, boolean strict) {
+        GlobalMetaData fileMetaData = null;
+        for (Footer footer : footers) {
+            ParquetMetadata currentMetadata = footer.getParquetMetadata();
+            fileMetaData = mergeInto(currentMetadata.getFileMetaData(), fileMetaData, strict);
+        }
+        return fileMetaData;
+    }
+
+    /**
+     * Will return the result of merging toMerge into mergedMetadata
+     * @param toMerge the metadata toMerge
+     * @param mergedMetadata the reference metadata to merge into
+     * @return the result of the merge
+     */
+    static GlobalMetaData mergeInto(
+            FileMetaData toMerge,
+            GlobalMetaData mergedMetadata) {
+        return mergeInto(toMerge, mergedMetadata, true);
+    }
+
+    static GlobalMetaData mergeInto(
+            FileMetaData toMerge,
+            GlobalMetaData mergedMetadata,
+            boolean strict) {
+        MessageType schema = null;
+        Map<String, Set<String>> newKeyValues = new HashMap<String, Set<String>>();
+        Set<String> createdBy = new HashSet<String>();
+        if (mergedMetadata != null) {
+            schema = mergedMetadata.getSchema();
+            newKeyValues.putAll(mergedMetadata.getKeyValueMetaData());
+            createdBy.addAll(mergedMetadata.getCreatedBy());
+        }
+        if ((schema == null && toMerge.getSchema() != null)
+                || (schema != null && !schema.equals(toMerge.getSchema()))) {
+            schema = mergeInto(toMerge.getSchema(), schema, strict);
+        }
+        for (Entry<String, String> entry : toMerge.getKeyValueMetaData().entrySet()) {
+            Set<String> values = newKeyValues.get(entry.getKey());
+            if (values == null) {
+                values = new LinkedHashSet<String>();
+                newKeyValues.put(entry.getKey(), values);
+            }
+            values.add(entry.getValue());
+        }
+        createdBy.add(toMerge.getCreatedBy());
+        return new GlobalMetaData(
+                schema,
+                newKeyValues,
+                createdBy);
+    }
+
+    /**
+     * will return the result of merging toMerge into mergedSchema
+     * @param toMerge the schema to merge into mergedSchema
+     * @param mergedSchema the schema to append the fields to
+     * @return the resulting schema
+     */
+    static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) {
+        return mergeInto(toMerge, mergedSchema, true);
+    }
+
+    /**
+     * will return the result of merging toMerge into mergedSchema
+     * @param toMerge the schema to merge into mergedSchema
+     * @param mergedSchema the schema to append the fields to
+     * @param strict whether schema primitive types must match
+     * @return the resulting schema
+     */
+    static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema, boolean strict) {
+        if (mergedSchema == null) {
+            return toMerge;
+        }
+
+        return mergedSchema.union(toMerge, strict);
+    }
+
+    private interface AlignmentStrategy {
+        void alignForRowGroup(PositionOutputStream out) throws IOException;
+
+        long nextRowGroupSize(PositionOutputStream out) throws IOException;
+    }
+
+    private static class NoAlignment implements AlignmentStrategy {
+        public static NoAlignment get(long rowGroupSize) {
+            return new NoAlignment(rowGroupSize);
+        }
+
+        private final long rowGroupSize;
+
+        private NoAlignment(long rowGroupSize) {
+            this.rowGroupSize = rowGroupSize;
+        }
+
+        @Override
+        public void alignForRowGroup(PositionOutputStream out) {
+        }
+
+        @Override
+        public long nextRowGroupSize(PositionOutputStream out) {
+            return rowGroupSize;
+        }
+    }
+
+    /**
+     * Alignment strategy that pads when no more than maxPaddingSize bytes are
+     * left before the next DFS block boundary.
+     */
+    private static class PaddingAlignment implements AlignmentStrategy {
+        private static final byte[] zeros = new byte[4096];
+
+        public static PaddingAlignment get(long dfsBlockSize, long rowGroupSize,
+                                           int maxPaddingSize) {
+            return new PaddingAlignment(dfsBlockSize, rowGroupSize, maxPaddingSize);
+        }
+
+        protected final long dfsBlockSize;
+        protected final long rowGroupSize;
+        protected final int maxPaddingSize;
+
+        private PaddingAlignment(long dfsBlockSize, long rowGroupSize,
+                                 int maxPaddingSize) {
+            this.dfsBlockSize = dfsBlockSize;
+            this.rowGroupSize = rowGroupSize;
+            this.maxPaddingSize = maxPaddingSize;
+        }
+
+        @Override
+        public void alignForRowGroup(PositionOutputStream out) throws IOException {
+            long remaining = dfsBlockSize - (out.getPos() % dfsBlockSize);
+
+            if (isPaddingNeeded(remaining)) {
+                LOG.debug("Adding {} bytes of padding (row group size={}B, block size={}B)", remaining, rowGroupSize, dfsBlockSize);
+                for (; remaining > 0; remaining -= zeros.length) {
+                    out.write(zeros, 0, (int) Math.min((long) zeros.length, remaining));
+                }
+            }
+        }
+
+        @Override
+        public long nextRowGroupSize(PositionOutputStream out) throws IOException {
+            if (maxPaddingSize <= 0) {
+                return rowGroupSize;
+            }
+
+            long remaining = dfsBlockSize - (out.getPos() % dfsBlockSize);
+
+            if (isPaddingNeeded(remaining)) {
+                return rowGroupSize;
+            }
+
+            return Math.min(remaining, rowGroupSize);
+        }
+
+        protected boolean isPaddingNeeded(long remaining) {
+            return (remaining <= maxPaddingSize);
+        }
+    }
+}
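
A worked example of the padding strategy above, with illustrative numbers rather than Drill defaults:

    // Sketch: 128 MiB DFS blocks, 8 MiB max padding, writer currently at 123 MiB.
    long dfsBlockSize   = 128L << 20;
    long maxPaddingSize =   8L << 20;
    long pos            = 123L << 20;                      // stand-in for out.getPos()
    long remaining = dfsBlockSize - (pos % dfsBlockSize);  // 5 MiB left in the current block
    boolean pad = remaining <= maxPaddingSize;             // true: pad 5 MiB of zeros so the next
                                                           // row group starts on a block boundary
    // With 20 MiB remaining instead, nothing is padded and nextRowGroupSize() returns
    // min(20 MiB, rowGroupSize), shrinking the next row group to fit the current block.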
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java
index e399ee7..d4a6fcb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java
@@ -46,7 +46,8 @@ import static org.apache.drill.exec.store.parquet.ParquetSimpleTestFileGenerator
  * that are supported by Drill. Embedded types specified in the Parquet specification are not covered by the
  * examples but can be added.
  * To create a new parquet file, define a schema, create a GroupWriter based on the schema, then add values
- * for individual records to the GroupWriter.
+ * for individual records to the GroupWriter.<br>
+ *     TODO: DRILL-7904. To run this tool, use 28.2-jre as the <guava.version> instead of 19.0 in the main POM file.
  * @see  org.apache.drill.exec.store.parquet.TestFileGenerator TestFileGenerator
  * @see org.apache.parquet.hadoop.example.GroupWriteSupport GroupWriteSupport
  * @see org.apache.parquet.example.Paper Dremel Example
@@ -65,7 +66,7 @@ public class ParquetSimpleTestFileGenerator {
           "  required int32 rowKey; \n" +
           "  required binary _UTF8  ( UTF8 ) ; \n" +
           "  required binary _Enum  ( ENUM ) ; \n" +
-          //    "    required binary _UUID  ( UUID ) ; \n" +
+          "  required fixed_len_byte_array(16) _UUID  ( UUID ) ; \n" +
           "  required int32  _INT32_RAW  ; \n" +
           "  required int32 _INT_8  ( INT_8 ) ; \n" +
           "  required int32 _INT_16  ( INT_16 ) ; \n" +
@@ -93,7 +94,7 @@ public class ParquetSimpleTestFileGenerator {
           "  required int32 rowKey; \n" +
           "  optional binary _UTF8  ( UTF8 ) ; \n" +
           "  optional binary _Enum  ( ENUM ) ; \n" +
-          //    "    optional binary _UUID  ( UUID ) ; \n" +
+          "  optional fixed_len_byte_array(16) _UUID  ( UUID ) ; \n" +
           "  optional int32  _INT32_RAW  ; \n" +
           "  optional int32 _INT_8  ( INT_8 ) ; \n" +
           "  optional int32 _INT_16  ( INT_16 ) ; \n" +
@@ -123,7 +124,7 @@ public class ParquetSimpleTestFileGenerator {
           "  required group StringTypes { \n" +
           "    required binary _UTF8  ( UTF8 ) ; \n" +
           "    required binary _Enum  ( ENUM ) ; \n" +
-          //      "    required binary _UUID  ( UUID ) ; \n" +
+          "    required fixed_len_byte_array(16) _UUID  ( UUID ) ; \n" +
           "  } \n" +
           "  required group NumericTypes { \n" +
           "    required group Int32 { \n" +
@@ -167,7 +168,7 @@ public class ParquetSimpleTestFileGenerator {
           "  optional group StringTypes { \n" +
           "    optional binary _UTF8  ( UTF8 ) ; \n" +
           "    optional binary _Enum  ( ENUM ) ; \n" +
-          //      "    optional binary _UUID  ( UUID ) ; \n" +
+          "    optional fixed_len_byte_array(16) _UUID  ( UUID ) ; \n" +
           "  } \n" +
           "  optional group NumericTypes { \n" +
           "    optional group Int32 { \n" +
@@ -243,9 +244,12 @@ public class ParquetSimpleTestFileGenerator {
     {
       Group complexGroup = gf.newGroup();
       complexGroup.add("rowKey", ++rowKey);
-      complexGroup.addGroup("StringTypes").append("_UTF8", "UTF8 string" + rowKey).append("_Enum", RANDOM_VALUE
-          .toString());
-      //        .append("_UUID", "00112233445566778899aabbccddeeff");
+      byte[] bytes = new byte[30];
+      Arrays.fill(bytes, (byte) 1);
+      complexGroup.addGroup("StringTypes")
+          .append("_UTF8", "UTF8 string" + rowKey)
+          .append("_Enum", RANDOM_VALUE.toString())
+          .append("_UUID", Binary.fromConstantByteArray(bytes, 0, 16));
       Group numeric = complexGroup.addGroup("NumericTypes");
       numeric.addGroup("Int32")
           .append("_INT32_RAW", 1234567)
@@ -275,9 +279,12 @@ public class ParquetSimpleTestFileGenerator {
     {
       Group complexGroup = gf.newGroup();
       complexGroup.add("rowKey", ++rowKey);
-      complexGroup.addGroup("StringTypes").append("_UTF8", "UTF8 string" + rowKey).append("_Enum", MAX_VALUE
-          .toString());
-      //        .append("_UUID", "00112233445566778899aabbccddeeff");
+      byte[] bytes = new byte[30];
+      Arrays.fill(bytes, (byte) 1);
+      complexGroup.addGroup("StringTypes")
+          .append("_UTF8", "UTF8 string" + rowKey)
+          .append("_Enum", MAX_VALUE.toString())
+          .append("_UUID", Binary.fromConstantByteArray(bytes, 0, 16));
       Group numeric = complexGroup.addGroup("NumericTypes");
       numeric.addGroup("Int32")
           .append("_INT32_RAW", 0x7FFFFFFF)
@@ -293,8 +300,6 @@ public class ParquetSimpleTestFileGenerator {
           .append("_INT_64", 0x7FFFFFFFFFFFFFFFL)
           .append("_UINT_64", 0xFFFFFFFFFFFFFFFFL)
           .append("_DECIMAL_decimal18", 0xFFFFFFFFFFFFFFFFL);
-      byte[] bytes = new byte[30];
-      Arrays.fill(bytes, (byte) 1);
       numeric.addGroup("FixedLen").append("_DECIMAL_fixed_n", Binary.fromConstantByteArray(bytes, 0, 20));
       numeric.addGroup("Binary").append("_DECIMAL_unlimited", Binary.fromConstantByteArray(bytes, 0, 30));
       numeric.addGroup("DateTimeTypes")
@@ -309,9 +314,12 @@ public class ParquetSimpleTestFileGenerator {
     {
       Group complexGroup = gf.newGroup();
       complexGroup.add("rowKey", ++rowKey);
-      complexGroup.addGroup("StringTypes").append("_UTF8", "UTF8 string" + rowKey).append("_Enum", MIN_VALUE
-          .toString());
-      //        .append("_UUID", "00112233445566778899aabbccddeeff");
+      byte[] bytes = new byte[30];
+      Arrays.fill(bytes, (byte) 1);
+      complexGroup.addGroup("StringTypes")
+          .append("_UTF8", "UTF8 string" + rowKey)
+          .append("_Enum", MIN_VALUE.toString())
+          .append("_UUID", Binary.fromConstantByteArray(bytes, 0, 16));
       Group numeric = complexGroup.addGroup("NumericTypes");
       numeric.addGroup("Int32")
           .append("_INT32_RAW", 0x80000000)
@@ -354,8 +362,12 @@ public class ParquetSimpleTestFileGenerator {
       byte[] bytes12 = {'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 'b' };
       Group simpleGroup = sgf.newGroup();
       simpleGroup.append("rowKey", ++rowKey);
-      simpleGroup.append("_UTF8", "UTF8 string" + rowKey).append("_Enum", RANDOM_VALUE.toString())
-          //        .append("_UUID", "00112233445566778899aabbccddeeff");
+      byte[] bytes = new byte[30];
+      Arrays.fill(bytes, (byte) 1);
+      simpleGroup
+          .append("_UTF8", "UTF8 string" + rowKey)
+          .append("_Enum", RANDOM_VALUE.toString())
+          .append("_UUID", Binary.fromConstantByteArray(bytes, 0, 16))
           .append("_INT32_RAW", 1234567)
           .append("_INT_8", 123)
           .append("_INT_16", 12345)
@@ -383,9 +395,10 @@ public class ParquetSimpleTestFileGenerator {
       byte[] bytes = new byte[30];
       Arrays.fill(bytes, (byte) 1);
       simpleGroup.append("rowKey", ++rowKey);
-      simpleGroup.append("_UTF8", "UTF8 string" + rowKey)
+      simpleGroup
+          .append("_UTF8", "UTF8 string" + rowKey)
           .append("_Enum", MAX_VALUE.toString())
-          //        .append("_UUID", "00112233445566778899aabbccddeeff");
+          .append("_UUID", Binary.fromConstantByteArray(bytes, 0, 16))
           .append("_INT32_RAW", 0x7FFFFFFF)
           .append("_INT_8", 0x7F)
           .append("_INT_16", 0x7FFF)
@@ -411,8 +424,12 @@ public class ParquetSimpleTestFileGenerator {
     {
       Group simpleGroup = sgf.newGroup();
       simpleGroup.append("rowKey", ++rowKey);
-      simpleGroup.append("_UTF8", "UTF8 string" + rowKey).append("_Enum", MIN_VALUE.toString())
-          //        .append("_UUID", "00112233445566778899aabbccddeeff");
+      byte[] bytes = new byte[30];
+      Arrays.fill(bytes, (byte) 1);
+      simpleGroup
+          .append("_UTF8", "UTF8 string" + rowKey)
+          .append("_Enum", MIN_VALUE.toString())
+          .append("_UUID", Binary.fromConstantByteArray(bytes, 0, 16))
           .append("_INT32_RAW", 0x80000000)
           .append("_INT_8", 0xFFFFFF80)
           .append("_INT_16", 0xFFFF8000)
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
index 92f3d39..e03af04 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
@@ -887,4 +887,18 @@ public class TestParquetComplex extends BaseTestQuery {
         .baselineValues(2, null)
         .go();
   }
+
+  @Test
+  public void testNewComplexParquetReaderUUID() throws Exception {
+    String query = "select `uuid_req1`, `uuid_opt1`, `uuid_req2` from cp.`store/parquet/complex/uuid.parquet` order by `uuid_req1`, `uuid_opt1`, `uuid_req2` limit 1";
+    byte[] firstValue = {0, 39, -125, -76, -113, 95, 73, -68, -68, 61, -89, -24, 123, -40, 94, -6};
+    byte[] secondValue = {74, -38, 0, -43, -73, 101, 67, -11, -68, -17, -63, 111, -20, 70, -93, -76};
+    testBuilder()
+            .optionSettingQueriesForTestQuery("alter session set `store.parquet.use_new_reader` = false")
+            .sqlQuery(query)
+            .unOrdered()
+            .baselineColumns("uuid_req1", "uuid_opt1", "uuid_req2")
+            .baselineValues(firstValue, null, secondValue)
+            .go();
+  }
 }
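
The baselines in testNewComplexParquetReaderUUID are raw 16-byte arrays, matching the VARBINARY
representation the test expects for UUID columns rather than UUID strings. A small sketch
(hypothetical class name, not part of the patch) that renders the first baseline as a readable UUID:

    import java.nio.ByteBuffer;
    import java.util.UUID;

    // Sketch: print a 16-byte baseline from the test above as a UUID string.
    public class UuidBaselinePrinter {

      public static void main(String[] args) {
        byte[] firstValue = {0, 39, -125, -76, -113, 95, 73, -68, -68, 61, -89, -24, 123, -40, 94, -6};
        ByteBuffer buf = ByteBuffer.wrap(firstValue);
        UUID uuid = new UUID(buf.getLong(), buf.getLong());
        System.out.println(uuid); // 002783b4-8f5f-49bc-bc3d-a7e87bd85efa
      }
    }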
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLogicalTypes.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLogicalTypes.java
index 2aee104..0ac7d0a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLogicalTypes.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLogicalTypes.java
@@ -666,4 +666,35 @@ public class TestParquetLogicalTypes extends BaseTestQuery {
         .build().run();
   }
 
+  @Test
+  public void testUUID() throws Exception {
+    String query = "select `_UUID` from cp.`parquet/parquet_test_file_simple.parquet`";
+    byte[] bytes = new byte[16];
+    Arrays.fill(bytes, (byte) 1);
+    testBuilder()
+            .sqlQuery(query)
+            .unOrdered()
+            .baselineColumns("_UUID")
+            .baselineValues(bytes)
+            .baselineValues(bytes)
+            .baselineValues(bytes)
+            .go();
+  }
+
+  @Test
+  public void testNullableVarBinaryUUID() throws Exception {
+    String query = "select `opt1_p1` from cp.`parquet/uuid-simple-fixed-length-array.parquet` order by `opt1_p1` limit 4";
+    byte[] firstValue = {60, -19, 30, -38, -49, 8, 79, 38, -103, -105, -6, -36, 65, -27, -60, -91};
+    byte[] secondValue = {-13, 5, 4, -97, 62, -78, 73, 69, -115, -25, -62, 88, 116, 37, 86, -41};
+
+    testBuilder()
+            .sqlQuery(query)
+            .ordered()
+            .baselineColumns("opt1_p1")
+            .baselineValues(firstValue)
+            .baselineValues(secondValue)
+            .baselineValues(new Object[]{null})
+            .baselineValues(new Object[]{null})
+            .go();
+  }
 }
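
The 16 bytes of 0x01 expected by testUUID mirror what ParquetSimpleTestFileGenerator writes: a
30-byte array filled with 1s, of which only the first 16 bytes go into the fixed_len_byte_array(16)
column. A tiny sketch of that equivalence (hypothetical class name, not part of the patch):

    import java.util.Arrays;

    import org.apache.parquet.io.api.Binary;

    // Sketch: the 16-byte slice written by the generator equals the
    // 16-byte baseline used in testUUID.
    public class UuidBaselineEquivalence {

      public static void main(String[] args) {
        byte[] generatorBytes = new byte[30];
        Arrays.fill(generatorBytes, (byte) 1);
        byte[] written = Binary.fromConstantByteArray(generatorBytes, 0, 16).getBytes();

        byte[] baseline = new byte[16];
        Arrays.fill(baseline, (byte) 1);

        System.out.println(Arrays.equals(written, baseline)); // true
      }
    }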
diff --git a/exec/java-exec/src/test/resources/parquet/parquet_test_file_simple.parquet b/exec/java-exec/src/test/resources/parquet/parquet_test_file_simple.parquet
new file mode 100644
index 0000000..16a555d
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/parquet_test_file_simple.parquet differ
diff --git a/exec/java-exec/src/test/resources/parquet/uuid-simple-fixed-length-array.parquet b/exec/java-exec/src/test/resources/parquet/uuid-simple-fixed-length-array.parquet
new file mode 100644
index 0000000..e1a793e
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/uuid-simple-fixed-length-array.parquet differ
diff --git a/exec/java-exec/src/test/resources/store/parquet/complex/uuid.parquet b/exec/java-exec/src/test/resources/store/parquet/complex/uuid.parquet
new file mode 100644
index 0000000..9597a9f
Binary files /dev/null and b/exec/java-exec/src/test/resources/store/parquet/complex/uuid.parquet differ
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index 246bc29..f0d79f4 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -535,7 +535,7 @@
                   This is likely due to you adding new dependencies to a java-exec and not updating the excludes in this module. This is important as it minimizes the size of the dependency of Drill application users.
 
                   </message>
-                  <maxsize>46300000</maxsize>
+                  <maxsize>46600000</maxsize>
                   <minsize>15000000</minsize>
                   <files>
                    <file>${project.build.directory}/drill-jdbc-all-${project.version}.jar</file>
@@ -561,39 +561,6 @@
         <build>
           <plugins>
             <plugin>
-              <groupId>org.apache.maven.plugins</groupId>
-              <artifactId>maven-enforcer-plugin</artifactId>
-              <executions>
-                <execution>
-                  <id>enforce-jdbc-jar-compactness</id>
-                  <goals>
-                    <goal>enforce</goal>
-                  </goals>
-                  <phase>verify</phase>
-                  <configuration>
-                    <rules>
-                      <requireFilesSize>
-                        <message>
-
-                          The file drill-jdbc-all-${project.version}.jar is outside the expected size range.
-
-                          This is likely due to you adding new dependencies to a java-exec and not updating the excludes in this module. This is important as it minimizes the size of the dependency of Drill application users.
-
-                        </message>
-                        <maxsize>46300000</maxsize>
-                        <minsize>15000000</minsize>
-                        <files>
-                          <file>${project.build.directory}/drill-jdbc-all-${project.version}.jar</file>
-                        </files>
-                      </requireFilesSize>
-                    </rules>
-                    <fail>true</fail>
-                  </configuration>
-                </execution>
-              </executions>
-            </plugin>
-
-            <plugin>
               <artifactId>maven-shade-plugin</artifactId>
               <configuration>
                 <shadedArtifactAttached>false</shadedArtifactAttached>
diff --git a/pom.xml b/pom.xml
index 986996e..f65ea39 100644
--- a/pom.xml
+++ b/pom.xml
@@ -49,7 +49,7 @@
     <shaded.guava.version>28.2-jre</shaded.guava.version>
     <guava.version>19.0</guava.version>
     <forkCount>2</forkCount>
-    <parquet.version>1.11.0</parquet.version>
+    <parquet.version>1.12.0</parquet.version>
     <parquet.format.version>2.8.0</parquet.format.version>
     <!--
       For development purposes to be able to use custom Calcite versions (e.g. not present in jitpack