Posted to dev@parquet.apache.org by GitBox <gi...@apache.org> on 2021/09/19 12:02:44 UTC

[GitHub] [parquet-mr] ggershinsky commented on a change in pull request #928: PARQUET-2081: Encryption translation tool - Parquet-hadoop

ggershinsky commented on a change in pull request #928:
URL: https://github.com/apache/parquet-mr/pull/928#discussion_r711729880



##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+/**
+ * This class is for fast rewriting of an existing file with column encryption.
+ *
+ * For columns to be encrypted, all the pages of those columns are read but not decompressed/decoded;
+ * each page is encrypted immediately and written back.
+ *
+ * For columns not to be encrypted, the whole column chunk is appended directly to the writer.
+ */
+public class ColumnEncryptor {
+  private static final Logger LOG = LoggerFactory.getLogger(ColumnEncryptor.class);
+
+  private static class EncryptorRunTime {
+    private final InternalColumnEncryptionSetup colEncrSetup;
+    private final BlockCipher.Encryptor dataEncryptor;
+    private final BlockCipher.Encryptor metaDataEncryptor;
+    private final byte[] fileAAD ;
+
+    private byte[] dataPageHeaderAAD;
+    private byte[] dataPageAAD;
+    private byte[] dictPageHeaderAAD;
+    private byte[] dictPageAAD;
+
+    public EncryptorRunTime(InternalFileEncryptor fileEncryptor, ColumnChunkMetaData chunk,
+                            int blockId, int columnId) throws IOException  {
+      this.colEncrSetup = fileEncryptor.getColumnSetup(chunk.getPath(), true, toShortWithCheck(columnId));
+      this.dataEncryptor = colEncrSetup.getDataEncryptor();
+      this.metaDataEncryptor = colEncrSetup.getMetaDataEncryptor();
+
+      this.fileAAD = fileEncryptor.getFileAAD();
+      this.dataPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPageHeader,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dataPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPage,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dictPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPageHeader,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dictPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPage,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+    }
+
+    public InternalColumnEncryptionSetup getColEncrSetup() {
+      return this.colEncrSetup;
+    }
+
+    public BlockCipher.Encryptor getDataEncryptor() {
+      return this.dataEncryptor;
+    }
+
+    public BlockCipher.Encryptor getMetaDataEncryptor() {
+      return this.metaDataEncryptor;
+    }
+
+    public byte[] getFileAAD() {
+      return this.fileAAD;
+    }
+
+    public byte[] getDataPageHeaderAAD() {
+      return this.dataPageHeaderAAD;
+    }
+
+    public byte[] getDataPageAAD() {
+      return this.dataPageAAD;
+    }
+
+    public byte[] getDictPageHeaderAAD() {
+      return this.dictPageHeaderAAD;
+    }
+
+    public byte[] getDictPageAAD() {
+      return this.dictPageAAD;
+    }
+  }
+
+  private final int PAGE_BUFFER_SIZE = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private byte[] pageBuffer;
+  private Configuration conf;
+
+  public ColumnEncryptor(Configuration conf) {
+    this.pageBuffer = new byte[PAGE_BUFFER_SIZE];
+    this.conf = conf;
+  }
+
+  /**
+   * Given the input file, encrypt the columns specified by paths, and output the file.
+   * The encryption settings can be specified in the parameter of fileEncryptionProperties
+   * @param inputFile Input file
+   * @param outputFile Output file
+   * @param paths columns to be encrypted
+   * @param fileEncryptionProperties FileEncryptionProperties of the file
+   * @throws IOException
+   */
+  public void encryptColumns(String inputFile, String outputFile, List<String> paths, FileEncryptionProperties fileEncryptionProperties) throws IOException {
+    Path inPath = new Path(inputFile);
+    Path outPath = new Path(outputFile);
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+
+    ParquetFileWriter writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, ParquetFileWriter.Mode.OVERWRITE,
+      DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, DEFAULT_STATISTICS_TRUNCATE_LENGTH,
+      ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED, fileEncryptionProperties);
+    writer.start();
+
+    try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) {
+      processBlocks(reader, writer, metaData, schema, paths);
+    } catch (Exception e) {
+      LOG.error("Exception happened while processing blocks: ", e);

Review comment:
       why is an exception not thrown here?
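
       A minimal sketch of what that could look like, wrapping and rethrowing so the caller sees the failure (illustration only, not necessarily the fix adopted in the PR):

           try (TransParquetFileReader reader = new TransParquetFileReader(
               HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) {
             processBlocks(reader, writer, metaData, schema, paths);
           } catch (Exception e) {
             // Propagate the failure instead of only logging it; wrapping in
             // IOException keeps the declared signature of encryptColumns.
             throw new IOException("Failed to rewrite blocks of " + inputFile, e);
           }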

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+/**
+ * This class is for fast rewriting of an existing file with column encryption.
+ *
+ * For columns to be encrypted, all the pages of those columns are read but not decompressed/decoded;
+ * each page is encrypted immediately and written back.
+ *
+ * For columns not to be encrypted, the whole column chunk is appended directly to the writer.
+ */
+public class ColumnEncryptor {
+  private static final Logger LOG = LoggerFactory.getLogger(ColumnEncryptor.class);
+
+  private static class EncryptorRunTime {
+    private final InternalColumnEncryptionSetup colEncrSetup;
+    private final BlockCipher.Encryptor dataEncryptor;
+    private final BlockCipher.Encryptor metaDataEncryptor;
+    private final byte[] fileAAD ;
+
+    private byte[] dataPageHeaderAAD;
+    private byte[] dataPageAAD;
+    private byte[] dictPageHeaderAAD;
+    private byte[] dictPageAAD;
+
+    public EncryptorRunTime(InternalFileEncryptor fileEncryptor, ColumnChunkMetaData chunk,
+                            int blockId, int columnId) throws IOException  {
+      this.colEncrSetup = fileEncryptor.getColumnSetup(chunk.getPath(), true, toShortWithCheck(columnId));
+      this.dataEncryptor = colEncrSetup.getDataEncryptor();
+      this.metaDataEncryptor = colEncrSetup.getMetaDataEncryptor();
+
+      this.fileAAD = fileEncryptor.getFileAAD();
+      this.dataPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPageHeader,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dataPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPage,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dictPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPageHeader,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dictPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPage,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+    }
+
+    public InternalColumnEncryptionSetup getColEncrSetup() {
+      return this.colEncrSetup;
+    }
+
+    public BlockCipher.Encryptor getDataEncryptor() {
+      return this.dataEncryptor;
+    }
+
+    public BlockCipher.Encryptor getMetaDataEncryptor() {
+      return this.metaDataEncryptor;
+    }
+
+    public byte[] getFileAAD() {
+      return this.fileAAD;
+    }
+
+    public byte[] getDataPageHeaderAAD() {
+      return this.dataPageHeaderAAD;
+    }
+
+    public byte[] getDataPageAAD() {
+      return this.dataPageAAD;
+    }
+
+    public byte[] getDictPageHeaderAAD() {
+      return this.dictPageHeaderAAD;
+    }
+
+    public byte[] getDictPageAAD() {
+      return this.dictPageAAD;
+    }
+  }
+
+  private final int PAGE_BUFFER_SIZE = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private byte[] pageBuffer;
+  private Configuration conf;
+
+  public ColumnEncryptor(Configuration conf) {
+    this.pageBuffer = new byte[PAGE_BUFFER_SIZE];
+    this.conf = conf;
+  }
+
+  /**
+   * Given the input file, encrypt the columns specified by paths, and output the file.
+   * The encryption settings can be specified in the parameter of fileEncryptionProperties
+   * @param inputFile Input file
+   * @param outputFile Output file
+   * @param paths columns to be encrypted
+   * @param fileEncryptionProperties FileEncryptionProperties of the file
+   * @throws IOException
+   */
+  public void encryptColumns(String inputFile, String outputFile, List<String> paths, FileEncryptionProperties fileEncryptionProperties) throws IOException {

Review comment:
       this method should check whether the input file is already encrypted, and throw an exception if it is.
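
       For illustration, a hedged sketch of one possible check based on the trailing footer magic (files written with an encrypted footer end with "PARE" instead of "PAR1"). The helper name is hypothetical, and this alone would not catch files encrypted in plaintext-footer mode, which would need an additional check of the footer metadata:

           // assumed extra imports: org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.FSDataInputStream,
           // java.nio.charset.StandardCharsets, java.util.Arrays
           private static void assertNotAlreadyEncrypted(Configuration conf, Path file) throws IOException {
             FileSystem fs = file.getFileSystem(conf);
             long fileLength = fs.getFileStatus(file).getLen();
             byte[] trailingMagic = new byte[4];
             try (FSDataInputStream in = fs.open(file)) {
               // the last 4 bytes of a Parquet file are the magic: "PAR1" or "PARE"
               in.readFully(fileLength - 4, trailingMagic, 0, 4);
             }
             if (Arrays.equals(trailingMagic, "PARE".getBytes(StandardCharsets.US_ASCII))) {
               throw new IOException("Input file " + file + " is already encrypted");
             }
           }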

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
##########
@@ -503,7 +503,7 @@ private void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGrou
       ColumnPath path = columnMetaData.getPath();
       if (null != fileEncryptor) {
         columnOrdinal++;
-        columnSetup = fileEncryptor.getColumnSetup(path, false, columnOrdinal);
+        columnSetup = fileEncryptor.getColumnSetup(path, true, columnOrdinal);

Review comment:
       This is a flow check that verifies the column encryption setup already exists when writing the file footer. The setup should be created earlier, when encrypting the pages, so `true` should be used only there. Here, in footer writing, this should be called with `false`.
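
       In other words, the footer-writing path should keep the original call:

           columnSetup = fileEncryptor.getColumnSetup(path, false, columnOrdinal);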

##########
File path: parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/EncDecProperties.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetriever;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class EncDecProperties {
+
+  public static class DecryptionKeyRetrieverMock implements DecryptionKeyRetriever {
+    private final Map<String, byte[]> keyMap = new HashMap<>();
+
+    public DecryptionKeyRetrieverMock putKey(String keyId, byte[] keyBytes) {
+      keyMap.put(keyId, keyBytes);
+      return this;
+    }
+
+    @Override
+    public byte[] getKey(byte[] keyMetaData) {
+      String keyId = new String(keyMetaData, StandardCharsets.UTF_8);
+      return keyMap.get(keyId);
+    }
+  }
+
+  private static final byte[] FOOTER_KEY = {0x01, 0x02, 0x03, 0x4, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a,
+    0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10};
+  private static final byte[] FOOTER_KEY_METADATA = "footkey".getBytes(StandardCharsets.UTF_8);
+  private static final byte[] COL_KEY = {0x02, 0x03, 0x4, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b,
+    0x0c, 0x0d, 0x0e, 0x0f, 0x10, 0x11};
+  private static final byte[] COL_KEY_METADATA = "col".getBytes(StandardCharsets.UTF_8);
+
+  public static FileDecryptionProperties getFileDecryptionProperties() throws IOException {
+    DecryptionKeyRetrieverMock keyRetriever = new DecryptionKeyRetrieverMock();
+    keyRetriever.putKey("footkey", FOOTER_KEY);
+    keyRetriever.putKey("col", COL_KEY);
+    return FileDecryptionProperties.builder().withPlaintextFilesAllowed().withoutFooterSignatureVerification().withKeyRetriever(keyRetriever).build();
+  }
+
+  public static FileDecryptionProperties getFileDecryptionPropertiesNonExistKey() throws IOException {

Review comment:
       this function is not used (?)

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
##########
@@ -1125,7 +1154,7 @@ private static void serializeColumnIndexes(
         BlockCipher.Encryptor columnIndexEncryptor = null;
         byte[] columnIndexAAD = null;
         if (null != fileEncryptor) {
-          InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), false, cIndex);
+          InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), true, cIndex);

Review comment:
       same as before - this is done for column indexes, after the pages are encrypted, so the column encryption setup should already exist and this method should be called with `false`

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
##########
@@ -599,6 +599,31 @@ private void innerWriteDataPage(
       Encoding rlEncoding,
       Encoding dlEncoding,
       Encoding valuesEncoding) throws IOException {
+    writeDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding, null, null);
+  }
+
+  /**
+   * writes a single page
+   * @param valueCount count of values
+   * @param uncompressedPageSize the size of the data once uncompressed
+   * @param bytes the compressed data for the page without header
+   * @param statistics statistics for the page
+   * @param rlEncoding encoding of the repetition level
+   * @param dlEncoding encoding of the definition level
+   * @param valuesEncoding encoding of values
+   * @param blockEncryptor encryptor for block data

Review comment:
       please rename this parameter to `metadataBlockEncryptor` or `pageHeaderEncryptor`; this will help differentiate it from the data encryptor (used for pages).

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
##########
@@ -599,6 +599,31 @@ private void innerWriteDataPage(
       Encoding rlEncoding,
       Encoding dlEncoding,
       Encoding valuesEncoding) throws IOException {
+    writeDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding, null, null);
+  }
+
+  /**
+   * writes a single page
+   * @param valueCount count of values
+   * @param uncompressedPageSize the size of the data once uncompressed
+   * @param bytes the compressed data for the page without header
+   * @param statistics statistics for the page
+   * @param rlEncoding encoding of the repetition level
+   * @param dlEncoding encoding of the definition level
+   * @param valuesEncoding encoding of values
+   * @param blockEncryptor encryptor for block data
+   * @param AAD AAD

Review comment:
       `AAD` is very general; please rename this parameter to `pageHeaderAAD`.
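
       Taken together with the previous comment, the two javadoc lines might then read (the wording is only a sketch):

           * @param metadataBlockEncryptor encryptor for the page header, as opposed to the data encryptor used for page bodies
           * @param pageHeaderAAD AAD bytes for the page header module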

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+/**
+ * This class is for fast rewriting of an existing file with column encryption.
+ *
+ * For columns to be encrypted, all the pages of those columns are read but not decompressed/decoded;
+ * each page is encrypted immediately and written back.
+ *
+ * For columns not to be encrypted, the whole column chunk is appended directly to the writer.
+ */
+public class ColumnEncryptor {
+  private static final Logger LOG = LoggerFactory.getLogger(ColumnEncryptor.class);
+
+  private static class EncryptorRunTime {
+    private final InternalColumnEncryptionSetup colEncrSetup;
+    private final BlockCipher.Encryptor dataEncryptor;
+    private final BlockCipher.Encryptor metaDataEncryptor;
+    private final byte[] fileAAD ;
+
+    private byte[] dataPageHeaderAAD;
+    private byte[] dataPageAAD;
+    private byte[] dictPageHeaderAAD;
+    private byte[] dictPageAAD;
+
+    public EncryptorRunTime(InternalFileEncryptor fileEncryptor, ColumnChunkMetaData chunk,
+                            int blockId, int columnId) throws IOException  {
+      this.colEncrSetup = fileEncryptor.getColumnSetup(chunk.getPath(), true, toShortWithCheck(columnId));
+      this.dataEncryptor = colEncrSetup.getDataEncryptor();
+      this.metaDataEncryptor = colEncrSetup.getMetaDataEncryptor();
+
+      this.fileAAD = fileEncryptor.getFileAAD();
+      this.dataPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPageHeader,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dataPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPage,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dictPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPageHeader,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dictPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPage,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+    }
+
+    public InternalColumnEncryptionSetup getColEncrSetup() {
+      return this.colEncrSetup;
+    }
+
+    public BlockCipher.Encryptor getDataEncryptor() {
+      return this.dataEncryptor;
+    }
+
+    public BlockCipher.Encryptor getMetaDataEncryptor() {
+      return this.metaDataEncryptor;
+    }
+
+    public byte[] getFileAAD() {
+      return this.fileAAD;
+    }
+
+    public byte[] getDataPageHeaderAAD() {
+      return this.dataPageHeaderAAD;
+    }
+
+    public byte[] getDataPageAAD() {
+      return this.dataPageAAD;
+    }
+
+    public byte[] getDictPageHeaderAAD() {
+      return this.dictPageHeaderAAD;
+    }
+
+    public byte[] getDictPageAAD() {
+      return this.dictPageAAD;
+    }
+  }
+
+  private final int PAGE_BUFFER_SIZE = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private byte[] pageBuffer;
+  private Configuration conf;
+
+  public ColumnEncryptor(Configuration conf) {
+    this.pageBuffer = new byte[PAGE_BUFFER_SIZE];
+    this.conf = conf;
+  }
+
+  /**
+   * Given the input file, encrypt the columns specified by paths, and output the file.
+   * The encryption settings can be specified in the parameter of fileEncryptionProperties
+   * @param inputFile Input file
+   * @param outputFile Output file
+   * @param paths columns to be encrypted
+   * @param fileEncryptionProperties FileEncryptionProperties of the file
+   * @throws IOException
+   */
+  public void encryptColumns(String inputFile, String outputFile, List<String> paths, FileEncryptionProperties fileEncryptionProperties) throws IOException {
+    Path inPath = new Path(inputFile);
+    Path outPath = new Path(outputFile);
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+
+    ParquetFileWriter writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, ParquetFileWriter.Mode.OVERWRITE,
+      DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, DEFAULT_STATISTICS_TRUNCATE_LENGTH,
+      ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED, fileEncryptionProperties);
+    writer.start();
+
+    try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) {
+      processBlocks(reader, writer, metaData, schema, paths);
+    } catch (Exception e) {
+      LOG.error("Exception happened while processing blocks: ", e);
+    } finally {
+      try {
+        writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+      } catch (Exception e) {
+        LOG.error("Exception happened when ending the writer: ", e);
+      }
+    }
+  }
+
+  private void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta,
+                            MessageType schema, List<String> paths) throws IOException {
+    Set<ColumnPath> nullifyColumns = convertToColumnPaths(paths);
+    int blockId = 0;
+    PageReadStore store = reader.readNextRowGroup();
+
+    while (store != null) {
+      writer.startBlock(store.getRowCount());
+
+      List<ColumnChunkMetaData> columnsInOrder = meta.getBlocks().get(blockId).getColumns();
+      Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
+        Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+
+      for (int i = 0; i < columnsInOrder.size(); i += 1) {
+        ColumnChunkMetaData chunk = columnsInOrder.get(i);
+        ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+        processChunk(descriptor, chunk, reader, writer, nullifyColumns, blockId, i, meta.getFileMetaData().getCreatedBy());
+      }
+
+      writer.endBlock();
+      store = reader.readNextRowGroup();
+      blockId++;
+    }
+  }
+
+  private void processChunk(ColumnDescriptor descriptor, ColumnChunkMetaData chunk, TransParquetFileReader reader, ParquetFileWriter writer,
+                            Set<ColumnPath> paths, int blockId, int columnId, String createdBy) throws IOException {

Review comment:
       as above, `paths` is too general

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+/**
+ * This class is for fast rewriting of an existing file with column encryption.
+ *
+ * For columns to be encrypted, all the pages of those columns are read but not decompressed/decoded;
+ * each page is encrypted immediately and written back.
+ *
+ * For columns not to be encrypted, the whole column chunk is appended directly to the writer.
+ */
+public class ColumnEncryptor {
+  private static final Logger LOG = LoggerFactory.getLogger(ColumnEncryptor.class);
+
+  private static class EncryptorRunTime {
+    private final InternalColumnEncryptionSetup colEncrSetup;
+    private final BlockCipher.Encryptor dataEncryptor;
+    private final BlockCipher.Encryptor metaDataEncryptor;
+    private final byte[] fileAAD ;
+
+    private byte[] dataPageHeaderAAD;
+    private byte[] dataPageAAD;
+    private byte[] dictPageHeaderAAD;
+    private byte[] dictPageAAD;
+
+    public EncryptorRunTime(InternalFileEncryptor fileEncryptor, ColumnChunkMetaData chunk,
+                            int blockId, int columnId) throws IOException  {
+      this.colEncrSetup = fileEncryptor.getColumnSetup(chunk.getPath(), true, toShortWithCheck(columnId));
+      this.dataEncryptor = colEncrSetup.getDataEncryptor();
+      this.metaDataEncryptor = colEncrSetup.getMetaDataEncryptor();
+
+      this.fileAAD = fileEncryptor.getFileAAD();
+      this.dataPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPageHeader,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dataPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPage,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dictPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPageHeader,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dictPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPage,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+    }
+
+    public InternalColumnEncryptionSetup getColEncrSetup() {
+      return this.colEncrSetup;
+    }
+
+    public BlockCipher.Encryptor getDataEncryptor() {
+      return this.dataEncryptor;
+    }
+
+    public BlockCipher.Encryptor getMetaDataEncryptor() {
+      return this.metaDataEncryptor;
+    }
+
+    public byte[] getFileAAD() {
+      return this.fileAAD;
+    }
+
+    public byte[] getDataPageHeaderAAD() {
+      return this.dataPageHeaderAAD;
+    }
+
+    public byte[] getDataPageAAD() {
+      return this.dataPageAAD;
+    }
+
+    public byte[] getDictPageHeaderAAD() {
+      return this.dictPageHeaderAAD;
+    }
+
+    public byte[] getDictPageAAD() {
+      return this.dictPageAAD;
+    }
+  }
+
+  private final int PAGE_BUFFER_SIZE = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private byte[] pageBuffer;
+  private Configuration conf;
+
+  public ColumnEncryptor(Configuration conf) {
+    this.pageBuffer = new byte[PAGE_BUFFER_SIZE];
+    this.conf = conf;
+  }
+
+  /**
+   * Given the input file, encrypt the columns specified by paths, and output the file.
+   * The encryption settings can be specified in the parameter of fileEncryptionProperties
+   * @param inputFile Input file
+   * @param outputFile Output file
+   * @param paths columns to be encrypted
+   * @param fileEncryptionProperties FileEncryptionProperties of the file
+   * @throws IOException
+   */
+  public void encryptColumns(String inputFile, String outputFile, List<String> paths, FileEncryptionProperties fileEncryptionProperties) throws IOException {
+    Path inPath = new Path(inputFile);
+    Path outPath = new Path(outputFile);
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+
+    ParquetFileWriter writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, ParquetFileWriter.Mode.OVERWRITE,
+      DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, DEFAULT_STATISTICS_TRUNCATE_LENGTH,
+      ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED, fileEncryptionProperties);
+    writer.start();
+
+    try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) {
+      processBlocks(reader, writer, metaData, schema, paths);
+    } catch (Exception e) {
+      LOG.error("Exception happened while processing blocks: ", e);
+    } finally {
+      try {
+        writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+      } catch (Exception e) {
+        LOG.error("Exception happened when ending the writer: ", e);

Review comment:
       why is an exception not thrown here?

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+/**
+ * This class is for fast rewriting of an existing file with column encryption.
+ *
+ * For columns to be encrypted, all the pages of those columns are read but not decompressed/decoded;
+ * each page is encrypted immediately and written back.
+ *
+ * For columns not to be encrypted, the whole column chunk is appended directly to the writer.
+ */
+public class ColumnEncryptor {
+  private static final Logger LOG = LoggerFactory.getLogger(ColumnEncryptor.class);
+
+  private static class EncryptorRunTime {
+    private final InternalColumnEncryptionSetup colEncrSetup;
+    private final BlockCipher.Encryptor dataEncryptor;
+    private final BlockCipher.Encryptor metaDataEncryptor;
+    private final byte[] fileAAD ;
+
+    private byte[] dataPageHeaderAAD;
+    private byte[] dataPageAAD;
+    private byte[] dictPageHeaderAAD;
+    private byte[] dictPageAAD;
+
+    public EncryptorRunTime(InternalFileEncryptor fileEncryptor, ColumnChunkMetaData chunk,
+                            int blockId, int columnId) throws IOException  {
+      this.colEncrSetup = fileEncryptor.getColumnSetup(chunk.getPath(), true, toShortWithCheck(columnId));
+      this.dataEncryptor = colEncrSetup.getDataEncryptor();
+      this.metaDataEncryptor = colEncrSetup.getMetaDataEncryptor();
+
+      this.fileAAD = fileEncryptor.getFileAAD();
+      this.dataPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPageHeader,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dataPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPage,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dictPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPageHeader,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dictPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPage,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+    }
+
+    public InternalColumnEncryptionSetup getColEncrSetup() {
+      return this.colEncrSetup;
+    }
+
+    public BlockCipher.Encryptor getDataEncryptor() {
+      return this.dataEncryptor;
+    }
+
+    public BlockCipher.Encryptor getMetaDataEncryptor() {
+      return this.metaDataEncryptor;
+    }
+
+    public byte[] getFileAAD() {
+      return this.fileAAD;
+    }
+
+    public byte[] getDataPageHeaderAAD() {
+      return this.dataPageHeaderAAD;
+    }
+
+    public byte[] getDataPageAAD() {
+      return this.dataPageAAD;
+    }
+
+    public byte[] getDictPageHeaderAAD() {
+      return this.dictPageHeaderAAD;
+    }
+
+    public byte[] getDictPageAAD() {
+      return this.dictPageAAD;
+    }
+  }
+
+  private final int PAGE_BUFFER_SIZE = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private byte[] pageBuffer;
+  private Configuration conf;
+
+  public ColumnEncryptor(Configuration conf) {
+    this.pageBuffer = new byte[PAGE_BUFFER_SIZE];
+    this.conf = conf;
+  }
+
+  /**
+   * Given the input file, encrypt the columns specified by paths, and output the file.
+   * The encryption settings can be specified in the parameter of fileEncryptionProperties
+   * @param inputFile Input file
+   * @param outputFile Output file
+   * @param paths columns to be encrypted
+   * @param fileEncryptionProperties FileEncryptionProperties of the file
+   * @throws IOException
+   */
+  public void encryptColumns(String inputFile, String outputFile, List<String> paths, FileEncryptionProperties fileEncryptionProperties) throws IOException {
+    Path inPath = new Path(inputFile);
+    Path outPath = new Path(outputFile);
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+
+    ParquetFileWriter writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, ParquetFileWriter.Mode.OVERWRITE,
+      DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, DEFAULT_STATISTICS_TRUNCATE_LENGTH,
+      ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED, fileEncryptionProperties);
+    writer.start();
+
+    try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) {
+      processBlocks(reader, writer, metaData, schema, paths);
+    } catch (Exception e) {
+      LOG.error("Exception happened while processing blocks: ", e);
+    } finally {
+      try {
+        writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+      } catch (Exception e) {
+        LOG.error("Exception happened when ending the writer: ", e);
+      }
+    }
+  }
+
+  private void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta,
+                            MessageType schema, List<String> paths) throws IOException {
+    Set<ColumnPath> nullifyColumns = convertToColumnPaths(paths);
+    int blockId = 0;
+    PageReadStore store = reader.readNextRowGroup();
+
+    while (store != null) {
+      writer.startBlock(store.getRowCount());
+
+      List<ColumnChunkMetaData> columnsInOrder = meta.getBlocks().get(blockId).getColumns();
+      Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
+        Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+
+      for (int i = 0; i < columnsInOrder.size(); i += 1) {
+        ColumnChunkMetaData chunk = columnsInOrder.get(i);
+        ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+        processChunk(descriptor, chunk, reader, writer, nullifyColumns, blockId, i, meta.getFileMetaData().getCreatedBy());
+      }
+
+      writer.endBlock();
+      store = reader.readNextRowGroup();
+      blockId++;
+    }
+  }
+
+  private void processChunk(ColumnDescriptor descriptor, ColumnChunkMetaData chunk, TransParquetFileReader reader, ParquetFileWriter writer,
+                            Set<ColumnPath> paths, int blockId, int columnId, String createdBy) throws IOException {
+    reader.setStreamPosition(chunk.getStartingPos());
+    if (paths.contains(chunk.getPath())) {
+      writer.startColumn(descriptor, chunk.getValueCount(), chunk.getCodec());
+      encryptPages(reader, chunk, writer, createdBy, blockId, columnId);
+      writer.endColumn();
+    } else {
+      BloomFilter bloomFilter = reader.readBloomFilter(chunk);
+      ColumnIndex columnIndex = reader.readColumnIndex(chunk);
+      OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+      writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex);
+    }
+  }
+
+  private void encryptPages(TransParquetFileReader reader, ColumnChunkMetaData chunk,
+                             ParquetFileWriter writer, String createdBy, int blockId, int columnId) throws IOException {
+    short pageOrdinal = 0;

Review comment:
       all ordinals were changed from short to int; the current AAD functions work with ints (and check that they do not exceed the max short value).
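
       A sketch of the suggested shape, assuming AesCipher.quickUpdatePageAAD is the helper used to stamp the ordinal into the page AADs (variable names are illustrative):

           int pageOrdinal = 0;  // int rather than short
           // for each page in the chunk:
           AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageHeaderAAD(), pageOrdinal);
           AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageAAD(), pageOrdinal);
           // ... encrypt and write the page ...
           pageOrdinal++;  // the AAD helpers verify the value does not exceed Short.MAX_VALUE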

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+/**
+ * This class is for fast rewriting of an existing file with column encryption.
+ *
+ * For columns to be encrypted, all the pages of those columns are read but not decompressed/decoded;
+ * each page is encrypted immediately and written back.
+ *
+ * For columns not to be encrypted, the whole column chunk is appended directly to the writer.
+ */
+public class ColumnEncryptor {
+  private static final Logger LOG = LoggerFactory.getLogger(ColumnEncryptor.class);
+
+  private static class EncryptorRunTime {
+    private final InternalColumnEncryptionSetup colEncrSetup;
+    private final BlockCipher.Encryptor dataEncryptor;
+    private final BlockCipher.Encryptor metaDataEncryptor;
+    private final byte[] fileAAD ;
+
+    private byte[] dataPageHeaderAAD;
+    private byte[] dataPageAAD;
+    private byte[] dictPageHeaderAAD;
+    private byte[] dictPageAAD;
+
+    public EncryptorRunTime(InternalFileEncryptor fileEncryptor, ColumnChunkMetaData chunk,
+                            int blockId, int columnId) throws IOException  {
+      this.colEncrSetup = fileEncryptor.getColumnSetup(chunk.getPath(), true, toShortWithCheck(columnId));
+      this.dataEncryptor = colEncrSetup.getDataEncryptor();
+      this.metaDataEncryptor = colEncrSetup.getMetaDataEncryptor();
+
+      this.fileAAD = fileEncryptor.getFileAAD();
+      this.dataPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPageHeader,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dataPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPage,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dictPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPageHeader,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dictPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPage,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+    }
+
+    public InternalColumnEncryptionSetup getColEncrSetup() {
+      return this.colEncrSetup;
+    }
+
+    public BlockCipher.Encryptor getDataEncryptor() {
+      return this.dataEncryptor;
+    }
+
+    public BlockCipher.Encryptor getMetaDataEncryptor() {
+      return this.metaDataEncryptor;
+    }
+
+    public byte[] getFileAAD() {
+      return this.fileAAD;
+    }
+
+    public byte[] getDataPageHeaderAAD() {
+      return this.dataPageHeaderAAD;
+    }
+
+    public byte[] getDataPageAAD() {
+      return this.dataPageAAD;
+    }
+
+    public byte[] getDictPageHeaderAAD() {
+      return this.dictPageHeaderAAD;
+    }
+
+    public byte[] getDictPageAAD() {
+      return this.dictPageAAD;
+    }
+  }
+
+  private final int PAGE_BUFFER_SIZE = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private byte[] pageBuffer;
+  private Configuration conf;
+
+  public ColumnEncryptor(Configuration conf) {
+    this.pageBuffer = new byte[PAGE_BUFFER_SIZE];
+    this.conf = conf;
+  }
+
+  /**
+   * Given the input file, encrypt the columns specified by paths, and output the file.
+   * The encryption settings can be specified in the parameter of fileEncryptionProperties
+   * @param inputFile Input file
+   * @param outputFile Output file
+   * @param paths columns to be encrypted
+   * @param fileEncryptionProperties FileEncryptionProperties of the file
+   * @throws IOException
+   */
+  public void encryptColumns(String inputFile, String outputFile, List<String> paths, FileEncryptionProperties fileEncryptionProperties) throws IOException {
+    Path inPath = new Path(inputFile);
+    Path outPath = new Path(outputFile);
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+
+    ParquetFileWriter writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, ParquetFileWriter.Mode.OVERWRITE,
+      DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, DEFAULT_STATISTICS_TRUNCATE_LENGTH,
+      ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED, fileEncryptionProperties);
+    writer.start();
+
+    try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) {
+      processBlocks(reader, writer, metaData, schema, paths);
+    } catch (Exception e) {
+      LOG.error("Exception happened while processing blocks: ", e);
+    } finally {
+      try {
+        writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+      } catch (Exception e) {
+        LOG.error("Exception happened when ending the writer: ", e);
+      }
+    }
+  }
+
+  private void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta,
+                            MessageType schema, List<String> paths) throws IOException {
+    Set<ColumnPath> nullifyColumns = convertToColumnPaths(paths);
+    int blockId = 0;
+    PageReadStore store = reader.readNextRowGroup();
+
+    while (store != null) {
+      writer.startBlock(store.getRowCount());
+
+      List<ColumnChunkMetaData> columnsInOrder = meta.getBlocks().get(blockId).getColumns();
+      Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
+        Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+
+      for (int i = 0; i < columnsInOrder.size(); i += 1) {
+        ColumnChunkMetaData chunk = columnsInOrder.get(i);
+        ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+        processChunk(descriptor, chunk, reader, writer, nullifyColumns, blockId, i, meta.getFileMetaData().getCreatedBy());
+      }
+
+      writer.endBlock();
+      store = reader.readNextRowGroup();
+      blockId++;
+    }
+  }
+
+  private void processChunk(ColumnDescriptor descriptor, ColumnChunkMetaData chunk, TransParquetFileReader reader, ParquetFileWriter writer,
+                            Set<ColumnPath> paths, int blockId, int columnId, String createdBy) throws IOException {
+    reader.setStreamPosition(chunk.getStartingPos());
+    if (paths.contains(chunk.getPath())) {

Review comment:
       could you add a quick comment that explains this if/else logic?
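
       For reference, a minimal sketch of what such a comment might look like (the wording is only a suggestion):

         // Columns selected for encryption are rewritten page by page: each page stays
         // compressed/encoded as-is and is only encrypted before being written out.
         // All other columns are copied wholesale by appending the entire column chunk.
         if (paths.contains(chunk.getPath())) {
           writer.startColumn(descriptor, chunk.getValueCount(), chunk.getCodec());
           encryptPages(reader, chunk, writer, createdBy, blockId, columnId);
           writer.endColumn();
         } else {
           BloomFilter bloomFilter = reader.readBloomFilter(chunk);
           ColumnIndex columnIndex = reader.readColumnIndex(chunk);
           OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
           writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex);
         }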

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java
##########
@@ -0,0 +1,339 @@
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+/**
+ * This class enables fast rewriting of an existing file with column encryption.
+ *
+ * For columns to be encrypted, all pages of those columns are read but not decompressed
+ * or decoded; each page is encrypted immediately and written back.
+ *
+ * For columns that are not to be encrypted, the whole column chunk is appended directly to the writer.
+ */
+public class ColumnEncryptor {
+  private static final Logger LOG = LoggerFactory.getLogger(ColumnEncryptor.class);
+
+  private static class EncryptorRunTime {
+    private final InternalColumnEncryptionSetup colEncrSetup;
+    private final BlockCipher.Encryptor dataEncryptor;
+    private final BlockCipher.Encryptor metaDataEncryptor;
+    private final byte[] fileAAD ;
+
+    private byte[] dataPageHeaderAAD;
+    private byte[] dataPageAAD;
+    private byte[] dictPageHeaderAAD;
+    private byte[] dictPageAAD;
+
+    public EncryptorRunTime(InternalFileEncryptor fileEncryptor, ColumnChunkMetaData chunk,
+                            int blockId, int columnId) throws IOException  {
+      this.colEncrSetup = fileEncryptor.getColumnSetup(chunk.getPath(), true, toShortWithCheck(columnId));
+      this.dataEncryptor = colEncrSetup.getDataEncryptor();
+      this.metaDataEncryptor = colEncrSetup.getMetaDataEncryptor();
+
+      this.fileAAD = fileEncryptor.getFileAAD();
+      this.dataPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPageHeader,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dataPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPage,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dictPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPageHeader,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dictPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPage,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+    }
+
+    public InternalColumnEncryptionSetup getColEncrSetup() {
+      return this.colEncrSetup;
+    }
+
+    public BlockCipher.Encryptor getDataEncryptor() {
+      return this.dataEncryptor;
+    }
+
+    public BlockCipher.Encryptor getMetaDataEncryptor() {
+      return this.metaDataEncryptor;
+    }
+
+    public byte[] getFileAAD() {
+      return this.fileAAD;
+    }
+
+    public byte[] getDataPageHeaderAAD() {
+      return this.dataPageHeaderAAD;
+    }
+
+    public byte[] getDataPageAAD() {
+      return this.dataPageAAD;
+    }
+
+    public byte[] getDictPageHeaderAAD() {
+      return this.dictPageHeaderAAD;
+    }
+
+    public byte[] getDictPageAAD() {
+      return this.dictPageAAD;
+    }
+  }
+
+  private final int PAGE_BUFFER_SIZE = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private byte[] pageBuffer;
+  private Configuration conf;
+
+  public ColumnEncryptor(Configuration conf) {
+    this.pageBuffer = new byte[PAGE_BUFFER_SIZE];
+    this.conf = conf;
+  }
+
+  /**
+   * Given the input file, encrypt the columns specified by paths, and output the file.
+   * The encryption settings can be specified in the parameter of fileEncryptionProperties
+   * @param inputFile Input file
+   * @param outputFile Output file
+   * @param paths columns to be encrypted
+   * @param fileEncryptionProperties FileEncryptionProperties of the file
+   * @throws IOException
+   */
+  public void encryptColumns(String inputFile, String outputFile, List<String> paths, FileEncryptionProperties fileEncryptionProperties) throws IOException {
+    Path inPath = new Path(inputFile);
+    Path outPath = new Path(outputFile);
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+
+    ParquetFileWriter writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, ParquetFileWriter.Mode.OVERWRITE,
+      DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, DEFAULT_STATISTICS_TRUNCATE_LENGTH,
+      ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED, fileEncryptionProperties);
+    writer.start();
+
+    try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) {
+      processBlocks(reader, writer, metaData, schema, paths);
+    } catch (Exception e) {
+      LOG.error("Exception happened while processing blocks: ", e);
+    } finally {
+      try {
+        writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+      } catch (Exception e) {
+        LOG.error("Exception happened when ending the writer: ", e);
+      }
+    }
+  }
+
+  private void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta,
+                            MessageType schema, List<String> paths) throws IOException {
+    Set<ColumnPath> nullifyColumns = convertToColumnPaths(paths);

Review comment:
       `nullifyColumns` -> `encryptedColumns` ?

##########
File path: parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnEncryptorTest.java
##########
@@ -0,0 +1,148 @@
+package org.apache.parquet.hadoop.util;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ColumnEncryptorTest {
+
+  private Configuration conf = new Configuration();
+  private Map<String, String> extraMeta = ImmutableMap.of("key1", "value1", "key2", "value2");
+  private ColumnEncryptor columnEncryptor = null;
+  private final int numRecord = 100000;
+  private String inputFile = null;
+  private String outputFile = null;
+  private TestFileHelper.TestDocs testDocs = null;
+
+  @Before
+  public void testSetup() throws Exception {
+    columnEncryptor = new ColumnEncryptor(conf);
+    testDocs = new TestFileHelper.TestDocs(numRecord);
+    inputFile = TestFileHelper.createParquetFile(conf, extraMeta, numRecord, "input", "GZIP",
+      ParquetProperties.WriterVersion.PARQUET_1_0, ParquetProperties.DEFAULT_PAGE_SIZE, testDocs);
+    outputFile = TestFileHelper.createTempFile("test");
+  }
+
+  @Test
+  public void testFlatColumn() throws IOException {
+    String[] encryptColumns = {"DocId"};
+    columnEncryptor.encryptColumns(inputFile, outputFile, Arrays.asList(encryptColumns),
+      EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1));

Review comment:
       Can you also add tests with the AES_GCM_V1 cipher?
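
       Something like the following should be enough (a sketch only, mirroring testFlatColumn; the method name is arbitrary):

         @Test
         public void testFlatColumnGcm() throws IOException {
           String[] encryptColumns = {"DocId"};
           columnEncryptor.encryptColumns(inputFile, outputFile, Arrays.asList(encryptColumns),
             EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_V1));
           // then verify the output the same way the existing AES_GCM_CTR_V1 test does
         }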

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java
##########
@@ -0,0 +1,339 @@
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+/**
+ * This class enables fast rewriting of an existing file with column encryption.
+ *
+ * For columns to be encrypted, all pages of those columns are read but not decompressed
+ * or decoded; each page is encrypted immediately and written back.
+ *
+ * For columns that are not to be encrypted, the whole column chunk is appended directly to the writer.
+ */
+public class ColumnEncryptor {
+  private static final Logger LOG = LoggerFactory.getLogger(ColumnEncryptor.class);
+
+  private static class EncryptorRunTime {
+    private final InternalColumnEncryptionSetup colEncrSetup;
+    private final BlockCipher.Encryptor dataEncryptor;
+    private final BlockCipher.Encryptor metaDataEncryptor;
+    private final byte[] fileAAD ;
+
+    private byte[] dataPageHeaderAAD;
+    private byte[] dataPageAAD;
+    private byte[] dictPageHeaderAAD;
+    private byte[] dictPageAAD;
+
+    public EncryptorRunTime(InternalFileEncryptor fileEncryptor, ColumnChunkMetaData chunk,
+                            int blockId, int columnId) throws IOException  {
+      this.colEncrSetup = fileEncryptor.getColumnSetup(chunk.getPath(), true, toShortWithCheck(columnId));
+      this.dataEncryptor = colEncrSetup.getDataEncryptor();
+      this.metaDataEncryptor = colEncrSetup.getMetaDataEncryptor();
+
+      this.fileAAD = fileEncryptor.getFileAAD();
+      this.dataPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPageHeader,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dataPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPage,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dictPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPageHeader,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dictPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPage,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+    }
+
+    public InternalColumnEncryptionSetup getColEncrSetup() {
+      return this.colEncrSetup;
+    }
+
+    public BlockCipher.Encryptor getDataEncryptor() {
+      return this.dataEncryptor;
+    }
+
+    public BlockCipher.Encryptor getMetaDataEncryptor() {
+      return this.metaDataEncryptor;
+    }
+
+    public byte[] getFileAAD() {
+      return this.fileAAD;
+    }
+
+    public byte[] getDataPageHeaderAAD() {
+      return this.dataPageHeaderAAD;
+    }
+
+    public byte[] getDataPageAAD() {
+      return this.dataPageAAD;
+    }
+
+    public byte[] getDictPageHeaderAAD() {
+      return this.dictPageHeaderAAD;
+    }
+
+    public byte[] getDictPageAAD() {
+      return this.dictPageAAD;
+    }
+  }
+
+  private final int PAGE_BUFFER_SIZE = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private byte[] pageBuffer;
+  private Configuration conf;
+
+  public ColumnEncryptor(Configuration conf) {
+    this.pageBuffer = new byte[PAGE_BUFFER_SIZE];
+    this.conf = conf;
+  }
+
+  /**
+   * Given the input file, encrypt the columns specified by paths, and output the file.
+   * The encryption settings can be specified in the parameter of fileEncryptionProperties
+   * @param inputFile Input file
+   * @param outputFile Output file
+   * @param paths columns to be encrypted
+   * @param fileEncryptionProperties FileEncryptionProperties of the file
+   * @throws IOException
+   */
+  public void encryptColumns(String inputFile, String outputFile, List<String> paths, FileEncryptionProperties fileEncryptionProperties) throws IOException {
+    Path inPath = new Path(inputFile);
+    Path outPath = new Path(outputFile);
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+
+    ParquetFileWriter writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, ParquetFileWriter.Mode.OVERWRITE,
+      DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, DEFAULT_STATISTICS_TRUNCATE_LENGTH,
+      ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED, fileEncryptionProperties);
+    writer.start();
+
+    try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) {
+      processBlocks(reader, writer, metaData, schema, paths);
+    } catch (Exception e) {
+      LOG.error("Exception happened while processing blocks: ", e);
+    } finally {
+      try {
+        writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+      } catch (Exception e) {
+        LOG.error("Exception happened when ending the writer: ", e);
+      }
+    }
+  }
+
+  private void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta,
+                            MessageType schema, List<String> paths) throws IOException {
+    Set<ColumnPath> nullifyColumns = convertToColumnPaths(paths);
+    int blockId = 0;
+    PageReadStore store = reader.readNextRowGroup();
+
+    while (store != null) {
+      writer.startBlock(store.getRowCount());
+
+      List<ColumnChunkMetaData> columnsInOrder = meta.getBlocks().get(blockId).getColumns();
+      Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
+        Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+
+      for (int i = 0; i < columnsInOrder.size(); i += 1) {
+        ColumnChunkMetaData chunk = columnsInOrder.get(i);
+        ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+        processChunk(descriptor, chunk, reader, writer, nullifyColumns, blockId, i, meta.getFileMetaData().getCreatedBy());
+      }
+
+      writer.endBlock();
+      store = reader.readNextRowGroup();
+      blockId++;
+    }
+  }
+
+  private void processChunk(ColumnDescriptor descriptor, ColumnChunkMetaData chunk, TransParquetFileReader reader, ParquetFileWriter writer,
+                            Set<ColumnPath> paths, int blockId, int columnId, String createdBy) throws IOException {
+    reader.setStreamPosition(chunk.getStartingPos());
+    if (paths.contains(chunk.getPath())) {
+      writer.startColumn(descriptor, chunk.getValueCount(), chunk.getCodec());
+      encryptPages(reader, chunk, writer, createdBy, blockId, columnId);
+      writer.endColumn();
+    } else {
+      BloomFilter bloomFilter = reader.readBloomFilter(chunk);
+      ColumnIndex columnIndex = reader.readColumnIndex(chunk);
+      OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+      writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex);
+    }
+  }
+
+  private void encryptPages(TransParquetFileReader reader, ColumnChunkMetaData chunk,
+                             ParquetFileWriter writer, String createdBy, int blockId, int columnId) throws IOException {
+    short pageOrdinal = 0;
+    EncryptorRunTime encryptorRunTime = new EncryptorRunTime(writer.getEncryptor(), chunk, blockId, columnId);
+    DictionaryPage dictionaryPage = null;
+    long readValues = 0;
+    ParquetMetadataConverter converter = new ParquetMetadataConverter();
+    long totalChunkValues = chunk.getValueCount();
+    while (readValues < totalChunkValues) {
+      if (Short.MAX_VALUE == pageOrdinal) {

Review comment:
       no need for this check.

##########
File path: parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/EncDecProperties.java
##########
@@ -0,0 +1,96 @@
+package org.apache.parquet.hadoop.util;
+
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetriever;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class EncDecProperties {
+
+  public static class DecryptionKeyRetrieverMock implements DecryptionKeyRetriever {
+    private final Map<String, byte[]> keyMap = new HashMap<>();
+
+    public DecryptionKeyRetrieverMock putKey(String keyId, byte[] keyBytes) {
+      keyMap.put(keyId, keyBytes);
+      return this;
+    }
+
+    @Override
+    public byte[] getKey(byte[] keyMetaData) {
+      String keyId = new String(keyMetaData, StandardCharsets.UTF_8);
+      return keyMap.get(keyId);
+    }
+  }
+
+  private static final byte[] FOOTER_KEY = {0x01, 0x02, 0x03, 0x4, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a,
+    0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10};
+  private static final byte[] FOOTER_KEY_METADATA = "footkey".getBytes(StandardCharsets.UTF_8);
+  private static final byte[] COL_KEY = {0x02, 0x03, 0x4, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b,
+    0x0c, 0x0d, 0x0e, 0x0f, 0x10, 0x11};
+  private static final byte[] COL_KEY_METADATA = "col".getBytes(StandardCharsets.UTF_8);
+
+  public static FileDecryptionProperties getFileDecryptionProperties() throws IOException {
+    DecryptionKeyRetrieverMock keyRetriever = new DecryptionKeyRetrieverMock();
+    keyRetriever.putKey("footkey", FOOTER_KEY);
+    keyRetriever.putKey("col", COL_KEY);
+    return FileDecryptionProperties.builder().withPlaintextFilesAllowed().withoutFooterSignatureVerification().withKeyRetriever(keyRetriever).build();

Review comment:
       why test with `withoutFooterSignatureVerification()`?
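
       If the intent is to keep the default footer signature verification, that builder call can simply be dropped, e.g. (sketch):

         return FileDecryptionProperties.builder()
             .withPlaintextFilesAllowed()
             .withKeyRetriever(keyRetriever)
             .build();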

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java
##########
@@ -0,0 +1,339 @@
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+/**
+ * This class enables fast rewriting of an existing file with column encryption.
+ *
+ * For columns to be encrypted, all pages of those columns are read but not decompressed
+ * or decoded; each page is encrypted immediately and written back.
+ *
+ * For columns that are not to be encrypted, the whole column chunk is appended directly to the writer.
+ */
+public class ColumnEncryptor {
+  private static final Logger LOG = LoggerFactory.getLogger(ColumnEncryptor.class);
+
+  private static class EncryptorRunTime {
+    private final InternalColumnEncryptionSetup colEncrSetup;
+    private final BlockCipher.Encryptor dataEncryptor;
+    private final BlockCipher.Encryptor metaDataEncryptor;
+    private final byte[] fileAAD ;
+
+    private byte[] dataPageHeaderAAD;
+    private byte[] dataPageAAD;
+    private byte[] dictPageHeaderAAD;
+    private byte[] dictPageAAD;
+
+    public EncryptorRunTime(InternalFileEncryptor fileEncryptor, ColumnChunkMetaData chunk,
+                            int blockId, int columnId) throws IOException  {
+      this.colEncrSetup = fileEncryptor.getColumnSetup(chunk.getPath(), true, toShortWithCheck(columnId));
+      this.dataEncryptor = colEncrSetup.getDataEncryptor();
+      this.metaDataEncryptor = colEncrSetup.getMetaDataEncryptor();
+
+      this.fileAAD = fileEncryptor.getFileAAD();
+      this.dataPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPageHeader,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dataPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPage,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dictPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPageHeader,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dictPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPage,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+    }
+
+    public InternalColumnEncryptionSetup getColEncrSetup() {
+      return this.colEncrSetup;
+    }
+
+    public BlockCipher.Encryptor getDataEncryptor() {
+      return this.dataEncryptor;
+    }
+
+    public BlockCipher.Encryptor getMetaDataEncryptor() {
+      return this.metaDataEncryptor;
+    }
+
+    public byte[] getFileAAD() {
+      return this.fileAAD;
+    }
+
+    public byte[] getDataPageHeaderAAD() {
+      return this.dataPageHeaderAAD;
+    }
+
+    public byte[] getDataPageAAD() {
+      return this.dataPageAAD;
+    }
+
+    public byte[] getDictPageHeaderAAD() {
+      return this.dictPageHeaderAAD;
+    }
+
+    public byte[] getDictPageAAD() {
+      return this.dictPageAAD;
+    }
+  }
+
+  private final int PAGE_BUFFER_SIZE = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private byte[] pageBuffer;
+  private Configuration conf;
+
+  public ColumnEncryptor(Configuration conf) {
+    this.pageBuffer = new byte[PAGE_BUFFER_SIZE];
+    this.conf = conf;
+  }
+
+  /**
+   * Given the input file, encrypt the columns specified by paths, and output the file.
+   * The encryption settings can be specified in the parameter of fileEncryptionProperties
+   * @param inputFile Input file
+   * @param outputFile Output file
+   * @param paths columns to be encrypted
+   * @param fileEncryptionProperties FileEncryptionProperties of the file
+   * @throws IOException
+   */
+  public void encryptColumns(String inputFile, String outputFile, List<String> paths, FileEncryptionProperties fileEncryptionProperties) throws IOException {
+    Path inPath = new Path(inputFile);
+    Path outPath = new Path(outputFile);
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+
+    ParquetFileWriter writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, ParquetFileWriter.Mode.OVERWRITE,
+      DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, DEFAULT_STATISTICS_TRUNCATE_LENGTH,
+      ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED, fileEncryptionProperties);
+    writer.start();
+
+    try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) {
+      processBlocks(reader, writer, metaData, schema, paths);
+    } catch (Exception e) {
+      LOG.error("Exception happened while processing blocks: ", e);
+    } finally {
+      try {
+        writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+      } catch (Exception e) {
+        LOG.error("Exception happened when ending the writer: ", e);
+      }
+    }
+  }
+
+  private void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta,
+                            MessageType schema, List<String> paths) throws IOException {
+    Set<ColumnPath> nullifyColumns = convertToColumnPaths(paths);
+    int blockId = 0;
+    PageReadStore store = reader.readNextRowGroup();
+
+    while (store != null) {
+      writer.startBlock(store.getRowCount());
+
+      List<ColumnChunkMetaData> columnsInOrder = meta.getBlocks().get(blockId).getColumns();
+      Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
+        Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+
+      for (int i = 0; i < columnsInOrder.size(); i += 1) {
+        ColumnChunkMetaData chunk = columnsInOrder.get(i);
+        ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+        processChunk(descriptor, chunk, reader, writer, nullifyColumns, blockId, i, meta.getFileMetaData().getCreatedBy());
+      }
+
+      writer.endBlock();
+      store = reader.readNextRowGroup();
+      blockId++;
+    }
+  }
+
+  private void processChunk(ColumnDescriptor descriptor, ColumnChunkMetaData chunk, TransParquetFileReader reader, ParquetFileWriter writer,
+                            Set<ColumnPath> paths, int blockId, int columnId, String createdBy) throws IOException {
+    reader.setStreamPosition(chunk.getStartingPos());
+    if (paths.contains(chunk.getPath())) {
+      writer.startColumn(descriptor, chunk.getValueCount(), chunk.getCodec());
+      encryptPages(reader, chunk, writer, createdBy, blockId, columnId);
+      writer.endColumn();
+    } else {
+      BloomFilter bloomFilter = reader.readBloomFilter(chunk);
+      ColumnIndex columnIndex = reader.readColumnIndex(chunk);
+      OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);

Review comment:
       do offsets need to be updated in these chunks? If encryption was applied on previous chunks, their page headers and pages become larger, so everything shifts.
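
       As a rough illustration (assuming the usual encrypted-module framing of a 4-byte length, 12-byte nonce, ciphertext and 16-byte GCM tag): if an earlier column chunk in the same row group grew by a few kilobytes of such framing after encryption, this plaintext chunk lands that much later in the output file, so every absolute page offset recorded in its copied OffsetIndex is stale by the same amount unless the writer rebuilds the index with that shift.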

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java
##########
@@ -0,0 +1,339 @@
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+/**
+ * This class enables fast rewriting of an existing file with column encryption.
+ *
+ * For columns to be encrypted, all pages of those columns are read but not decompressed
+ * or decoded; each page is encrypted immediately and written back.
+ *
+ * For columns that are not to be encrypted, the whole column chunk is appended directly to the writer.
+ */
+public class ColumnEncryptor {
+  private static final Logger LOG = LoggerFactory.getLogger(ColumnEncryptor.class);
+
+  private static class EncryptorRunTime {
+    private final InternalColumnEncryptionSetup colEncrSetup;
+    private final BlockCipher.Encryptor dataEncryptor;
+    private final BlockCipher.Encryptor metaDataEncryptor;
+    private final byte[] fileAAD ;
+
+    private byte[] dataPageHeaderAAD;
+    private byte[] dataPageAAD;
+    private byte[] dictPageHeaderAAD;
+    private byte[] dictPageAAD;
+
+    public EncryptorRunTime(InternalFileEncryptor fileEncryptor, ColumnChunkMetaData chunk,
+                            int blockId, int columnId) throws IOException  {
+      this.colEncrSetup = fileEncryptor.getColumnSetup(chunk.getPath(), true, toShortWithCheck(columnId));
+      this.dataEncryptor = colEncrSetup.getDataEncryptor();
+      this.metaDataEncryptor = colEncrSetup.getMetaDataEncryptor();
+
+      this.fileAAD = fileEncryptor.getFileAAD();
+      this.dataPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPageHeader,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dataPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPage,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dictPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPageHeader,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dictPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPage,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+    }
+
+    public InternalColumnEncryptionSetup getColEncrSetup() {
+      return this.colEncrSetup;
+    }
+
+    public BlockCipher.Encryptor getDataEncryptor() {
+      return this.dataEncryptor;
+    }
+
+    public BlockCipher.Encryptor getMetaDataEncryptor() {
+      return this.metaDataEncryptor;
+    }
+
+    public byte[] getFileAAD() {
+      return this.fileAAD;
+    }
+
+    public byte[] getDataPageHeaderAAD() {
+      return this.dataPageHeaderAAD;
+    }
+
+    public byte[] getDataPageAAD() {
+      return this.dataPageAAD;
+    }
+
+    public byte[] getDictPageHeaderAAD() {
+      return this.dictPageHeaderAAD;
+    }
+
+    public byte[] getDictPageAAD() {
+      return this.dictPageAAD;
+    }
+  }
+
+  private final int PAGE_BUFFER_SIZE = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private byte[] pageBuffer;
+  private Configuration conf;
+
+  public ColumnEncryptor(Configuration conf) {
+    this.pageBuffer = new byte[PAGE_BUFFER_SIZE];
+    this.conf = conf;
+  }
+
+  /**
+   * Given the input file, encrypt the columns specified by paths, and output the file.
+   * The encryption settings can be specified in the parameter of fileEncryptionProperties
+   * @param inputFile Input file
+   * @param outputFile Output file
+   * @param paths columns to be encrypted
+   * @param fileEncryptionProperties FileEncryptionProperties of the file
+   * @throws IOException
+   */
+  public void encryptColumns(String inputFile, String outputFile, List<String> paths, FileEncryptionProperties fileEncryptionProperties) throws IOException {
+    Path inPath = new Path(inputFile);
+    Path outPath = new Path(outputFile);
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+
+    ParquetFileWriter writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, ParquetFileWriter.Mode.OVERWRITE,
+      DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, DEFAULT_STATISTICS_TRUNCATE_LENGTH,
+      ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED, fileEncryptionProperties);
+    writer.start();
+
+    try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) {
+      processBlocks(reader, writer, metaData, schema, paths);
+    } catch (Exception e) {
+      LOG.error("Exception happened while processing blocks: ", e);
+    } finally {
+      try {
+        writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+      } catch (Exception e) {
+        LOG.error("Exception happened when ending the writer: ", e);
+      }
+    }
+  }
+
+  private void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta,
+                            MessageType schema, List<String> paths) throws IOException {
+    Set<ColumnPath> nullifyColumns = convertToColumnPaths(paths);
+    int blockId = 0;
+    PageReadStore store = reader.readNextRowGroup();
+
+    while (store != null) {
+      writer.startBlock(store.getRowCount());
+
+      List<ColumnChunkMetaData> columnsInOrder = meta.getBlocks().get(blockId).getColumns();
+      Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
+        Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+
+      for (int i = 0; i < columnsInOrder.size(); i += 1) {
+        ColumnChunkMetaData chunk = columnsInOrder.get(i);
+        ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+        processChunk(descriptor, chunk, reader, writer, nullifyColumns, blockId, i, meta.getFileMetaData().getCreatedBy());
+      }
+
+      writer.endBlock();
+      store = reader.readNextRowGroup();
+      blockId++;
+    }
+  }
+
+  private void processChunk(ColumnDescriptor descriptor, ColumnChunkMetaData chunk, TransParquetFileReader reader, ParquetFileWriter writer,
+                            Set<ColumnPath> paths, int blockId, int columnId, String createdBy) throws IOException {
+    reader.setStreamPosition(chunk.getStartingPos());
+    if (paths.contains(chunk.getPath())) {
+      writer.startColumn(descriptor, chunk.getValueCount(), chunk.getCodec());
+      encryptPages(reader, chunk, writer, createdBy, blockId, columnId);
+      writer.endColumn();
+    } else {
+      BloomFilter bloomFilter = reader.readBloomFilter(chunk);
+      ColumnIndex columnIndex = reader.readColumnIndex(chunk);
+      OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+      writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex);
+    }
+  }
+
+  private void encryptPages(TransParquetFileReader reader, ColumnChunkMetaData chunk,
+                             ParquetFileWriter writer, String createdBy, int blockId, int columnId) throws IOException {
+    short pageOrdinal = 0;
+    EncryptorRunTime encryptorRunTime = new EncryptorRunTime(writer.getEncryptor(), chunk, blockId, columnId);
+    DictionaryPage dictionaryPage = null;
+    long readValues = 0;
+    ParquetMetadataConverter converter = new ParquetMetadataConverter();
+    long totalChunkValues = chunk.getValueCount();
+    while (readValues < totalChunkValues) {
+      if (Short.MAX_VALUE == pageOrdinal) {
+        throw new RuntimeException("Number of pages exceeds maximum: " + Short.MAX_VALUE);
+      }
+      PageHeader pageHeader = reader.readPageHeader();
+      byte[] pageLoad;
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          if (dictionaryPage != null) {
+            throw new IOException("has more than one dictionary page in column chunk");
+          }
+          //No quickUpdatePageAAD needed for dictionary page
+          DictionaryPageHeader dictPageHeader = pageHeader.dictionary_page_header;
+          pageLoad = encryptPageLoad(reader, pageHeader.getCompressed_page_size(), encryptorRunTime.getDataEncryptor(), encryptorRunTime.getDictPageAAD());
+          writer.writeDictionaryPage(new DictionaryPage(BytesInput.from(pageLoad),
+                                        pageHeader.getUncompressed_page_size(),
+                                        dictPageHeader.getNum_values(),
+                                        converter.getEncoding(dictPageHeader.getEncoding())),
+            encryptorRunTime.getMetaDataEncryptor(), encryptorRunTime.getDictPageHeaderAAD());
+          break;
+        case DATA_PAGE:
+          AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageHeaderAAD(), pageOrdinal);
+          AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageAAD(), pageOrdinal);
+          DataPageHeader headerV1 = pageHeader.data_page_header;
+          pageLoad = encryptPageLoad(reader, pageHeader.getCompressed_page_size(), encryptorRunTime.getDataEncryptor(), encryptorRunTime.getDataPageAAD());
+          readValues += headerV1.getNum_values();
+          writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
+              pageHeader.getUncompressed_page_size(),
+              BytesInput.from(pageLoad),
+              converter.fromParquetStatistics(createdBy, headerV1.getStatistics(), chunk.getPrimitiveType()),
+              converter.getEncoding(headerV1.getRepetition_level_encoding()),
+              converter.getEncoding(headerV1.getDefinition_level_encoding()),
+              converter.getEncoding(headerV1.getEncoding()),
+            encryptorRunTime.getMetaDataEncryptor(),
+            encryptorRunTime.getDataPageHeaderAAD());
+          pageOrdinal++;
+          break;
+        case DATA_PAGE_V2:
+          AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageHeaderAAD(), pageOrdinal);
+          AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageAAD(), pageOrdinal);
+          DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
+          int rlLength = headerV2.getRepetition_levels_byte_length();
+          BytesInput rlLevels = readBlockAllocate(rlLength, reader);
+          int dlLength = headerV2.getDefinition_levels_byte_length();
+          BytesInput dlLevels = readBlockAllocate(dlLength, reader);
+          int payLoadLength = pageHeader.getCompressed_page_size() - rlLength - dlLength;
+          int rawDataLength = pageHeader.getUncompressed_page_size() - rlLength - dlLength;
+          pageLoad = encryptPageLoad(reader, payLoadLength, encryptorRunTime.getDataEncryptor(), encryptorRunTime.getDataPageAAD());
+          readValues += headerV2.getNum_values();
+          writer.writeDataPageV2(headerV2.getNum_rows(),
+            headerV2.getNum_nulls(),
+            headerV2.getNum_values(),
+            rlLevels,
+            dlLevels,
+            converter.getEncoding(headerV2.getEncoding()),
+            BytesInput.from(pageLoad),
+            rawDataLength,
+            converter.fromParquetStatistics(createdBy, headerV2.getStatistics(), chunk.getPrimitiveType()));
+          pageOrdinal++;
+          break;
+        default:
+        break;
+      }
+    }
+  }
+
+  private byte[] encryptPageLoad(TransParquetFileReader reader, int payloadLength, BlockCipher.Encryptor dataEncryptor, byte[] AAD) throws IOException {
+    byte[] data = readBlock(payloadLength, reader);
+    return dataEncryptor.encrypt(data, AAD);
+  }
+
+  public byte[] readBlock(int length, TransParquetFileReader reader) throws IOException {
+    byte[] data;
+    if (length > PAGE_BUFFER_SIZE) {
+      data = new byte[length];
+    } else {
+      data = pageBuffer;
+    }
+    reader.blockRead(data, 0, length);
+    return data;
+  }
+
+  public BytesInput readBlockAllocate(int length, TransParquetFileReader reader) throws IOException {
+    byte[] data = new byte[length];
+    reader.blockRead(data, 0, length);
+    return BytesInput.from(data, 0, length);
+  }
+
+  private int toIntWithCheck(long size) {
+    if ((int)size != size) {
+      throw new ParquetEncodingException("size is bigger than " + Integer.MAX_VALUE + " bytes: " + size);
+    }
+    return (int)size;
+  }
+
+  private static short toShortWithCheck(int size) {

Review comment:
       needed?

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java
##########
@@ -0,0 +1,339 @@
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+/**
+ * This class quickly rewrites an existing file, adding column encryption.
+ *
+ * For columns to be encrypted, all pages of those columns are read without decompression/decoding,
+ * encrypted immediately, and written back.
+ *
+ * For columns not to be encrypted, the whole column chunk is appended directly to the writer.
+ */
+public class ColumnEncryptor {
+  private static final Logger LOG = LoggerFactory.getLogger(ColumnEncryptor.class);
+
+  private static class EncryptorRunTime {
+    private final InternalColumnEncryptionSetup colEncrSetup;
+    private final BlockCipher.Encryptor dataEncryptor;
+    private final BlockCipher.Encryptor metaDataEncryptor;
+    private final byte[] fileAAD ;
+
+    private byte[] dataPageHeaderAAD;
+    private byte[] dataPageAAD;
+    private byte[] dictPageHeaderAAD;
+    private byte[] dictPageAAD;
+
+    public EncryptorRunTime(InternalFileEncryptor fileEncryptor, ColumnChunkMetaData chunk,
+                            int blockId, int columnId) throws IOException  {
+      this.colEncrSetup = fileEncryptor.getColumnSetup(chunk.getPath(), true, toShortWithCheck(columnId));
+      this.dataEncryptor = colEncrSetup.getDataEncryptor();
+      this.metaDataEncryptor = colEncrSetup.getMetaDataEncryptor();
+
+      this.fileAAD = fileEncryptor.getFileAAD();
+      this.dataPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPageHeader,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dataPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPage,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dictPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPageHeader,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dictPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPage,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+    }
+
+    public InternalColumnEncryptionSetup getColEncrSetup() {
+      return this.colEncrSetup;
+    }
+
+    public BlockCipher.Encryptor getDataEncryptor() {
+      return this.dataEncryptor;
+    }
+
+    public BlockCipher.Encryptor getMetaDataEncryptor() {
+      return this.metaDataEncryptor;
+    }
+
+    public byte[] getFileAAD() {
+      return this.fileAAD;
+    }
+
+    public byte[] getDataPageHeaderAAD() {
+      return this.dataPageHeaderAAD;
+    }
+
+    public byte[] getDataPageAAD() {
+      return this.dataPageAAD;
+    }
+
+    public byte[] getDictPageHeaderAAD() {
+      return this.dictPageHeaderAAD;
+    }
+
+    public byte[] getDictPageAAD() {
+      return this.dictPageAAD;
+    }
+  }
+
+  private final int PAGE_BUFFER_SIZE = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private byte[] pageBuffer;
+  private Configuration conf;
+
+  public ColumnEncryptor(Configuration conf) {
+    this.pageBuffer = new byte[PAGE_BUFFER_SIZE];
+    this.conf = conf;
+  }
+
+  /**
+   * Given the input file, encrypt the columns specified by paths, and output the file.
+   * The encryption settings are specified via the fileEncryptionProperties parameter.
+   * @param inputFile Input file
+   * @param outputFile Output file
+   * @param paths columns to be encrypted
+   * @param fileEncryptionProperties FileEncryptionProperties of the file
+   * @throws IOException
+   */
+  public void encryptColumns(String inputFile, String outputFile, List<String> paths, FileEncryptionProperties fileEncryptionProperties) throws IOException {
+    Path inPath = new Path(inputFile);
+    Path outPath = new Path(outputFile);
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+
+    ParquetFileWriter writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, ParquetFileWriter.Mode.OVERWRITE,
+      DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, DEFAULT_STATISTICS_TRUNCATE_LENGTH,
+      ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED, fileEncryptionProperties);
+    writer.start();
+
+    try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) {
+      processBlocks(reader, writer, metaData, schema, paths);
+    } catch (Exception e) {
+      LOG.error("Exception happened while processing blocks: ", e);
+    } finally {
+      try {
+        writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+      } catch (Exception e) {
+        LOG.error("Exception happened when ending the writer: ", e);
+      }
+    }
+  }
+
+  private void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta,
+                            MessageType schema, List<String> paths) throws IOException {
+    Set<ColumnPath> nullifyColumns = convertToColumnPaths(paths);
+    int blockId = 0;
+    PageReadStore store = reader.readNextRowGroup();
+
+    while (store != null) {
+      writer.startBlock(store.getRowCount());
+
+      List<ColumnChunkMetaData> columnsInOrder = meta.getBlocks().get(blockId).getColumns();
+      Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
+        Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+
+      for (int i = 0; i < columnsInOrder.size(); i += 1) {
+        ColumnChunkMetaData chunk = columnsInOrder.get(i);
+        ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+        processChunk(descriptor, chunk, reader, writer, nullifyColumns, blockId, i, meta.getFileMetaData().getCreatedBy());
+      }
+
+      writer.endBlock();
+      store = reader.readNextRowGroup();
+      blockId++;
+    }
+  }
+
+  private void processChunk(ColumnDescriptor descriptor, ColumnChunkMetaData chunk, TransParquetFileReader reader, ParquetFileWriter writer,
+                            Set<ColumnPath> paths, int blockId, int columnId, String createdBy) throws IOException {
+    reader.setStreamPosition(chunk.getStartingPos());
+    if (paths.contains(chunk.getPath())) {
+      writer.startColumn(descriptor, chunk.getValueCount(), chunk.getCodec());
+      encryptPages(reader, chunk, writer, createdBy, blockId, columnId);
+      writer.endColumn();
+    } else {
+      BloomFilter bloomFilter = reader.readBloomFilter(chunk);
+      ColumnIndex columnIndex = reader.readColumnIndex(chunk);
+      OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+      writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex);
+    }
+  }
+
+  private void encryptPages(TransParquetFileReader reader, ColumnChunkMetaData chunk,
+                             ParquetFileWriter writer, String createdBy, int blockId, int columnId) throws IOException {
+    short pageOrdinal = 0;
+    EncryptorRunTime encryptorRunTime = new EncryptorRunTime(writer.getEncryptor(), chunk, blockId, columnId);
+    DictionaryPage dictionaryPage = null;
+    long readValues = 0;
+    ParquetMetadataConverter converter = new ParquetMetadataConverter();
+    long totalChunkValues = chunk.getValueCount();
+    while (readValues < totalChunkValues) {
+      if (Short.MAX_VALUE == pageOrdinal) {
+        throw new RuntimeException("Number of pages exceeds maximum: " + Short.MAX_VALUE);
+      }
+      PageHeader pageHeader = reader.readPageHeader();
+      byte[] pageLoad;
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          if (dictionaryPage != null) {
+            throw new IOException("has more than one dictionary page in column chunk");
+          }
+          //No quickUpdatePageAAD needed for dictionary page
+          DictionaryPageHeader dictPageHeader = pageHeader.dictionary_page_header;
+          pageLoad = encryptPageLoad(reader, pageHeader.getCompressed_page_size(), encryptorRunTime.getDataEncryptor(), encryptorRunTime.getDictPageAAD());
+          writer.writeDictionaryPage(new DictionaryPage(BytesInput.from(pageLoad),
+                                        pageHeader.getUncompressed_page_size(),
+                                        dictPageHeader.getNum_values(),
+                                        converter.getEncoding(dictPageHeader.getEncoding())),
+            encryptorRunTime.getMetaDataEncryptor(), encryptorRunTime.getDictPageHeaderAAD());
+          break;
+        case DATA_PAGE:
+          AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageHeaderAAD(), pageOrdinal);
+          AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageAAD(), pageOrdinal);

Review comment:
       This function expects an int ordinal.
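
       If quickUpdatePageAAD indeed takes an int page ordinal, one option is to track the ordinal as an int and drop the short bound check; a rough sketch of that loop skeleton, under that assumption (not the final PR change):

         int pageOrdinal = 0;
         while (readValues < totalChunkValues) {
           PageHeader pageHeader = reader.readPageHeader();
           // for data pages, refresh the AADs with the current ordinal (int overload assumed)
           AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageHeaderAAD(), pageOrdinal);
           AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageAAD(), pageOrdinal);
           // ... encrypt and write the page as in the code above; dictionary pages keep their own AADs ...
           pageOrdinal++;
         }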

##########
File path: parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/EncDecProperties.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetriever;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class EncDecProperties {
+
+  public static class DecryptionKeyRetrieverMock implements DecryptionKeyRetriever {
+    private final Map<String, byte[]> keyMap = new HashMap<>();
+
+    public DecryptionKeyRetrieverMock putKey(String keyId, byte[] keyBytes) {
+      keyMap.put(keyId, keyBytes);
+      return this;
+    }
+
+    @Override
+    public byte[] getKey(byte[] keyMetaData) {
+      String keyId = new String(keyMetaData, StandardCharsets.UTF_8);
+      return keyMap.get(keyId);
+    }
+  }
+
+  private static final byte[] FOOTER_KEY = {0x01, 0x02, 0x03, 0x4, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a,
+    0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10};
+  private static final byte[] FOOTER_KEY_METADATA = "footkey".getBytes(StandardCharsets.UTF_8);
+  private static final byte[] COL_KEY = {0x02, 0x03, 0x4, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b,
+    0x0c, 0x0d, 0x0e, 0x0f, 0x10, 0x11};
+  private static final byte[] COL_KEY_METADATA = "col".getBytes(StandardCharsets.UTF_8);
+
+  public static FileDecryptionProperties getFileDecryptionProperties() throws IOException {
+    DecryptionKeyRetrieverMock keyRetriever = new DecryptionKeyRetrieverMock();
+    keyRetriever.putKey("footkey", FOOTER_KEY);
+    keyRetriever.putKey("col", COL_KEY);
+    return FileDecryptionProperties.builder().withPlaintextFilesAllowed().withoutFooterSignatureVerification().withKeyRetriever(keyRetriever).build();
+  }
+
+  public static FileDecryptionProperties getFileDecryptionPropertiesNonExistKey() throws IOException {
+    DecryptionKeyRetrieverMock keyRetriever = new DecryptionKeyRetrieverMock();
+    keyRetriever.putKey("footkey", FOOTER_KEY);
+    keyRetriever.putKey("col_non_exist", COL_KEY);
+    return FileDecryptionProperties.builder().withPlaintextFilesAllowed().withoutFooterSignatureVerification().withKeyRetriever(keyRetriever).build();
+  }
+
+  public static FileEncryptionProperties getFileEncryptionProperties(String[] encrCols, ParquetCipher cipher) {
+    if (encrCols.length == 0) {
+      return null;
+    }
+
+    Map<ColumnPath, ColumnEncryptionProperties> columnPropertyMap = new HashMap<>();
+    for (String encrCol : encrCols) {
+      ColumnPath columnPath = ColumnPath.fromDotString(encrCol);
+      ColumnEncryptionProperties colEncProp = ColumnEncryptionProperties.builder(columnPath)
+        .withKey(COL_KEY)
+        .withKeyMetaData(COL_KEY_METADATA)
+        .build();
+      columnPropertyMap.put(columnPath, colEncProp);
+    }
+
+    FileEncryptionProperties.Builder encryptionPropertiesBuilder =
+      FileEncryptionProperties.builder(FOOTER_KEY)
+        .withFooterKeyMetadata(FOOTER_KEY_METADATA)
+        .withAlgorithm(cipher)
+        .withEncryptedColumns(columnPropertyMap)
+        .withPlaintextFooter();

Review comment:
       Can you also add tests with encrypted footers? (not calling `.withPlaintextFooter()` here)
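
       For instance, an encrypted-footer variant built from the same pieces might look like this (a sketch; it simply omits the withPlaintextFooter() call):

         FileEncryptionProperties encryptedFooterProps =
           FileEncryptionProperties.builder(FOOTER_KEY)
             .withFooterKeyMetadata(FOOTER_KEY_METADATA)
             .withAlgorithm(cipher)
             .withEncryptedColumns(columnPropertyMap)
             .build();  // footer stays encrypted because withPlaintextFooter() is not called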

##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+/**
+ * This class quickly rewrites an existing file, adding column encryption.
+ *
+ * For columns to be encrypted, all pages of those columns are read without decompression/decoding,
+ * encrypted immediately, and written back.
+ *
+ * For columns not to be encrypted, the whole column chunk is appended directly to the writer.
+ */
+public class ColumnEncryptor {
+  private static final Logger LOG = LoggerFactory.getLogger(ColumnEncryptor.class);
+
+  private static class EncryptorRunTime {
+    private final InternalColumnEncryptionSetup colEncrSetup;
+    private final BlockCipher.Encryptor dataEncryptor;
+    private final BlockCipher.Encryptor metaDataEncryptor;
+    private final byte[] fileAAD ;
+
+    private byte[] dataPageHeaderAAD;
+    private byte[] dataPageAAD;
+    private byte[] dictPageHeaderAAD;
+    private byte[] dictPageAAD;
+
+    public EncryptorRunTime(InternalFileEncryptor fileEncryptor, ColumnChunkMetaData chunk,
+                            int blockId, int columnId) throws IOException  {
+      this.colEncrSetup = fileEncryptor.getColumnSetup(chunk.getPath(), true, toShortWithCheck(columnId));
+      this.dataEncryptor = colEncrSetup.getDataEncryptor();
+      this.metaDataEncryptor = colEncrSetup.getMetaDataEncryptor();
+
+      this.fileAAD = fileEncryptor.getFileAAD();
+      this.dataPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPageHeader,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dataPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPage,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dictPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPageHeader,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+      this.dictPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPage,
+        toShortWithCheck(blockId), toShortWithCheck(columnId), (short) 0);
+    }
+
+    public InternalColumnEncryptionSetup getColEncrSetup() {
+      return this.colEncrSetup;
+    }
+
+    public BlockCipher.Encryptor getDataEncryptor() {
+      return this.dataEncryptor;
+    }
+
+    public BlockCipher.Encryptor getMetaDataEncryptor() {
+      return this.metaDataEncryptor;
+    }
+
+    public byte[] getFileAAD() {
+      return this.fileAAD;
+    }
+
+    public byte[] getDataPageHeaderAAD() {
+      return this.dataPageHeaderAAD;
+    }
+
+    public byte[] getDataPageAAD() {
+      return this.dataPageAAD;
+    }
+
+    public byte[] getDictPageHeaderAAD() {
+      return this.dictPageHeaderAAD;
+    }
+
+    public byte[] getDictPageAAD() {
+      return this.dictPageAAD;
+    }
+  }
+
+  private final int PAGE_BUFFER_SIZE = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private byte[] pageBuffer;
+  private Configuration conf;
+
+  public ColumnEncryptor(Configuration conf) {
+    this.pageBuffer = new byte[PAGE_BUFFER_SIZE];
+    this.conf = conf;
+  }
+
+  /**
+   * Given the input file, encrypt the columns specified by paths, and output the file.
+   * The encryption settings are specified via the fileEncryptionProperties parameter.
+   * @param inputFile Input file
+   * @param outputFile Output file
+   * @param paths columns to be encrypted
+   * @param fileEncryptionProperties FileEncryptionProperties of the file
+   * @throws IOException
+   */
+  public void encryptColumns(String inputFile, String outputFile, List<String> paths, FileEncryptionProperties fileEncryptionProperties) throws IOException {
+    Path inPath = new Path(inputFile);
+    Path outPath = new Path(outputFile);
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+
+    ParquetFileWriter writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, ParquetFileWriter.Mode.OVERWRITE,
+      DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, DEFAULT_STATISTICS_TRUNCATE_LENGTH,
+      ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED, fileEncryptionProperties);
+    writer.start();
+
+    try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) {
+      processBlocks(reader, writer, metaData, schema, paths);
+    } catch (Exception e) {
+      LOG.error("Exception happened while processing blocks: ", e);
+    } finally {
+      try {
+        writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+      } catch (Exception e) {
+        LOG.error("Exception happened when ending the writer: ", e);
+      }
+    }
+  }
+
+  private void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta,
+                            MessageType schema, List<String> paths) throws IOException {
+    Set<ColumnPath> nullifyColumns = convertToColumnPaths(paths);

Review comment:
       Also, `paths` seems too general; maybe `encryptedColumnPaths`?
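
       As a sketch of the suggested rename:

         private void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta,
                                    MessageType schema, List<String> encryptedColumnPaths) throws IOException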




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@parquet.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org