Posted to commits@parquet.apache.org by sh...@apache.org on 2021/10/04 21:04:35 UTC

[parquet-mr] branch master updated: PARQUET-2081: Encryption translation tool - Parquet-hadoop (#928)

This is an automated email from the ASF dual-hosted git repository.

shangxinli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 1adc228  PARQUET-2081: Encryption translation tool - Parquet-hadoop (#928)
1adc228 is described below

commit 1adc22804a700d78f8480667d083e91d6147339f
Author: Xinli Shang <sh...@uber.com>
AuthorDate: Mon Oct 4 14:04:27 2021 -0700

    PARQUET-2081: Encryption translation tool - Parquet-hadoop (#928)
    
    * PARQUET-2081: Encryption translation tool - Parquet-hadoop
    
    Summary:
    Design doc - High Throughput CLAC Writer: https://docs.google.com/document/d/1-XdE8-QyDHnBsYrClwNsR8X3ks0JmKJ1-rXq7_th0hc
    
    Added unit tests
    
    Integration tests with real data
    
    * Address feedback
    
    * Address more comments
    
    * Revert the refactoring code to avoid exclusion in the public API check
    
    * Address more feedback
    
    * Refactor the code to always rewrite the offset index
    
    * Rename methods to reflect the change better
    
    * Use the 'encrypt' flag to create different encryption runtimes
    Add a check for already-encrypted columns
    
    * Address comments
    
    * Address more comments
---
 .../column/columnindex/OffsetIndexBuilder.java     |  26 +-
 .../parquet/crypto/ColumnEncryptionProperties.java |  11 +-
 .../format/converter/ParquetMetadataConverter.java |  20 +-
 .../apache/parquet/hadoop/ParquetFileReader.java   |   2 +-
 .../apache/parquet/hadoop/ParquetFileWriter.java   |  79 ++++-
 .../hadoop/metadata/ColumnChunkMetaData.java       |  38 ++-
 .../parquet/hadoop/util/ColumnEncryptor.java       | 329 +++++++++++++++++++++
 .../apache/parquet/hadoop/util/ColumnMasker.java   |   4 +-
 .../parquet/hadoop/util/H2SeekableInputStream.java |   1 +
 .../parquet/hadoop/util/ColumnEncryptorTest.java   | 293 ++++++++++++++++++
 .../hadoop/util/CompressionConveterTest.java       |   7 -
 .../parquet/hadoop/util/EncDecProperties.java      |  99 +++++++
 .../apache/parquet/hadoop/util/TestFileHelper.java | 153 ++++++++++
 13 files changed, 1016 insertions(+), 46 deletions(-)
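
For context, here is a minimal sketch (not part of the patch) of how the new ColumnEncryptor might be invoked end to end. The file paths, the zero-filled 16-byte demo keys, and the column name "DocId" are illustrative placeholders modeled on the unit tests further down; a real deployment would obtain keys from a KMS.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.crypto.ColumnEncryptionProperties;
    import org.apache.parquet.crypto.FileEncryptionProperties;
    import org.apache.parquet.hadoop.metadata.ColumnPath;
    import org.apache.parquet.hadoop.util.ColumnEncryptor;

    public class EncryptColumnsExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        byte[] footerKey = new byte[16]; // demo key only
        byte[] columnKey = new byte[16]; // demo key only

        // Mark DocId as the (only) column to encrypt, with its own key.
        ColumnPath docId = ColumnPath.fromDotString("DocId");
        Map<ColumnPath, ColumnEncryptionProperties> columnProps = new HashMap<>();
        columnProps.put(docId,
            ColumnEncryptionProperties.builder(docId, true).withKey(columnKey).build());

        FileEncryptionProperties encryptionProps = FileEncryptionProperties.builder(footerKey)
            .withEncryptedColumns(columnProps)
            .build();

        // Rewrite input.parquet into output.parquet, encrypting only DocId.
        new ColumnEncryptor(conf).encryptColumns(
            "/tmp/input.parquet", "/tmp/output.parquet",
            Arrays.asList("DocId"), encryptionProps);
      }
    }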

diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java
index 4909744..169c82e 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java
@@ -65,7 +65,7 @@ public class OffsetIndexBuilder {
     public long getFirstRowIndex(int pageIndex) {
       return firstRowIndexes[pageIndex];
     }
-    
+
     @Override
     public int getPageOrdinal(int pageIndex) {
       return pageIndex;
@@ -151,22 +151,36 @@ public class OffsetIndexBuilder {
     return build(0);
   }
 
+  public OffsetIndexBuilder fromOffsetIndex(OffsetIndex offsetIndex) {
+    assert offsetIndex instanceof OffsetIndexImpl;
+    OffsetIndexImpl offsetIndexImpl = (OffsetIndexImpl) offsetIndex;
+    this.offsets.addAll(new LongArrayList(offsetIndexImpl.offsets));
+    this.compressedPageSizes.addAll(new IntArrayList(offsetIndexImpl.compressedPageSizes));
+    this.firstRowIndexes.addAll(new LongArrayList(offsetIndexImpl.firstRowIndexes));
+    this.previousOffset = 0;
+    this.previousPageSize = 0;
+    this.previousRowIndex = 0;
+    this.previousRowCount  = 0;
+
+    return this;
+  }
+
   /**
   * Builds the offset index. Used by the writers to build up {@link OffsetIndex} objects to be
    * written to the Parquet file.
    *
-   * @param firstPageOffset
-   *          the actual offset in the file to be used to translate all the collected offsets
+   * @param shift
+   *          the amount by which every collected page offset is shifted
    * @return the newly created offset index or {@code null} if the {@link OffsetIndex} object would be empty
    */
-  public OffsetIndex build(long firstPageOffset) {
+  public OffsetIndex build(long shift) {
     if (compressedPageSizes.isEmpty()) {
       return null;
     }
     long[] offsets = this.offsets.toLongArray();
-    if (firstPageOffset != 0) {
+    if (shift != 0) {
       for (int i = 0, n = offsets.length; i < n; ++i) {
-        offsets[i] += firstPageOffset;
+        offsets[i] += shift;
       }
     }
     OffsetIndexImpl offsetIndex = new OffsetIndexImpl();
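
The new fromOffsetIndex()/build(shift) pair lets a writer rebase an existing offset index when a column chunk lands at a different position in the output file, which is how the ParquetFileWriter change further down uses it. A minimal sketch, with an illustrative helper name, assuming the index was produced by an OffsetIndexBuilder:

    import org.apache.parquet.internal.column.columnindex.OffsetIndex;
    import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;

    public class OffsetIndexShiftSketch {
      // Copy the page offsets, compressed sizes and first-row indexes of the original
      // index, then add the distance the chunk moved to every page offset.
      static OffsetIndex rebase(OffsetIndex original, long oldChunkStart, long newChunkStart) {
        return OffsetIndexBuilder.getBuilder()
            .fromOffsetIndex(original)
            .build(newChunkStart - oldChunkStart);
      }
    }
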
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ColumnEncryptionProperties.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ColumnEncryptionProperties.java
index 19419fc..fd89b8c 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ColumnEncryptionProperties.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ColumnEncryptionProperties.java
@@ -86,7 +86,16 @@ public class ColumnEncryptionProperties {
     return builder(path, true);
   }
 
-  static Builder builder(ColumnPath path, boolean encrypt) {
+  /**
+   * Builder for encrypted columns.
+   * To make sure the column path is not misspelled or misplaced,
+   * the file writer verifies that this column exists in the file schema.
+   *
+   * @param path Column path
+   * @param encrypt whether this column is to be encrypted
+   * @return Builder
+   */
+  public static Builder builder(ColumnPath path, boolean encrypt) {
     return new Builder(path, encrypt);
   }
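
Since builder(path, encrypt) is now public, callers can build per-column properties for both encrypted and plaintext columns, as the EncDecProperties test helper added by this patch does. A small sketch with an illustrative helper name:

    import org.apache.parquet.crypto.ColumnEncryptionProperties;
    import org.apache.parquet.hadoop.metadata.ColumnPath;

    public class ColumnPropsSketch {
      // encrypt == true attaches the column key; encrypt == false yields an explicit
      // "keep this column in plaintext" entry for the per-column property map.
      static ColumnEncryptionProperties forColumn(ColumnPath path, boolean encrypt, byte[] columnKey) {
        return encrypt
            ? ColumnEncryptionProperties.builder(path, true).withKey(columnKey).build()
            : ColumnEncryptionProperties.builder(path, false).build();
      }
    }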
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 3c6e32c..96980a4 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -1791,14 +1791,14 @@ public class ParquetMetadataConverter {
       org.apache.parquet.column.Encoding valuesEncoding,
       OutputStream to,
       BlockCipher.Encryptor blockEncryptor,
-      byte[] AAD) throws IOException {
+      byte[] pageHeaderAAD) throws IOException {
     writePageHeader(newDataPageHeader(uncompressedSize,
                                       compressedSize,
                                       valueCount,
                                       rlEncoding,
                                       dlEncoding,
                                       valuesEncoding),
-                    to, blockEncryptor, AAD);
+                    to, blockEncryptor, pageHeaderAAD);
   }
 
   public void writeDataPageV1Header(
@@ -1824,7 +1824,7 @@ public class ParquetMetadataConverter {
       int crc,
       OutputStream to,
       BlockCipher.Encryptor blockEncryptor,
-      byte[] AAD) throws IOException {
+      byte[] pageHeaderAAD) throws IOException {
     writePageHeader(newDataPageHeader(uncompressedSize,
                                       compressedSize,
                                       valueCount,
@@ -1832,7 +1832,7 @@ public class ParquetMetadataConverter {
                                       dlEncoding,
                                       valuesEncoding,
                                       crc),
-                    to, blockEncryptor, AAD);
+                    to, blockEncryptor, pageHeaderAAD);
   }
 
   public void writeDataPageV2Header(
@@ -1852,13 +1852,13 @@ public class ParquetMetadataConverter {
       org.apache.parquet.column.Encoding dataEncoding,
       int rlByteLength, int dlByteLength,
       OutputStream to, BlockCipher.Encryptor blockEncryptor,
-      byte[] AAD) throws IOException {
+      byte[] pageHeaderAAD) throws IOException {
     writePageHeader(
         newDataPageV2Header(
             uncompressedSize, compressedSize,
             valueCount, nullCount, rowCount,
             dataEncoding,
-            rlByteLength, dlByteLength), to, blockEncryptor, AAD);
+            rlByteLength, dlByteLength), to, blockEncryptor, pageHeaderAAD);
   }
 
   private PageHeader newDataPageV2Header(
@@ -1886,10 +1886,10 @@ public class ParquetMetadataConverter {
   public void writeDictionaryPageHeader(
       int uncompressedSize, int compressedSize, int valueCount,
       org.apache.parquet.column.Encoding valuesEncoding, OutputStream to,
-      BlockCipher.Encryptor blockEncryptor, byte[] AAD) throws IOException {
+      BlockCipher.Encryptor blockEncryptor, byte[] pageHeaderAAD) throws IOException {
     PageHeader pageHeader = new PageHeader(PageType.DICTIONARY_PAGE, uncompressedSize, compressedSize);
     pageHeader.setDictionary_page_header(new DictionaryPageHeader(valueCount, getEncoding(valuesEncoding)));
-    writePageHeader(pageHeader, to, blockEncryptor, AAD);
+    writePageHeader(pageHeader, to, blockEncryptor, pageHeaderAAD);
   }
 
   public void writeDictionaryPageHeader(
@@ -1902,11 +1902,11 @@ public class ParquetMetadataConverter {
   public void writeDictionaryPageHeader(
       int uncompressedSize, int compressedSize, int valueCount,
       org.apache.parquet.column.Encoding valuesEncoding, int crc, OutputStream to,
-      BlockCipher.Encryptor blockEncryptor, byte[] AAD) throws IOException {
+      BlockCipher.Encryptor blockEncryptor, byte[] pageHeaderAAD) throws IOException {
     PageHeader pageHeader = new PageHeader(PageType.DICTIONARY_PAGE, uncompressedSize, compressedSize);
     pageHeader.setCrc(crc);
     pageHeader.setDictionary_page_header(new DictionaryPageHeader(valueCount, getEncoding(valuesEncoding)));
-    writePageHeader(pageHeader, to, blockEncryptor, AAD);
+    writePageHeader(pageHeader, to, blockEncryptor, pageHeaderAAD);
   }
 
   private static BoundaryOrder toParquetBoundaryOrder(
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 3a68e01..63a22d1 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -523,7 +523,7 @@ public class ParquetFileReader implements Closeable {
     }
   }
 
-  private static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options, SeekableInputStream f) throws IOException {
+  public static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options, SeekableInputStream f) throws IOException {
     ParquetMetadataConverter converter = new ParquetMetadataConverter(options);
     return readFooter(file, options, f, converter);
   }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 2e5d55c..210d6c4 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -394,7 +394,7 @@ public class ParquetFileWriter {
     out.write(magic);
   }
 
-  InternalFileEncryptor getEncryptor() {
+  public InternalFileEncryptor getEncryptor() {
     return fileEncryptor;
   }
 
@@ -563,7 +563,7 @@ public class ParquetFileWriter {
     // We are unable to build indexes without rowCount so skip them for this column
     offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder();
     columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
-    innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding);
+    innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding, null, null);
   }
 
   /**
@@ -579,16 +579,42 @@ public class ParquetFileWriter {
    * @throws IOException if any I/O error occurs during writing the file
    */
   public void writeDataPage(
+    int valueCount, int uncompressedPageSize,
+    BytesInput bytes,
+    Statistics statistics,
+    long rowCount,
+    Encoding rlEncoding,
+    Encoding dlEncoding,
+    Encoding valuesEncoding) throws IOException {
+    writeDataPage(valueCount, uncompressedPageSize, bytes, statistics, rowCount, rlEncoding, dlEncoding, valuesEncoding, null, null);
+  }
+
+  /**
+   * Writes a single page
+   * @param valueCount count of values
+   * @param uncompressedPageSize the size of the data once uncompressed
+   * @param bytes the compressed data for the page without header
+   * @param statistics the statistics of the page
+   * @param rowCount the number of rows in the page
+   * @param rlEncoding encoding of the repetition level
+   * @param dlEncoding encoding of the definition level
+   * @param valuesEncoding encoding of values
+   * @param metadataBlockEncryptor encryptor used to encrypt the page header
+   * @param pageHeaderAAD AAD (additional authenticated data) for the page header
+   * @throws IOException if any I/O error occurs during writing the file
+   */
+  public void writeDataPage(
       int valueCount, int uncompressedPageSize,
       BytesInput bytes,
       Statistics statistics,
       long rowCount,
       Encoding rlEncoding,
       Encoding dlEncoding,
-      Encoding valuesEncoding) throws IOException {
+      Encoding valuesEncoding,
+      BlockCipher.Encryptor metadataBlockEncryptor,
+      byte[] pageHeaderAAD) throws IOException {
     long beforeHeader = out.getPos();
-    innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding);
-
+    innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding, metadataBlockEncryptor, pageHeaderAAD);
     offsetIndexBuilder.add((int) (out.getPos() - beforeHeader), rowCount);
   }
 
@@ -598,7 +624,34 @@ public class ParquetFileWriter {
       Statistics statistics,
       Encoding rlEncoding,
       Encoding dlEncoding,
-      Encoding valuesEncoding) throws IOException {
+      Encoding valuesEncoding,
+      BlockCipher.Encryptor metadataBlockEncryptor,
+      byte[] pageHeaderAAD) throws IOException {
+    writeDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding, metadataBlockEncryptor, pageHeaderAAD);
+  }
+
+  /**
+   * Writes a single page
+   * @param valueCount count of values
+   * @param uncompressedPageSize the size of the data once uncompressed
+   * @param bytes the compressed data for the page without header
+   * @param statistics statistics for the page
+   * @param rlEncoding encoding of the repetition level
+   * @param dlEncoding encoding of the definition level
+   * @param valuesEncoding encoding of values
+   * @param metadataBlockEncryptor encryptor used to encrypt the page header
+   * @param pageHeaderAAD AAD (additional authenticated data) for the page header
+   * @throws IOException if there is an error while writing
+   */
+  public void writeDataPage(
+    int valueCount, int uncompressedPageSize,
+    BytesInput bytes,
+    Statistics statistics,
+    Encoding rlEncoding,
+    Encoding dlEncoding,
+    Encoding valuesEncoding,
+    BlockCipher.Encryptor metadataBlockEncryptor,
+    byte[] pageHeaderAAD) throws IOException {
     state = state.write();
     long beforeHeader = out.getPos();
     if (currentChunkFirstDataPage < 0) {
@@ -616,7 +669,9 @@ public class ParquetFileWriter {
         dlEncoding,
         valuesEncoding,
         (int) crc.getValue(),
-        out);
+        out,
+        metadataBlockEncryptor,
+        pageHeaderAAD);
     } else {
       metadataConverter.writeDataPageV1Header(
         uncompressedPageSize, compressedPageSize,
@@ -624,7 +679,9 @@ public class ParquetFileWriter {
         rlEncoding,
         dlEncoding,
         valuesEncoding,
-        out);
+        out,
+        metadataBlockEncryptor,
+        pageHeaderAAD);
     }
     long headerSize = out.getPos() - beforeHeader;
     this.uncompressedLength += uncompressedPageSize + headerSize;
@@ -1035,6 +1092,12 @@ public class ParquetFileWriter {
     long length = chunk.getTotalSize();
     long newChunkStart = out.getPos();
 
+    if (newChunkStart != start) {
+      offsetIndex = OffsetIndexBuilder.getBuilder()
+        .fromOffsetIndex(offsetIndex)
+        .build(newChunkStart - start);
+    }
+
     copy(from, out, start, length);
 
     currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
index ed26618..f149b1b 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
@@ -112,7 +112,7 @@ abstract public class ColumnChunkMetaData {
       long valueCount,
       long totalSize,
       long totalUncompressedSize) {
-    
+
     return get(path, Types.optional(type).named("fake_type"), codec, encodingStats, encodings, statistics,
         firstDataPage, dictionaryPageOffset, valueCount, totalSize, totalUncompressedSize);
   }
@@ -157,14 +157,14 @@ abstract public class ColumnChunkMetaData {
           totalUncompressedSize);
     }
   }
-  
+
  // In sensitive columns, the ColumnMetaData structure is encrypted (with column-specific keys), making fields like Statistics invisible.
  // Decryption is not performed proactively, for performance and authorization reasons.
  // This method creates a shell ColumnChunkMetaData object that keeps the encrypted metadata and the decryption tools.
  // These tools will be activated later - when/if the column is projected.
-  public static ColumnChunkMetaData getWithEncryptedMetadata(ParquetMetadataConverter parquetMetadataConverter, ColumnPath path, 
+  public static ColumnChunkMetaData getWithEncryptedMetadata(ParquetMetadataConverter parquetMetadataConverter, ColumnPath path,
       PrimitiveType type, byte[] encryptedMetadata, byte[] columnKeyMetadata,
-      InternalFileDecryptor fileDecryptor, int rowGroupOrdinal, int columnOrdinal, 
+      InternalFileDecryptor fileDecryptor, int rowGroupOrdinal, int columnOrdinal,
       String createdBy) {
     return new EncryptedColumnChunkMetaData(parquetMetadataConverter, path, type, encryptedMetadata, columnKeyMetadata,
         fileDecryptor, rowGroupOrdinal, columnOrdinal, createdBy);
@@ -359,8 +359,8 @@ abstract public class ColumnChunkMetaData {
     decryptIfNeeded();
     return "ColumnMetaData{" + properties.toString() + ", " + getFirstDataPageOffset() + "}";
   }
-  
-  public boolean hasDictionaryPage() { 
+
+  public boolean hasDictionaryPage() {
     EncodingStats stats = getEncodingStats();
     if (stats != null) {
       // ensure there is a dictionary page and that it is used to encode data pages
@@ -370,6 +370,14 @@ abstract public class ColumnChunkMetaData {
     Set<Encoding> encodings = getEncodings();
     return (encodings.contains(PLAIN_DICTIONARY) || encodings.contains(RLE_DICTIONARY));
   }
+
+  /**
+   * @return whether or not this column is encrypted
+   */
+  @Private
+  public boolean isEncrypted() {
+    return false;
+  }
 }
 
 class IntColumnChunkMetaData extends ColumnChunkMetaData {
@@ -567,7 +575,7 @@ class EncryptedColumnChunkMetaData extends ColumnChunkMetaData {
   private final ParquetMetadataConverter parquetMetadataConverter;
   private final byte[] encryptedMetadata;
   private final byte[] columnKeyMetadata;
-  private final InternalFileDecryptor fileDecryptor; 
+  private final InternalFileDecryptor fileDecryptor;
 
   private final int columnOrdinal;
   private final PrimitiveType primitiveType;
@@ -577,7 +585,7 @@ class EncryptedColumnChunkMetaData extends ColumnChunkMetaData {
   private boolean decrypted;
   private ColumnChunkMetaData shadowColumnChunkMetaData;
 
-  EncryptedColumnChunkMetaData(ParquetMetadataConverter parquetMetadataConverter, ColumnPath path, PrimitiveType type, 
+  EncryptedColumnChunkMetaData(ParquetMetadataConverter parquetMetadataConverter, ColumnPath path, PrimitiveType type,
       byte[] encryptedMetadata, byte[] columnKeyMetadata,
       InternalFileDecryptor fileDecryptor, int rowGroupOrdinal, int columnOrdinal, String createdBy) {
     super((EncodingStats) null, (ColumnChunkProperties) null);
@@ -603,12 +611,12 @@ class EncryptedColumnChunkMetaData extends ColumnChunkMetaData {
     }
 
     // Decrypt the ColumnMetaData
-    InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.setColumnCryptoMetadata(path, true, false, 
+    InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.setColumnCryptoMetadata(path, true, false,
         columnKeyMetadata, columnOrdinal);
-    
+
     ColumnMetaData metaData;
     ByteArrayInputStream tempInputStream = new ByteArrayInputStream(encryptedMetadata);
-    byte[] columnMetaDataAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.ColumnMetaData, 
+    byte[] columnMetaDataAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.ColumnMetaData,
         rowGroupOrdinal, columnOrdinal, -1);
     try {
       metaData = readColumnMetaData(tempInputStream, columnDecryptionSetup.getMetaDataDecryptor(), columnMetaDataAAD);
@@ -664,4 +672,12 @@ class EncryptedColumnChunkMetaData extends ColumnChunkMetaData {
     decryptIfNeeded();
     return shadowColumnChunkMetaData.getStatistics();
   }
+
+  /**
+   * @return whether or not this column is encrypted
+   */
+  @Override
+  public boolean isEncrypted() {
+    return true;
+  }
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java
new file mode 100644
index 0000000..32ac49a
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+/**
+ * This class rewrites an existing file with column encryption applied, without re-encoding the data.
+ *
+ * For columns to be encrypted, the pages of those columns are read as raw bytes (no decompression
+ * or decoding), encrypted immediately, and written back.
+ *
+ * For columns that are not to be encrypted, the column chunk is copied to the writer unchanged.
+ */
+public class ColumnEncryptor {
+  private static class EncryptorRunTime {
+    private final InternalColumnEncryptionSetup colEncrSetup;
+    private final BlockCipher.Encryptor dataEncryptor;
+    private final BlockCipher.Encryptor metaDataEncryptor;
+    private final byte[] fileAAD;
+
+    private byte[] dataPageHeaderAAD;
+    private byte[] dataPageAAD;
+    private byte[] dictPageHeaderAAD;
+    private byte[] dictPageAAD;
+
+    public EncryptorRunTime(InternalFileEncryptor fileEncryptor, ColumnChunkMetaData chunk,
+                            int blockId, int columnId) throws IOException  {
+      if (fileEncryptor == null) {
+        this.colEncrSetup = null;
+        this.dataEncryptor =  null;
+        this.metaDataEncryptor =  null;
+
+        this.fileAAD =  null;
+        this.dataPageHeaderAAD =  null;
+        this.dataPageAAD =  null;
+        this.dictPageHeaderAAD =  null;
+        this.dictPageAAD =  null;
+      } else {
+        this.colEncrSetup = fileEncryptor.getColumnSetup(chunk.getPath(), true, columnId);
+        this.dataEncryptor = colEncrSetup.getDataEncryptor();
+        this.metaDataEncryptor = colEncrSetup.getMetaDataEncryptor();
+
+        this.fileAAD = fileEncryptor.getFileAAD();
+        this.dataPageHeaderAAD = createAAD(colEncrSetup, ModuleType.DataPageHeader, blockId, columnId);
+        this.dataPageAAD = createAAD(colEncrSetup, ModuleType.DataPage, blockId, columnId);
+        this.dictPageHeaderAAD = createAAD(colEncrSetup, ModuleType.DictionaryPageHeader, blockId, columnId);
+        this.dictPageAAD = createAAD(colEncrSetup, ModuleType.DictionaryPage, blockId, columnId);
+      }
+    }
+
+    private byte[] createAAD(InternalColumnEncryptionSetup colEncrSetup, ModuleType moduleType, int blockId, int columnId) {
+      if (colEncrSetup != null && colEncrSetup.isEncrypted()) {
+        return AesCipher.createModuleAAD(fileAAD, moduleType, blockId, columnId, 0);
+      }
+      return null;
+    }
+
+    public BlockCipher.Encryptor getDataEncryptor() {
+      return this.dataEncryptor;
+    }
+
+    public BlockCipher.Encryptor getMetaDataEncryptor() {
+      return this.metaDataEncryptor;
+    }
+
+    public byte[] getDataPageHeaderAAD() {
+      return this.dataPageHeaderAAD;
+    }
+
+    public byte[] getDataPageAAD() {
+      return this.dataPageAAD;
+    }
+
+    public byte[] getDictPageHeaderAAD() {
+      return this.dictPageHeaderAAD;
+    }
+
+    public byte[] getDictPageAAD() {
+      return this.dictPageAAD;
+    }
+  }
+
+  private Configuration conf;
+
+  public ColumnEncryptor(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Given an input file, encrypts the columns specified by paths and writes the result to the output file.
+   * The encryption settings are provided via fileEncryptionProperties.
+   * @param inputFile Input file
+   * @param outputFile Output file
+   * @param paths columns to be encrypted
+   * @param fileEncryptionProperties FileEncryptionProperties of the file
+   * @throws IOException if an I/O error occurs while reading or writing the file
+   */
+  public void encryptColumns(String inputFile, String outputFile, List<String> paths, FileEncryptionProperties fileEncryptionProperties) throws IOException {
+    Path inPath = new Path(inputFile);
+    Path outPath = new Path(outputFile);
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+
+    ParquetFileWriter writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, ParquetFileWriter.Mode.OVERWRITE,
+      DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, DEFAULT_STATISTICS_TRUNCATE_LENGTH,
+      ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED, fileEncryptionProperties);
+    writer.start();
+
+    try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) {
+      processBlocks(reader, writer, metaData, schema, paths);
+    }
+    writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+  }
+
+  private void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta,
+                            MessageType schema, List<String> encryptPaths) throws IOException {
+    Set<ColumnPath> encryptColumnsPath = convertToColumnPaths(encryptPaths);
+    int blockId = 0;
+    PageReadStore store = reader.readNextRowGroup();
+
+    while (store != null) {
+      writer.startBlock(store.getRowCount());
+
+      List<ColumnChunkMetaData> columnsInOrder = meta.getBlocks().get(blockId).getColumns();
+      Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
+        Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+
+      for (int i = 0; i < columnsInOrder.size(); i += 1) {
+        ColumnChunkMetaData chunk = columnsInOrder.get(i);
+        // If a column is already encrypted, we simply throw an exception.
+        // Later we can add a feature to trans-encrypt it with different keys.
+        if (chunk.isEncrypted()) {
+          throw new IOException("Column " + chunk.getPath().toDotString() + " is already encrypted");
+        }
+        ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+        processChunk(descriptor, chunk, reader, writer, encryptColumnsPath, blockId, i, meta.getFileMetaData().getCreatedBy());
+      }
+
+      writer.endBlock();
+      store = reader.readNextRowGroup();
+      blockId++;
+    }
+  }
+
+  private void processChunk(ColumnDescriptor descriptor, ColumnChunkMetaData chunk, TransParquetFileReader reader, ParquetFileWriter writer,
+                            Set<ColumnPath> encryptPaths, int blockId, int columnId, String createdBy) throws IOException {
+    reader.setStreamPosition(chunk.getStartingPos());
+    writer.startColumn(descriptor, chunk.getValueCount(), chunk.getCodec());
+    processPages(reader, chunk, writer, createdBy, blockId, columnId, encryptPaths.contains(chunk.getPath()));
+    writer.endColumn();
+  }
+
+  private void processPages(TransParquetFileReader reader, ColumnChunkMetaData chunk, ParquetFileWriter writer,
+                            String createdBy, int blockId, int columnId, boolean encrypt) throws IOException {
+    int pageOrdinal = 0;
+    EncryptorRunTime encryptorRunTime = new EncryptorRunTime(writer.getEncryptor(), chunk, blockId, columnId);
+    DictionaryPage dictionaryPage = null;
+    long readValues = 0;
+    ParquetMetadataConverter converter = new ParquetMetadataConverter();
+    OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+    reader.setStreamPosition(chunk.getStartingPos());
+    long totalChunkValues = chunk.getValueCount();
+    while (readValues < totalChunkValues) {
+      PageHeader pageHeader = reader.readPageHeader();
+      byte[] pageLoad;
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          if (dictionaryPage != null) {
+            throw new IOException("Column chunk has more than one dictionary page");
+          }
+          // No quickUpdatePageAAD needed for the dictionary page
+          DictionaryPageHeader dictPageHeader = pageHeader.dictionary_page_header;
+          pageLoad = processPayload(reader, pageHeader.getCompressed_page_size(), encryptorRunTime.getDataEncryptor(), encryptorRunTime.getDictPageAAD(), encrypt);
+          writer.writeDictionaryPage(new DictionaryPage(BytesInput.from(pageLoad),
+                                        pageHeader.getUncompressed_page_size(),
+                                        dictPageHeader.getNum_values(),
+                                        converter.getEncoding(dictPageHeader.getEncoding())),
+            encryptorRunTime.getMetaDataEncryptor(), encryptorRunTime.getDictPageHeaderAAD());
+          break;
+        case DATA_PAGE:
+          if (encrypt) {
+            AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageHeaderAAD(), pageOrdinal);
+            AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageAAD(), pageOrdinal);
+          }
+          DataPageHeader headerV1 = pageHeader.data_page_header;
+          pageLoad = processPayload(reader, pageHeader.getCompressed_page_size(), encryptorRunTime.getDataEncryptor(), encryptorRunTime.getDataPageAAD(), encrypt);
+          readValues += headerV1.getNum_values();
+          if (offsetIndex != null) {
+            long rowCount = 1 + offsetIndex.getLastRowIndex(pageOrdinal, totalChunkValues) - offsetIndex.getFirstRowIndex(pageOrdinal);
+            writer.writeDataPage(Math.toIntExact(headerV1.getNum_values()),
+              pageHeader.getUncompressed_page_size(),
+              BytesInput.from(pageLoad),
+              converter.fromParquetStatistics(createdBy, headerV1.getStatistics(), chunk.getPrimitiveType()),
+              rowCount,
+              converter.getEncoding(headerV1.getRepetition_level_encoding()),
+              converter.getEncoding(headerV1.getDefinition_level_encoding()),
+              converter.getEncoding(headerV1.getEncoding()),
+              encryptorRunTime.getMetaDataEncryptor(),
+              encryptorRunTime.getDataPageHeaderAAD());
+          } else {
+            writer.writeDataPage(Math.toIntExact(headerV1.getNum_values()),
+              pageHeader.getUncompressed_page_size(),
+              BytesInput.from(pageLoad),
+              converter.fromParquetStatistics(createdBy, headerV1.getStatistics(), chunk.getPrimitiveType()),
+              converter.getEncoding(headerV1.getRepetition_level_encoding()),
+              converter.getEncoding(headerV1.getDefinition_level_encoding()),
+              converter.getEncoding(headerV1.getEncoding()),
+              encryptorRunTime.getMetaDataEncryptor(),
+              encryptorRunTime.getDataPageHeaderAAD());
+          }
+          pageOrdinal++;
+          break;
+        case DATA_PAGE_V2:
+          if (encrypt) {
+            AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageHeaderAAD(), pageOrdinal);
+            AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageAAD(), pageOrdinal);
+          }
+          DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
+          int rlLength = headerV2.getRepetition_levels_byte_length();
+          BytesInput rlLevels = readBlockAllocate(rlLength, reader);
+          int dlLength = headerV2.getDefinition_levels_byte_length();
+          BytesInput dlLevels = readBlockAllocate(dlLength, reader);
+          int payLoadLength = pageHeader.getCompressed_page_size() - rlLength - dlLength;
+          int rawDataLength = pageHeader.getUncompressed_page_size() - rlLength - dlLength;
+          pageLoad = processPayload(reader, payLoadLength, encryptorRunTime.getDataEncryptor(), encryptorRunTime.getDataPageAAD(), encrypt);
+          readValues += headerV2.getNum_values();
+          writer.writeDataPageV2(headerV2.getNum_rows(),
+            headerV2.getNum_nulls(),
+            headerV2.getNum_values(),
+            rlLevels,
+            dlLevels,
+            converter.getEncoding(headerV2.getEncoding()),
+            BytesInput.from(pageLoad),
+            rawDataLength,
+            converter.fromParquetStatistics(createdBy, headerV2.getStatistics(), chunk.getPrimitiveType()));
+          pageOrdinal++;
+          break;
+        default:
+          break;
+      }
+    }
+  }
+
+  private byte[] processPayload(TransParquetFileReader reader, int payloadLength, BlockCipher.Encryptor dataEncryptor,
+                                byte[] AAD, boolean encrypt) throws IOException {
+    byte[] data = readBlock(payloadLength, reader);
+    if (!encrypt) {
+      return data;
+    }
+    return dataEncryptor.encrypt(data, AAD);
+  }
+
+  public byte[] readBlock(int length, TransParquetFileReader reader) throws IOException {
+    byte[] data = new byte[length];
+    reader.blockRead(data, 0, length);
+    return data;
+  }
+
+  public BytesInput readBlockAllocate(int length, TransParquetFileReader reader) throws IOException {
+    byte[] data = new byte[length];
+    reader.blockRead(data, 0, length);
+    return BytesInput.from(data, 0, length);
+  }
+
+  public static Set<ColumnPath> convertToColumnPaths(List<String> cols) {
+    Set<ColumnPath> prunePaths = new HashSet<>();
+    for (String col : cols) {
+      prunePaths.add(ColumnPath.fromDotString(col));
+    }
+    return prunePaths;
+  }
+}
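
To make the AAD handling in EncryptorRunTime above concrete: each module AAD is created once per column chunk and then retargeted in place for every page before the page is encrypted and its header written. A minimal sketch with illustrative ordinal parameters:

    import org.apache.parquet.crypto.AesCipher;
    import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;

    public class PageAadSketch {
      // Build the DataPage AAD for one page of a given column chunk.
      static byte[] dataPageAad(byte[] fileAad, int rowGroupOrdinal, int columnOrdinal, int pageOrdinal) {
        // Created once per chunk, starting at page ordinal 0 ...
        byte[] aad = AesCipher.createModuleAAD(fileAad, ModuleType.DataPage,
            rowGroupOrdinal, columnOrdinal, 0);
        // ... then updated in place for the current page, as processPages() does before writing.
        AesCipher.quickUpdatePageAAD(aad, pageOrdinal);
        return aad;
      }
    }
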
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnMasker.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnMasker.java
index 278e3a7..74bc534 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnMasker.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnMasker.java
@@ -123,7 +123,7 @@ public class ColumnMasker {
       .build();
     CodecFactory codecFactory = new CodecFactory(new Configuration(), props.getPageSizeThreshold());
     CodecFactory.BytesCompressor compressor =	codecFactory.getCompressor(chunk.getCodec());
-    
+
     // Create new schema that only has the current column
     MessageType newSchema = newSchema(schema, descriptor);
     ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(compressor, newSchema, props.getAllocator(), props.getColumnIndexTruncateLength());
@@ -154,7 +154,7 @@ public class ColumnMasker {
     cStore.close();
     cWriter.close();
   }
-  
+
   private MessageType newSchema(MessageType schema, ColumnDescriptor descriptor) {
     String[] path = descriptor.getPath();
     Type type = schema.getType(path);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
index 0694558..2994ca8 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
@@ -21,6 +21,7 @@ package org.apache.parquet.hadoop.util;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.parquet.io.DelegatingSeekableInputStream;
+
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnEncryptorTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnEncryptorTest.java
new file mode 100644
index 0000000..670f6e2
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnEncryptorTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ColumnEncryptorTest {
+
+  private Configuration conf = new Configuration();
+  private Map<String, String> extraMeta = ImmutableMap.of("key1", "value1", "key2", "value2");
+  private ColumnEncryptor columnEncryptor = null;
+  private final int numRecord = 100000;
+  private String inputFile = null;
+  private String outputFile = null;
+  private TestFileHelper.TestDocs testDocs = null;
+
+  private void testSetup(String compression) throws IOException {
+    columnEncryptor = new ColumnEncryptor(conf);
+    testDocs = new TestFileHelper.TestDocs(numRecord);
+    inputFile = TestFileHelper.createParquetFile(conf, extraMeta, numRecord, "input", compression,
+      ParquetProperties.WriterVersion.PARQUET_1_0, ParquetProperties.DEFAULT_PAGE_SIZE, testDocs);
+    outputFile = TestFileHelper.createTempFile("test");
+  }
+
+  @Test
+  public void testFlatColumn() throws IOException {
+    testSetup("GZIP");
+    String[] encryptColumns = {"DocId"};
+    columnEncryptor.encryptColumns(inputFile, outputFile, Arrays.asList(encryptColumns),
+      EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false));
+
+    verifyResultDecryptionWithValidKey();
+  }
+
+  @Test
+  public void testNestedColumn() throws IOException {
+    testSetup("GZIP");
+    String[] encryptColumns = {"Links.Forward"};
+    columnEncryptor.encryptColumns(inputFile, outputFile, Arrays.asList(encryptColumns),
+      EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false));
+    verifyResultDecryptionWithValidKey();
+  }
+
+  @Test
+  public void testNoEncryption() throws IOException {
+    testSetup("GZIP");
+    String[] encryptColumns = {};
+    columnEncryptor.encryptColumns(inputFile, outputFile, Arrays.asList(encryptColumns),
+      EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false));
+    verifyResultDecryptionWithValidKey();
+  }
+
+  @Test
+  public void testEncryptAllColumns() throws IOException {
+    testSetup("GZIP");
+    String[] encryptColumns = {"DocId", "Name", "Gender", "Links.Forward", "Links.Backward"};
+    columnEncryptor.encryptColumns(inputFile, outputFile, Arrays.asList(encryptColumns),
+      EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false));
+    verifyResultDecryptionWithValidKey();
+  }
+
+  @Test
+  public void testEncryptSomeColumns() throws IOException {
+    testSetup("GZIP");
+    String[] encryptColumns = {"DocId", "Name", "Links.Forward"};
+    columnEncryptor.encryptColumns(inputFile, outputFile, Arrays.asList(encryptColumns),
+      EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false));
+
+    ParquetMetadata metaData = getParquetMetadata(EncDecProperties.getFileDecryptionProperties());
+    assertTrue(metaData.getBlocks().size() > 0);
+    List<ColumnChunkMetaData> columns = metaData.getBlocks().get(0).getColumns();
+    Set<String> set = new HashSet<>(Arrays.asList(encryptColumns));
+    for (ColumnChunkMetaData column : columns) {
+      if (set.contains(column.getPath().toDotString())) {
+        assertTrue(column.isEncrypted());
+      } else {
+        assertFalse(column.isEncrypted());
+      }
+    }
+  }
+
+  @Test
+  public void testFooterEncryption() throws IOException {
+    testSetup("GZIP");
+    String[] encryptColumns = {"DocId"};
+    columnEncryptor.encryptColumns(inputFile, outputFile, Arrays.asList(encryptColumns),
+      EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1, true));
+
+    verifyResultDecryptionWithValidKey();
+  }
+
+  @Test
+  public void testAesGcm() throws IOException {
+    testSetup("GZIP");
+    String[] encryptColumns = {"DocId"};
+    columnEncryptor.encryptColumns(inputFile, outputFile, Arrays.asList(encryptColumns),
+      EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_V1, true));
+
+    verifyResultDecryptionWithValidKey();
+  }
+
+  @Test
+  public void testColumnIndex() throws IOException {
+    testSetup("GZIP");
+    String[] encryptColumns = {"Name"};
+    columnEncryptor.encryptColumns(inputFile, outputFile, Arrays.asList(encryptColumns),
+      EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_V1, false));
+
+    verifyResultDecryptionWithValidKey();
+    verifyOffsetIndexes();
+  }
+
+  @Test
+  public void testDifferentCompression() throws IOException {
+    String[] compressions = {"GZIP", "ZSTD", "SNAPPY", "UNCOMPRESSED"};
+    String[] encryptColumns = {"Links.Forward"};
+    for (String compression : compressions) {
+      testSetup(compression);
+      columnEncryptor.encryptColumns(inputFile, outputFile, Arrays.asList(encryptColumns),
+        EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false));
+      verifyResultDecryptionWithValidKey();
+    }
+  }
+
+  private void verifyResultDecryptionWithValidKey() throws IOException  {
+    ParquetReader<Group> reader = createReader(outputFile);
+    for (int i = 0; i < numRecord; i++) {
+      Group group = reader.read();
+      assertTrue(group.getLong("DocId", 0) == testDocs.docId[i]);
+      assertArrayEquals(group.getBinary("Name", 0).getBytes(), testDocs.name[i].getBytes());
+      assertArrayEquals(group.getBinary("Gender", 0).getBytes(), testDocs.gender[i].getBytes());
+      Group subGroup = group.getGroup("Links", 0);
+      assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(), testDocs.linkForward[i].getBytes());
+      assertArrayEquals(subGroup.getBinary("Backward", 0).getBytes(), testDocs.linkBackward[i].getBytes());
+    }
+    reader.close();
+  }
+
+  private void verifyOffsetIndexes() throws IOException {
+    ParquetReadOptions readOptions = HadoopReadOptions.builder(conf)
+      .withDecryption(EncDecProperties.getFileDecryptionProperties())
+      .build();
+
+    try (TransParquetFileReader inReader = createFileReader(inputFile);
+         TransParquetFileReader outReader = createFileReader(outputFile)) {
+      ParquetMetadata inMetaData = getMetadata(readOptions, inputFile, inReader);
+      ParquetMetadata outMetaData = getMetadata(readOptions, outputFile, outReader);
+      compareOffsetIndexes(inReader, outReader, inMetaData, outMetaData);
+    }
+  }
+
+  private ParquetMetadata getMetadata(ParquetReadOptions readOptions, String file, TransParquetFileReader reader) throws IOException {
+    return ParquetFileReader.readFooter(HadoopInputFile.fromPath(new Path(file), conf),
+                                        readOptions,
+                                        reader.getStream());
+  }
+
+  private void compareOffsetIndexes(TransParquetFileReader inReader, TransParquetFileReader outReader,
+                                    ParquetMetadata inMetaData, ParquetMetadata outMetaData) throws IOException {
+
+    PageReadStore inStore = inReader.readNextRowGroup();
+    PageReadStore outStore = outReader.readNextRowGroup();
+    int blockIndex = 0;
+    while (inStore != null && outStore != null) {
+      List<ColumnChunkMetaData> inColumns = inMetaData.getBlocks().get(blockIndex).getColumns();
+      List<ColumnChunkMetaData> outColumns = outMetaData.getBlocks().get(blockIndex).getColumns();
+      assertEquals(inColumns.size(), outColumns.size());
+      validateColumns(inReader, outReader, inColumns, outColumns);
+      inStore = inReader.readNextRowGroup();
+      outStore = outReader.readNextRowGroup();
+      blockIndex++;
+      if ((inStore == null) != (outStore == null)) {
+        throw new IOException("Number of row groups is not equal");
+      }
+    }
+  }
+
+  private void validateColumns(TransParquetFileReader inReader, TransParquetFileReader outReader,
+                               List<ColumnChunkMetaData> inColumns, List<ColumnChunkMetaData> outColumns) throws IOException {
+    for (int i = 0; i < inColumns.size(); i ++) {
+      ColumnChunkMetaData inChunk = inColumns.get(i);
+      ColumnChunkMetaData outChunk = outColumns.get(i);
+      OffsetIndex inOffsetIndex = inReader.readOffsetIndex(inChunk);
+      OffsetIndex outOffsetIndex = outReader.readOffsetIndex(outChunk);
+      assertEquals(inOffsetIndex.getPageCount(), outOffsetIndex.getPageCount());
+      if (outChunk.isEncrypted()) {
+        continue;
+      }
+      validatePages(inReader, outReader, inOffsetIndex, outOffsetIndex);
+    }
+  }
+
+  private void validatePages(TransParquetFileReader inReader, TransParquetFileReader outReader,
+                         OffsetIndex inOffsetIndex, OffsetIndex outOffsetIndex) throws IOException {
+      for (int pageId = 0; pageId < inOffsetIndex.getPageCount(); pageId ++) {
+        long inPageOffset = inOffsetIndex.getOffset(pageId);
+        inReader.setStreamPosition(inPageOffset);
+        long outPageOffset = outOffsetIndex.getOffset(pageId);
+        outReader.setStreamPosition(outPageOffset);
+        PageHeader inPageHeader = inReader.readPageHeader();
+        PageHeader outPageHeader = outReader.readPageHeader();
+        assertEquals(inPageHeader, outPageHeader);
+        DataPageHeader inHeaderV1 = inPageHeader.data_page_header;
+        DataPageHeader outHeaderV1 = outPageHeader.data_page_header;
+        assertEquals(inHeaderV1, outHeaderV1);
+        BytesInput inPageLoad = readBlockAllocate(inReader, inPageHeader.compressed_page_size);
+        BytesInput outPageLoad = readBlockAllocate(outReader, outPageHeader.compressed_page_size);
+        assertEquals(inPageLoad.toByteBuffer(), outPageLoad.toByteBuffer());
+      }
+  }
+
+  private BytesInput readBlockAllocate(TransParquetFileReader reader, int length) throws IOException {
+    byte[] data = new byte[length];
+    reader.blockRead(data, 0, length);
+    return BytesInput.from(data, 0, length);
+  }
+
+  private TransParquetFileReader createFileReader(String path) throws IOException {
+    return new TransParquetFileReader(HadoopInputFile.fromPath(new Path(path), conf),
+      HadoopReadOptions.builder(conf)
+        .withDecryption(EncDecProperties.getFileDecryptionProperties())
+        .build());
+  }
+
+  private ParquetReader<Group> createReader(String path) throws IOException {
+    return ParquetReader.builder(new GroupReadSupport(),
+      new Path(path)).withConf(conf).withDecryption(EncDecProperties.getFileDecryptionProperties()).build();
+  }
+
+  private ParquetMetadata getParquetMetadata(FileDecryptionProperties decryptionProperties) throws IOException {
+    ParquetMetadata metaData;
+    ParquetReadOptions readOptions = ParquetReadOptions.builder()
+      .withDecryption(decryptionProperties)
+      .build();
+    InputFile file = HadoopInputFile.fromPath(new Path(outputFile), conf);
+    try (SeekableInputStream in = file.newStream()) {
+      metaData  = ParquetFileReader.readFooter(file, readOptions, in);
+    }
+    return metaData;
+  }
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java
index fefa5e4..218abb9 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java
@@ -22,15 +22,11 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.HadoopReadOptions;
-import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.example.data.simple.SimpleGroup;
-import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
 import org.apache.parquet.format.DataPageHeader;
 import org.apache.parquet.format.DataPageHeaderV2;
-import org.apache.parquet.format.DictionaryPageHeader;
 import org.apache.parquet.format.PageHeader;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetFileWriter;
@@ -46,9 +42,6 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
 import org.apache.parquet.internal.column.columnindex.ColumnIndex;
 import org.apache.parquet.internal.column.columnindex.OffsetIndex;
-import org.apache.parquet.io.ColumnIOFactory;
-import org.apache.parquet.io.MessageColumnIO;
-import org.apache.parquet.io.RecordReader;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/EncDecProperties.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/EncDecProperties.java
new file mode 100644
index 0000000..ac877a2
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/EncDecProperties.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetriever;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class EncDecProperties {
+
+  public static class DecryptionKeyRetrieverMock implements DecryptionKeyRetriever {
+    private final Map<String, byte[]> keyMap = new HashMap<>();
+
+    public DecryptionKeyRetrieverMock putKey(String keyId, byte[] keyBytes) {
+      keyMap.put(keyId, keyBytes);
+      return this;
+    }
+
+    @Override
+    public byte[] getKey(byte[] keyMetaData) {
+      String keyId = new String(keyMetaData, StandardCharsets.UTF_8);
+      return keyMap.get(keyId);
+    }
+  }
+
+  private static final byte[] FOOTER_KEY = {0x01, 0x02, 0x03, 0x4, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a,
+    0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10};
+  private static final byte[] FOOTER_KEY_METADATA = "footkey".getBytes(StandardCharsets.UTF_8);
+  private static final byte[] COL_KEY = {0x02, 0x03, 0x4, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b,
+    0x0c, 0x0d, 0x0e, 0x0f, 0x10, 0x11};
+  private static final byte[] COL_KEY_METADATA = "col".getBytes(StandardCharsets.UTF_8);
+
+  public static FileDecryptionProperties getFileDecryptionProperties() throws IOException {
+    DecryptionKeyRetrieverMock keyRetriever = new DecryptionKeyRetrieverMock();
+    keyRetriever.putKey("footkey", FOOTER_KEY);
+    keyRetriever.putKey("col", COL_KEY);
+    return FileDecryptionProperties.builder().withPlaintextFilesAllowed().withKeyRetriever(keyRetriever).build();
+  }
+
+  public static FileEncryptionProperties getFileEncryptionProperties(String[] encrCols, ParquetCipher cipher, boolean encryptFooter) {
+    if (encrCols.length == 0) {
+      return null;
+    }
+    Map<ColumnPath, ColumnEncryptionProperties> columnPropertyMap = new HashMap<>();
+    Set<String> paths = new HashSet<>(Arrays.asList(encrCols));
+
+    for (ColumnPath columnPath : TestFileHelper.getPaths()) {
+      ColumnEncryptionProperties colEncProp;
+      if (paths.contains(columnPath.toDotString())) {
+        colEncProp = ColumnEncryptionProperties.builder(columnPath, true)
+          .withKey(COL_KEY)
+          .withKeyMetaData(COL_KEY_METADATA)
+          .build();
+      } else {
+        colEncProp = ColumnEncryptionProperties.builder(columnPath, false).build();
+      }
+      columnPropertyMap.put(columnPath, colEncProp);
+    }
+
+    FileEncryptionProperties.Builder encryptionPropertiesBuilder =
+      FileEncryptionProperties.builder(FOOTER_KEY)
+        .withFooterKeyMetadata(FOOTER_KEY_METADATA)
+        .withAlgorithm(cipher)
+        .withEncryptedColumns(columnPropertyMap);
+
+    if (!encryptFooter) {
+      encryptionPropertiesBuilder.withPlaintextFooter();
+    }
+
+    return encryptionPropertiesBuilder.build();
+  }
+}
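
To show how the helper above is meant to be driven, a minimal usage sketch follows. The column names come from TestFileHelper's schema, AES_GCM_V1 is one of the existing ParquetCipher values, encrypting the footer (the final argument) is just an example choice, and the wrapper class name is illustrative.

    import java.io.IOException;

    import org.apache.parquet.crypto.FileDecryptionProperties;
    import org.apache.parquet.crypto.FileEncryptionProperties;
    import org.apache.parquet.crypto.ParquetCipher;

    public class EncDecPropertiesSketch {
      // Sketch only: encrypt DocId and Links.Forward with AES_GCM_V1 and an encrypted
      // footer, using the per-column keys defined in EncDecProperties.
      static FileEncryptionProperties exampleEncryption() {
        return EncDecProperties.getFileEncryptionProperties(
          new String[] {"DocId", "Links.Forward"}, ParquetCipher.AES_GCM_V1, true);
      }

      // Sketch only: matching decryption properties backed by the mock key retriever.
      static FileDecryptionProperties exampleDecryption() throws IOException {
        return EncDecProperties.getFileDecryptionProperties();
      }
    }
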
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileHelper.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileHelper.java
new file mode 100644
index 0000000..f661035
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileHelper.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+
+public class TestFileHelper {
+
+  public static MessageType schema = new MessageType("schema",
+    new PrimitiveType(OPTIONAL, INT64, "DocId"),
+    new PrimitiveType(REQUIRED, BINARY, "Name"),
+    new PrimitiveType(OPTIONAL, BINARY, "Gender"),
+    new GroupType(OPTIONAL, "Links",
+      new PrimitiveType(REPEATED, BINARY, "Backward"),
+      new PrimitiveType(REPEATED, BINARY, "Forward")));
+
+  private static Random rnd = new Random(5);
+
+  public static String createParquetFile(Configuration conf, Map<String, String> extraMeta, int numRecord, String prefix, String codec,
+                                         ParquetProperties.WriterVersion writerVersion, int pageSize, TestDocs testDocs) throws IOException {
+    conf.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString());
+
+    String file = createTempFile(prefix);
+    ExampleParquetWriter.Builder builder = ExampleParquetWriter.builder(new Path(file))
+      .withConf(conf)
+      .withWriterVersion(writerVersion)
+      .withExtraMetaData(extraMeta)
+      .withValidation(true)
+      .withPageSize(pageSize)
+      .withCompressionCodec(CompressionCodecName.valueOf(codec));
+    try (ParquetWriter<Group> writer = builder.build()) {
+      for (int i = 0; i < numRecord; i++) {
+        SimpleGroup g = new SimpleGroup(schema);
+        g.add("DocId", testDocs.docId[i]);
+        g.add("Name", testDocs.name[i]);
+        g.add("Gender", testDocs.gender[i]);
+        Group links = g.addGroup("Links");
+        links.add(0, testDocs.linkBackward[i]);
+        links.add(1, testDocs.linkForward[i]);
+        writer.write(g);
+      }
+    }
+    return file;
+  }
+
+  private static long getLong() {
+    return ThreadLocalRandom.current().nextLong(1000);
+  }
+
+  private static String getString() {
+    char[] chars = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'x', 'z', 'y'};
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < 100; i++) {
+      sb.append(chars[rnd.nextInt(10)]);
+    }
+    return sb.toString();
+  }
+
+  public static String createTempFile(String prefix) {
+    try {
+      return Files.createTempDirectory(prefix).toAbsolutePath().toString() + "/test.parquet";
+    } catch (IOException e) {
+      throw new AssertionError("Unable to create temporary file", e);
+    }
+  }
+
+  public static List<ColumnPath> getPaths() {
+    List<ColumnPath> paths = new ArrayList<>();
+    paths.add(ColumnPath.fromDotString("DocId"));
+    paths.add(ColumnPath.fromDotString("Name"));
+    paths.add(ColumnPath.fromDotString("Gender"));
+    paths.add(ColumnPath.fromDotString("Links.Backward"));
+    paths.add(ColumnPath.fromDotString("Links.Forward"));
+    return paths;
+  }
+
+  public static class TestDocs {
+    public long[] docId;
+    public String[] name;
+    public String[] gender;
+    public String[] linkBackward;
+    public String[] linkForward;
+
+    public TestDocs(int numRecord) {
+      docId = new long[numRecord];
+      for (int i = 0; i < numRecord; i++) {
+        docId[i] = getLong();
+      }
+
+      name = new String[numRecord];
+      for (int i = 0; i < numRecord; i++) {
+        name[i] = getString();
+      }
+
+      gender = new String[numRecord];
+      for (int i = 0; i < numRecord; i++) {
+        gender[i] = getString();
+      }
+
+      linkBackward = new String[numRecord];
+      for (int i = 0; i < numRecord; i++) {
+        linkBackward[i] = getString();
+      }
+
+      linkForward = new String[numRecord];
+      for (int i = 0; i < numRecord; i++) {
+        linkForward[i] = getString();
+      }
+    }
+  }
+}
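
Finally, a minimal sketch of generating an input file with TestFileHelper, as a ColumnEncryptor test might do before translating the file. GZIP, PARQUET_1_0, the default page size, the record count and the empty extra metadata are arbitrary example choices, and the wrapper class name is illustrative.

    import java.io.IOException;
    import java.util.HashMap;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.column.ParquetProperties;

    public class TestFileHelperSketch {
      // Sketch only: write a small Parquet file with random test data into a temp
      // directory and return its path for use as the input of a translation test.
      static String writeSampleFile() throws IOException {
        Configuration conf = new Configuration();
        int numRecord = 1000;
        TestFileHelper.TestDocs testDocs = new TestFileHelper.TestDocs(numRecord);
        return TestFileHelper.createParquetFile(conf, new HashMap<>(), numRecord, "encryptor-test",
          "GZIP", ParquetProperties.WriterVersion.PARQUET_1_0, ParquetProperties.DEFAULT_PAGE_SIZE,
          testDocs);
      }
    }
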