Posted to dev@parquet.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2023/01/05 07:46:00 UTC

[jira] [Commented] (PARQUET-2075) Unified Rewriter Tool

    [ https://issues.apache.org/jira/browse/PARQUET-2075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17654840#comment-17654840 ] 

ASF GitHub Bot commented on PARQUET-2075:
-----------------------------------------

wgtmac commented on code in PR #1014:
URL: https://github.com/apache/parquet-mr/pull/1014#discussion_r1062185479


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import java.util.List;
+import java.util.Map;
+
+public class RewriteOptions {
+
+  final Configuration conf;
+  final Path inputFile;
+  final Path outputFile;
+  final List<String> pruneColumns;
+  final CompressionCodecName codecName;
+  final Map<String, MaskMode> maskColumns;
+  final List<String> encryptColumns;
+  final FileEncryptionProperties fileEncryptionProperties;
+
+  private RewriteOptions(Configuration conf,
+                         Path inputFile,
+                         Path outputFile,
+                         List<String> pruneColumns,
+                         CompressionCodecName codecName,
+                         Map<String, MaskMode> maskColumns,
+                         List<String> encryptColumns,
+                         FileEncryptionProperties fileEncryptionProperties) {
+    this.conf = conf;
+    this.inputFile = inputFile;
+    this.outputFile = outputFile;
+    this.pruneColumns = pruneColumns;
+    this.codecName = codecName;
+    this.maskColumns = maskColumns;
+    this.encryptColumns = encryptColumns;
+    this.fileEncryptionProperties = fileEncryptionProperties;
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public Path getInputFile() {
+    return inputFile;
+  }
+
+  public Path getOutputFile() {
+    return outputFile;
+  }
+
+  public List<String> getPruneColumns() {
+    return pruneColumns;
+  }
+
+  public CompressionCodecName getCodecName() {
+    return codecName;
+  }
+
+  public Map<String, MaskMode> getMaskColumns() {
+    return maskColumns;
+  }
+
+  public List<String> getEncryptColumns() {
+    return encryptColumns;
+  }
+
+  public FileEncryptionProperties getFileEncryptionProperties() {
+    return fileEncryptionProperties;
+  }
+
+  public static class Builder {
+    private Configuration conf;
+    private Path inputFile;
+    private Path outputFile;
+    private List<String> pruneColumns;
+    private CompressionCodecName codecName;
+    private Map<String, MaskMode> maskColumns;
+    private List<String> encryptColumns;
+    private FileEncryptionProperties fileEncryptionProperties;
+
+    public Builder(Configuration conf, Path inputFile, Path outputFile) {
+      this.conf = conf;
+      this.inputFile = inputFile;
+      this.outputFile = outputFile;
+    }
+
+    public Builder prune(List<String> columns) {
+      this.pruneColumns = columns;
+      return this;
+    }
+
+    public Builder transform(CompressionCodecName codecName) {
+      this.codecName = codecName;
+      return this;
+    }
+
+    public Builder mask(Map<String, MaskMode> maskColumns) {
+      this.maskColumns = maskColumns;
+      return this;
+    }
+
+    public Builder encrypt(List<String> encryptColumns) {
+      this.encryptColumns = encryptColumns;
+      return this;
+    }
+
+    public Builder encryptionProperties(FileEncryptionProperties fileEncryptionProperties) {
+      this.fileEncryptionProperties = fileEncryptionProperties;
+      return this;
+    }
+
+    public RewriteOptions build() {
+      // TODO: validate any conflict setting

Review Comment:
   Fixed and removed the TODO
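
   (For context, a build()-time conflict check of the kind this TODO called for might look roughly like the sketch below. It reuses only the builder fields and constructor shown in the hunk above; the actual validation that landed in the PR may differ.)

       public RewriteOptions build() {
         // Sketch of possible conflict validation; the real fix in the PR may differ.
         if (encryptColumns != null && !encryptColumns.isEmpty() && fileEncryptionProperties == null) {
           throw new IllegalArgumentException(
                   "FileEncryptionProperties are required when columns are encrypted");
         }
         if (pruneColumns != null && maskColumns != null) {
           for (String pruned : pruneColumns) {
             if (maskColumns.containsKey(pruned)) {
               throw new IllegalArgumentException("Cannot prune and mask the same column: " + pruned);
             }
           }
         }
         return new RewriteOptions(conf, inputFile, outputFile, pruneColumns,
                 codecName, maskColumns, encryptColumns, fileEncryptionProperties);
       }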



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java:
##########
@@ -0,0 +1,144 @@
+public class RewriteOptions {
+
+  final Configuration conf;
+  final Path inputFile;
+  final Path outputFile;
+  final List<String> pruneColumns;
+  final CompressionCodecName codecName;
+  final Map<String, MaskMode> maskColumns;
+  final List<String> encryptColumns;
+  final FileEncryptionProperties fileEncryptionProperties;
+
+  private RewriteOptions(Configuration conf,
+                         Path inputFile,
+                         Path outputFile,
+                         List<String> pruneColumns,
+                         CompressionCodecName codecName,

Review Comment:
   Fixed
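
   (As a usage sketch of the options API under review; paths and column names here are hypothetical, and imports from java.util and the hadoop/parquet packages above are assumed.)

       Configuration conf = new Configuration();
       RewriteOptions options = new RewriteOptions.Builder(
               conf, new Path("hdfs:///tmp/input.parquet"), new Path("hdfs:///tmp/output.parquet"))
           .prune(Arrays.asList("debug_info"))                        // drop a column
           .transform(CompressionCodecName.ZSTD)                      // re-compress with a new codec
           .mask(Collections.singletonMap("ssn", MaskMode.NULLIFY))   // nullify a sensitive column
           .build();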



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -0,0 +1,733 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopCodecs;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+public class ParquetRewriter implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ParquetRewriter.class);
+  private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private final byte[] pageBuffer = new byte[pageBufferSize];
+  private TransParquetFileReader reader;
+  private ParquetFileWriter writer;
+  private ParquetMetadata meta;
+  private MessageType schema;
+  private String createdBy;
+  private CompressionCodecName codecName = null;
+  private List<String> pruneColumns = null;
+  private Map<ColumnPath, MaskMode> maskColumns = null;
+  private Set<ColumnPath> encryptColumns = null;
+  private boolean encryptMode = false;
+
+  public ParquetRewriter(RewriteOptions options) throws IOException {
+    Path inPath = options.getInputFile();
+    Path outPath = options.getOutputFile();
+    Configuration conf = options.getConf();
+
+    // TODO: set more member variables
+    codecName = options.getCodecName();
+    pruneColumns = options.getPruneColumns();
+
+    // Get file metadata and full schema from the input file
+    meta = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    schema = meta.getFileMetaData().getSchema();
+    createdBy = meta.getFileMetaData().getCreatedBy();
+
+    // Prune columns if specified
+    if (pruneColumns != null && !pruneColumns.isEmpty()) {
+      List<String> paths = new ArrayList<>();
+      getPaths(schema, paths, null);
+      for (String col : pruneColumns) {
+        if (!paths.contains(col)) {
+          LOG.warn("Input column name {} doesn't show up in the schema of file {}", col, inPath.getName());
+        }
+      }
+
+      Set<ColumnPath> prunePaths = convertToColumnPaths(pruneColumns);
+      schema = pruneColumnsInSchema(schema, prunePaths);
+    }
+
+    if (options.getMaskColumns() != null) {
+      this.maskColumns = new HashMap<>();
+      for (Map.Entry<String, MaskMode> col : options.getMaskColumns().entrySet()) {
+        maskColumns.put(ColumnPath.fromDotString(col.getKey()), col.getValue());
+      }
+    }
+
+    if (options.getEncryptColumns() != null && options.getFileEncryptionProperties() != null) {
+      this.encryptColumns = convertToColumnPaths(options.getEncryptColumns());
+      this.encryptMode = true;
+      // TODO: make sure options.getFileEncryptionProperties() is set

Review Comment:
   I forgot to remove this TODO.
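
   (For reference, driving the rewriter end to end takes little code; a minimal sketch, assuming the options and encryption properties are prepared as above and ignoring error handling. The column name is hypothetical.)

       RewriteOptions options = new RewriteOptions.Builder(conf, inPath, outPath)
           .encrypt(Arrays.asList("user.email"))               // hypothetical column
           .encryptionProperties(fileEncryptionProperties)     // prepared elsewhere
           .build();
       try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
         rewriter.processBlocks();
       }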



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -0,0 +1,733 @@
+public class ParquetRewriter implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ParquetRewriter.class);
+  private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private final byte[] pageBuffer = new byte[pageBufferSize];
+  private TransParquetFileReader reader;
+  private ParquetFileWriter writer;
+  private ParquetMetadata meta;
+  private MessageType schema;
+  private String createdBy;
+  private CompressionCodecName codecName = null;
+  private List<String> pruneColumns = null;
+  private Map<ColumnPath, MaskMode> maskColumns = null;
+  private Set<ColumnPath> encryptColumns = null;
+  private boolean encryptMode = false;
+
+  public ParquetRewriter(RewriteOptions options) throws IOException {
+    Path inPath = options.getInputFile();
+    Path outPath = options.getOutputFile();
+    Configuration conf = options.getConf();
+
+    // TODO: set more member variables
+    codecName = options.getCodecName();
+    pruneColumns = options.getPruneColumns();
+
+    // Get file metadata and full schema from the input file
+    meta = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    schema = meta.getFileMetaData().getSchema();
+    createdBy = meta.getFileMetaData().getCreatedBy();
+
+    // Prune columns if specified
+    if (pruneColumns != null && !pruneColumns.isEmpty()) {
+      List<String> paths = new ArrayList<>();
+      getPaths(schema, paths, null);
+      for (String col : pruneColumns) {
+        if (!paths.contains(col)) {
+          LOG.warn("Input column name {} doesn't show up in the schema of file {}", col, inPath.getName());
+        }
+      }
+
+      Set<ColumnPath> prunePaths = convertToColumnPaths(pruneColumns);
+      schema = pruneColumnsInSchema(schema, prunePaths);
+    }
+
+    if (options.getMaskColumns() != null) {
+      this.maskColumns = new HashMap<>();
+      for (Map.Entry<String, MaskMode> col : options.getMaskColumns().entrySet()) {
+        maskColumns.put(ColumnPath.fromDotString(col.getKey()), col.getValue());
+      }
+    }
+
+    if (options.getEncryptColumns() != null && options.getFileEncryptionProperties() != null) {
+      this.encryptColumns = convertToColumnPaths(options.getEncryptColumns());
+      this.encryptMode = true;
+      // TODO: make sure options.getFileEncryptionProperties() is set
+    }
+
+    reader = new TransParquetFileReader(
+            HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build());
+
+    ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
+    writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, writerMode,
+            DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
+            DEFAULT_STATISTICS_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED,
+            options.getFileEncryptionProperties());
+    writer.start();
+  }
+
+  // Ctor for legacy CompressionConverter and ColumnMasker
+  public ParquetRewriter(TransParquetFileReader reader,
+                         ParquetFileWriter writer,
+                         ParquetMetadata meta,
+                         MessageType schema,
+                         String createdBy,
+                         CompressionCodecName codecName,
+                         List<String> maskColumns,
+                         MaskMode maskMode) {
+    this.reader = reader;
+    this.writer = writer;
+    this.meta = meta;
+    this.schema = schema;
+    this.createdBy = createdBy == null ? meta.getFileMetaData().getCreatedBy() : createdBy;
+    this.codecName = codecName;
+    if (maskColumns != null && maskMode != null) {
+      this.maskColumns = new HashMap<>();
+      for (String col : maskColumns) {
+        this.maskColumns.put(ColumnPath.fromDotString(col), maskMode);
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    writer.end(meta.getFileMetaData().getKeyValueMetaData());
+  }
+
+  public void processBlocks() throws IOException {
+    PageReadStore store = reader.readNextRowGroup();
+    ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, createdBy);
+    Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
+            Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+
+    int blockId = 0;
+    while (store != null) {
+      writer.startBlock(store.getRowCount());
+
+      BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
+      List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
+
+      for (int i = 0, columnId = 0; i < columnsInOrder.size(); i += 1) {
+        ColumnChunkMetaData chunk = columnsInOrder.get(i);
+        ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+
+        // This column has been pruned.
+        if (descriptor == null) {
+          continue;
+        }
+
+        // If a column is already encrypted, we simply throw an exception.
+        // Later we can add a feature to trans-encrypt it with different keys.
+        if (chunk.isEncrypted()) {
+          throw new IOException("Column " + chunk.getPath().toDotString() + " is already encrypted");
+        }
+
+        reader.setStreamPosition(chunk.getStartingPos());
+        CompressionCodecName newCodecName = codecName == null ? chunk.getCodec() : codecName;
+        EncryptorRunTime encryptorRunTime = null;
+        if (this.encryptMode) {
+          encryptorRunTime = new EncryptorRunTime(writer.getEncryptor(), chunk, blockId, columnId);
+        }
+        boolean encryptColumn = encryptColumns != null && encryptColumns.contains(chunk.getPath());
+
+        if (maskColumns != null && maskColumns.containsKey(chunk.getPath())) {
+          // Mask column and compress it again.
+          MaskMode maskMode = maskColumns.get(chunk.getPath());
+          if (maskMode.equals(MaskMode.NULLIFY)) {
+            Type.Repetition repetition = descriptor.getPrimitiveType().getRepetition();
+            if (repetition.equals(Type.Repetition.REQUIRED)) {
+              throw new IOException(
+                      "Required column [" + descriptor.getPrimitiveType().getName() + "] cannot be nullified");
+            }
+            nullifyColumn(descriptor, chunk, crStore, writer, schema, newCodecName, encryptorRunTime, encryptColumn);
+          } else {
+            throw new UnsupportedOperationException("Only nullify is supported for now");
+          }
+        } else if (encryptMode || codecName != null) {
+          // Translate compression and/or encryption
+          writer.startColumn(descriptor, crStore.getColumnReader(descriptor).getTotalValueCount(), newCodecName);
+          processChunk(chunk, newCodecName, encryptorRunTime, encryptColumn);
+          writer.endColumn();
+        } else {
+          // Nothing changed, simply copy the binary data.
+          BloomFilter bloomFilter = reader.readBloomFilter(chunk);
+          ColumnIndex columnIndex = reader.readColumnIndex(chunk);
+          OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+          writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex);
+        }
+
+        columnId++;
+      }
+
+      writer.endBlock();
+      store = reader.readNextRowGroup();
+      blockId++;
+    }
+  }
+
+  private void processChunk(ColumnChunkMetaData chunk,
+                            CompressionCodecName newCodecName,
+                            EncryptorRunTime encryptorRunTime,
+                            boolean encryptColumn) throws IOException {
+    CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
+    CompressionCodecFactory.BytesInputDecompressor decompressor = null;
+    CompressionCodecFactory.BytesInputCompressor compressor = null;
+    if (!newCodecName.equals(chunk.getCodec())) {
+      // Re-compress only if a different codec has been specified
+      decompressor = codecFactory.getDecompressor(chunk.getCodec());
+      compressor = codecFactory.getCompressor(newCodecName);
+    }
+
+    // EncryptorRunTime is only provided when encryption is required
+    BlockCipher.Encryptor metaEncryptor = null;
+    BlockCipher.Encryptor dataEncryptor = null;
+    byte[] dictPageAAD = null;
+    byte[] dataPageAAD = null;
+    byte[] dictPageHeaderAAD = null;
+    byte[] dataPageHeaderAAD = null;
+    if (encryptorRunTime != null) {
+      metaEncryptor = encryptorRunTime.getMetaDataEncryptor();
+      dataEncryptor = encryptorRunTime.getDataEncryptor();
+      dictPageAAD = encryptorRunTime.getDictPageAAD();
+      dataPageAAD = encryptorRunTime.getDataPageAAD();
+      dictPageHeaderAAD = encryptorRunTime.getDictPageHeaderAAD();
+      dataPageHeaderAAD = encryptorRunTime.getDataPageHeaderAAD();
+    }
+
+    ColumnIndex columnIndex = reader.readColumnIndex(chunk);
+    OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+
+    reader.setStreamPosition(chunk.getStartingPos());
+    DictionaryPage dictionaryPage = null;
+    long readValues = 0;
+    Statistics statistics = null;
+    ParquetMetadataConverter converter = new ParquetMetadataConverter();
+    int pageOrdinal = 0;
+    long totalChunkValues = chunk.getValueCount();
+    while (readValues < totalChunkValues) {
+      PageHeader pageHeader = reader.readPageHeader();
+      int compressedPageSize = pageHeader.getCompressed_page_size();
+      byte[] pageLoad;
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          if (dictionaryPage != null) {
+            throw new IOException("has more than one dictionary page in column chunk");
+          }
+          //No quickUpdatePageAAD needed for dictionary page
+          DictionaryPageHeader dictPageHeader = pageHeader.dictionary_page_header;
+          pageLoad = processPageLoad(reader,
+                  true,
+                  compressor,
+                  decompressor,
+                  pageHeader.getCompressed_page_size(),
+                  pageHeader.getUncompressed_page_size(),
+                  encryptColumn,
+                  dataEncryptor,
+                  dictPageAAD);
+          writer.writeDictionaryPage(new DictionaryPage(BytesInput.from(pageLoad),
+                  pageHeader.getUncompressed_page_size(),
+                  dictPageHeader.getNum_values(),
+                  converter.getEncoding(dictPageHeader.getEncoding())),
+                  metaEncryptor,
+                  dictPageHeaderAAD);
+          break;
+        case DATA_PAGE:
+          if (encryptColumn) {
+            AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+            AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
+          }
+          DataPageHeader headerV1 = pageHeader.data_page_header;
+          pageLoad = processPageLoad(reader,
+                  true,
+                  compressor,
+                  decompressor,
+                  pageHeader.getCompressed_page_size(),
+                  pageHeader.getUncompressed_page_size(),
+                  encryptColumn,
+                  dataEncryptor,
+                  dataPageAAD);
+          statistics = convertStatistics(
+                  createdBy, chunk.getPrimitiveType(), headerV1.getStatistics(), columnIndex, pageOrdinal, converter);
+          readValues += headerV1.getNum_values();
+          if (offsetIndex != null) {
+            long rowCount = 1 + offsetIndex.getLastRowIndex(
+                    pageOrdinal, totalChunkValues) - offsetIndex.getFirstRowIndex(pageOrdinal);
+            writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
+                    pageHeader.getUncompressed_page_size(),
+                    BytesInput.from(pageLoad),
+                    statistics,
+                    toIntWithCheck(rowCount),
+                    converter.getEncoding(headerV1.getRepetition_level_encoding()),
+                    converter.getEncoding(headerV1.getDefinition_level_encoding()),
+                    converter.getEncoding(headerV1.getEncoding()),
+                    metaEncryptor,
+                    dataPageHeaderAAD);
+          } else {
+            writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
+                    pageHeader.getUncompressed_page_size(),
+                    BytesInput.from(pageLoad),
+                    statistics,
+                    converter.getEncoding(headerV1.getRepetition_level_encoding()),
+                    converter.getEncoding(headerV1.getDefinition_level_encoding()),
+                    converter.getEncoding(headerV1.getEncoding()),
+                    metaEncryptor,
+                    dataPageHeaderAAD);
+          }
+          pageOrdinal++;
+          break;
+        case DATA_PAGE_V2:
+          if (encryptColumn) {
+            AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+            AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
+          }
+          DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
+          int rlLength = headerV2.getRepetition_levels_byte_length();
+          BytesInput rlLevels = readBlockAllocate(rlLength, reader);
+          int dlLength = headerV2.getDefinition_levels_byte_length();
+          BytesInput dlLevels = readBlockAllocate(dlLength, reader);
+          int payLoadLength = pageHeader.getCompressed_page_size() - rlLength - dlLength;
+          int rawDataLength = pageHeader.getUncompressed_page_size() - rlLength - dlLength;
+          pageLoad = processPageLoad(
+                  reader,
+                  headerV2.is_compressed,
+                  compressor,
+                  decompressor,
+                  payLoadLength,
+                  rawDataLength,
+                  encryptColumn,
+                  dataEncryptor,
+                  dataPageAAD);
+          statistics = convertStatistics(
+                  createdBy, chunk.getPrimitiveType(), headerV2.getStatistics(), columnIndex, pageOrdinal, converter);
+          readValues += headerV2.getNum_values();
+          writer.writeDataPageV2(headerV2.getNum_rows(),
+                  headerV2.getNum_nulls(),
+                  headerV2.getNum_values(),
+                  rlLevels,
+                  dlLevels,
+                  converter.getEncoding(headerV2.getEncoding()),
+                  BytesInput.from(pageLoad),
+                  rawDataLength,
+                  statistics);
+          pageOrdinal++;
+          break;
+        default:
+          LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize);
+          break;
+      }
+    }
+  }
+
+  private Statistics convertStatistics(String createdBy,
+                                       PrimitiveType type,
+                                       org.apache.parquet.format.Statistics pageStatistics,
+                                       ColumnIndex columnIndex,
+                                       int pageIndex,
+                                       ParquetMetadataConverter converter) throws IOException {
+    if (columnIndex != null) {
+      if (columnIndex.getNullPages() == null) {
+        throw new IOException("columnIndex has null variable 'nullPages' which indicates corrupted data for type: " +
+                type.getName());
+      }
+      if (pageIndex > columnIndex.getNullPages().size()) {
+        throw new IOException("There are more pages " + pageIndex + " found in the column than in the columnIndex " +
+                columnIndex.getNullPages().size());
+      }
+      org.apache.parquet.column.statistics.Statistics.Builder statsBuilder =
+              org.apache.parquet.column.statistics.Statistics.getBuilderForReading(type);
+      statsBuilder.withNumNulls(columnIndex.getNullCounts().get(pageIndex));
+
+      if (!columnIndex.getNullPages().get(pageIndex)) {
+        statsBuilder.withMin(columnIndex.getMinValues().get(pageIndex).array().clone());
+        statsBuilder.withMax(columnIndex.getMaxValues().get(pageIndex).array().clone());
+      }
+      return statsBuilder.build();
+    } else if (pageStatistics != null) {
+      return converter.fromParquetStatistics(createdBy, pageStatistics, type);
+    } else {
+      return null;
+    }
+  }
+
+  private byte[] processPageLoad(TransParquetFileReader reader,
+                                 boolean isCompressed,
+                                 CompressionCodecFactory.BytesInputCompressor compressor,
+                                 CompressionCodecFactory.BytesInputDecompressor decompressor,
+                                 int payloadLength,
+                                 int rawDataLength,
+                                 boolean encrypt,
+                                 BlockCipher.Encryptor dataEncryptor,
+                                 byte[] AAD) throws IOException {
+    BytesInput data = readBlock(payloadLength, reader);
+
+    // recompress page load
+    if (compressor != null) {
+      if (isCompressed) {
+        data = decompressor.decompress(data, rawDataLength);
+      }
+      data = compressor.compress(data);
+    }
+
+    if (!encrypt) {
+      return data.toByteArray();
+    }
+
+    // encrypt page load
+    return dataEncryptor.encrypt(data.toByteArray(), AAD);
+  }
+
+  public BytesInput readBlock(int length, TransParquetFileReader reader) throws IOException {
+    byte[] data;
+    if (length > pageBufferSize) {
+      data = new byte[length];
+    } else {
+      data = pageBuffer;
+    }
+    reader.blockRead(data, 0, length);
+    return BytesInput.from(data, 0, length);
+  }
+
+  public BytesInput readBlockAllocate(int length, TransParquetFileReader reader) throws IOException {
+    byte[] data = new byte[length];
+    reader.blockRead(data, 0, length);
+    return BytesInput.from(data, 0, length);
+  }
+
+  private int toIntWithCheck(long size) {
+    if ((int)size != size) {
+      throw new ParquetEncodingException("size is bigger than " + Integer.MAX_VALUE + " bytes: " + size);
+    }
+    return (int)size;
+  }
+
+  // We have to rewrite getPaths because MessageType only returns level-0 paths
+  private void getPaths(GroupType schema, List<String> paths, String parent) {
+    List<Type> fields = schema.getFields();
+    String prefix = (parent == null) ? "" : parent + ".";
+    for (Type field : fields) {
+      paths.add(prefix + field.getName());
+      if (field instanceof GroupType) {
+        getPaths(field.asGroupType(), paths, prefix + field.getName());
+      }
+    }
+  }
+
+  private MessageType pruneColumnsInSchema(MessageType schema, Set<ColumnPath> prunePaths) {
+    List<Type> fields = schema.getFields();
+    List<String> currentPath = new ArrayList<>();
+    List<Type> prunedFields = pruneColumnsInFields(fields, currentPath, prunePaths);
+    MessageType newSchema = new MessageType(schema.getName(), prunedFields);
+    return newSchema;
+  }
+
+  private List<Type> pruneColumnsInFields(List<Type> fields, List<String> currentPath, Set<ColumnPath> prunePaths) {
+    List<Type> prunedFields = new ArrayList<>();
+    for (Type childField : fields) {
+      Type prunedChildField = pruneColumnsInField(childField, currentPath, prunePaths);
+      if (prunedChildField != null) {
+        prunedFields.add(prunedChildField);
+      }
+    }
+    return prunedFields;
+  }
+
+  private Type pruneColumnsInField(Type field, List<String> currentPath, Set<ColumnPath> prunePaths) {
+    String fieldName = field.getName();
+    currentPath.add(fieldName);
+    ColumnPath path = ColumnPath.get(currentPath.toArray(new String[0]));
+    Type prunedField = null;
+    if (!prunePaths.contains(path)) {
+      if (field.isPrimitive()) {
+        prunedField = field;
+      } else {
+        List<Type> childFields = ((GroupType) field).getFields();
+        List<Type> prunedFields = pruneColumnsInFields(childFields, currentPath, prunePaths);
+        if (prunedFields.size() > 0) {
+          prunedField = ((GroupType) field).withNewFields(prunedFields);
+        }
+      }
+    }
+
+    currentPath.remove(currentPath.size() - 1);  // pop the last element; remove(Object) could match an earlier duplicate name

Review Comment:
   Fixed
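
   (To illustrate the pruning semantics of this method on a nested schema, with a hypothetical example: given

       message example {
         required int64 id;
         optional group contact {
           optional binary email (UTF8);
           optional binary phone (UTF8);
         }
       }

   pruning "contact.phone" keeps id and contact.email, pruning "contact" drops the whole group, and a group whose children are all pruned is dropped as well, since prunedFields comes back empty and prunedField stays null.)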



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/MaskMode.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
+public enum MaskMode {
+  NULLIFY("nullify"),
+  HASH("hash"),
+  REDACT("redact");
+
+  private String mode;
+
+  MaskMode(String text) {
+    this.mode = text;
+  }
+
+  public String getMode() {
+    return this.mode;
+  }
+
+  public static MaskMode fromString(String mode) {

Review Comment:
   Fixed
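
   (The hunk is cut off before the body of fromString; a typical implementation for such an enum is sketched below. This is illustrative only, not necessarily what the PR does.)

       public static MaskMode fromString(String mode) {
         for (MaskMode value : MaskMode.values()) {
           if (value.getMode().equalsIgnoreCase(mode)) {
             return value;
           }
         }
         return null;  // or throw IllegalArgumentException for an unknown mode
       }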



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -0,0 +1,733 @@
+public class ParquetRewriter implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ParquetRewriter.class);
+  private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private final byte[] pageBuffer = new byte[pageBufferSize];
+  private TransParquetFileReader reader;
+  private ParquetFileWriter writer;
+  private ParquetMetadata meta;
+  private MessageType schema;
+  private String createdBy;
+  private CompressionCodecName codecName = null;
+  private List<String> pruneColumns = null;
+  private Map<ColumnPath, MaskMode> maskColumns = null;
+  private Set<ColumnPath> encryptColumns = null;
+  private boolean encryptMode = false;
+
+  public ParquetRewriter(RewriteOptions options) throws IOException {
+    Path inPath = options.getInputFile();
+    Path outPath = options.getOutputFile();
+    Configuration conf = options.getConf();
+
+    // TODO: set more member variables
+    codecName = options.getCodecName();
+    pruneColumns = options.getPruneColumns();
+
+    // Get file metadata and full schema from the input file
+    meta = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    schema = meta.getFileMetaData().getSchema();
+    createdBy = meta.getFileMetaData().getCreatedBy();

Review Comment:
   I'd rather keep the same behavior for now and fix it in a separate JIRA.
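
   (If the behavior being kept here is the deprecated ParquetFileReader.readFooter call, the follow-up JIRA could switch to the non-deprecated reader API along these lines; a sketch only, the follow-up may choose differently:)

       ParquetReadOptions readOptions = HadoopReadOptions.builder(conf).build();
       try (ParquetFileReader in = ParquetFileReader.open(
               HadoopInputFile.fromPath(inPath, conf), readOptions)) {
         meta = in.getFooter();
       }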



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -0,0 +1,733 @@
+public class ParquetRewriter implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ParquetRewriter.class);
+  private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private final byte[] pageBuffer = new byte[pageBufferSize];
+  private TransParquetFileReader reader;
+  private ParquetFileWriter writer;
+  private ParquetMetadata meta;
+  private MessageType schema;
+  private String createdBy;
+  private CompressionCodecName codecName = null;
+  private List<String> pruneColumns = null;
+  private Map<ColumnPath, MaskMode> maskColumns = null;
+  private Set<ColumnPath> encryptColumns = null;
+  private boolean encryptMode = false;
+
+  public ParquetRewriter(RewriteOptions options) throws IOException {
+    Path inPath = options.getInputFile();
+    Path outPath = options.getOutputFile();
+    Configuration conf = options.getConf();
+
+    // TODO: set more member variables
+    codecName = options.getCodecName();
+    pruneColumns = options.getPruneColumns();
+
+    // Get file metadata and full schema from the input file
+    meta = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    schema = meta.getFileMetaData().getSchema();
+    createdBy = meta.getFileMetaData().getCreatedBy();
+
+    // Prune columns if specified
+    if (pruneColumns != null && !pruneColumns.isEmpty()) {
+      List<String> paths = new ArrayList<>();
+      getPaths(schema, paths, null);
+      for (String col : pruneColumns) {
+        if (!paths.contains(col)) {
+          LOG.warn("Input column name {} doesn't show up in the schema of file {}", col, inPath.getName());
+        }
+      }
+
+      Set<ColumnPath> prunePaths = convertToColumnPaths(pruneColumns);
+      schema = pruneColumnsInSchema(schema, prunePaths);
+    }
+
+    if (options.getMaskColumns() != null) {
+      this.maskColumns = new HashMap<>();
+      for (Map.Entry<String, MaskMode> col : options.getMaskColumns().entrySet()) {
+        maskColumns.put(ColumnPath.fromDotString(col.getKey()), col.getValue());
+      }
+    }
+
+    if (options.getEncryptColumns() != null && options.getFileEncryptionProperties() != null) {
+      this.encryptColumns = convertToColumnPaths(options.getEncryptColumns());
+      this.encryptMode = true;
+      // TODO: make sure options.getFileEncryptionProperties() is set
+    }
+
+    reader = new TransParquetFileReader(
+            HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build());
+
+    ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
+    writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, writerMode,
+            DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
+            DEFAULT_STATISTICS_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED,
+            options.getFileEncryptionProperties());
+    writer.start();
+  }
+
+  // Ctor for legacy CompressionConverter and ColumnMasker
+  public ParquetRewriter(TransParquetFileReader reader,
+                         ParquetFileWriter writer,
+                         ParquetMetadata meta,
+                         MessageType schema,
+                         String createdBy,
+                         CompressionCodecName codecName,
+                         List<String> maskColumns,
+                         MaskMode maskMode) {
+    this.reader = reader;
+    this.writer = writer;
+    this.meta = meta;
+    this.schema = schema;
+    this.createdBy = createdBy == null ? meta.getFileMetaData().getCreatedBy() : createdBy;
+    this.codecName = codecName;
+    if (maskColumns != null && maskMode != null) {
+      this.maskColumns = new HashMap<>();
+      for (String col : maskColumns) {
+        this.maskColumns.put(ColumnPath.fromDotString(col), maskMode);
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    writer.end(meta.getFileMetaData().getKeyValueMetaData());
+  }
+
+  public void processBlocks() throws IOException {
+    PageReadStore store = reader.readNextRowGroup();
+    ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, createdBy);
+    Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
+            Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+
+    int blockId = 0;
+    while (store != null) {
+      writer.startBlock(store.getRowCount());
+
+      BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
+      List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
+
+      for (int i = 0, columnId = 0; i < columnsInOrder.size(); i += 1) {

Review Comment:
   Fixed



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -0,0 +1,733 @@
+public class ParquetRewriter implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ParquetRewriter.class);
+  private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private final byte[] pageBuffer = new byte[pageBufferSize];
+  private TransParquetFileReader reader;
+  private ParquetFileWriter writer;
+  private ParquetMetadata meta;
+  private MessageType schema;
+  private String createdBy;
+  private CompressionCodecName codecName = null;
+  private List<String> pruneColumns = null;
+  private Map<ColumnPath, MaskMode> maskColumns = null;
+  private Set<ColumnPath> encryptColumns = null;
+  private boolean encryptMode = false;
+
+  public ParquetRewriter(RewriteOptions options) throws IOException {
+    Path inPath = options.getInputFile();
+    Path outPath = options.getOutputFile();
+    Configuration conf = options.getConf();
+
+    // TODO: set more member variables
+    codecName = options.getCodecName();
+    pruneColumns = options.getPruneColumns();
+
+    // Get file metadata and full schema from the input file
+    meta = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    schema = meta.getFileMetaData().getSchema();
+    createdBy = meta.getFileMetaData().getCreatedBy();
+
+    // Prune columns if specified
+    if (pruneColumns != null && !pruneColumns.isEmpty()) {
+      List<String> paths = new ArrayList<>();
+      getPaths(schema, paths, null);
+      for (String col : pruneColumns) {
+        if (!paths.contains(col)) {
+          LOG.warn("Input column name {} doesn't show up in the schema of file {}", col, inPath.getName());
+        }
+      }
+
+      Set<ColumnPath> prunePaths = convertToColumnPaths(pruneColumns);
+      schema = pruneColumnsInSchema(schema, prunePaths);
+    }
+
+    if (options.getMaskColumns() != null) {
+      this.maskColumns = new HashMap<>();
+      for (Map.Entry<String, MaskMode> col : options.getMaskColumns().entrySet()) {
+        maskColumns.put(ColumnPath.fromDotString(col.getKey()), col.getValue());
+      }
+    }
+
+    if (options.getEncryptColumns() != null && options.getFileEncryptionProperties() != null) {
+      this.encryptColumns = convertToColumnPaths(options.getEncryptColumns());
+      this.encryptMode = true;
+      // TODO: make sure options.getFileEncryptionProperties() is set
+    }
+
+    reader = new TransParquetFileReader(
+            HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build());
+
+    ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
+    writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, writerMode,
+            DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
+            DEFAULT_STATISTICS_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED,
+            options.getFileEncryptionProperties());
+    writer.start();
+  }
+
+  // Ctor for legacy CompressionConverter and ColumnMasker
+  public ParquetRewriter(TransParquetFileReader reader,
+                         ParquetFileWriter writer,
+                         ParquetMetadata meta,
+                         MessageType schema,
+                         String createdBy,
+                         CompressionCodecName codecName,
+                         List<String> maskColumns,
+                         MaskMode maskMode) {
+    this.reader = reader;
+    this.writer = writer;
+    this.meta = meta;
+    this.schema = schema;
+    this.createdBy = createdBy == null ? meta.getFileMetaData().getCreatedBy() : createdBy;
+    this.codecName = codecName;
+    if (maskColumns != null && maskMode != null) {
+      this.maskColumns = new HashMap<>();
+      for (String col : maskColumns) {
+        this.maskColumns.put(ColumnPath.fromDotString(col), maskMode);
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    writer.end(meta.getFileMetaData().getKeyValueMetaData());
+  }
+
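+  // Walk the input file row group by row group; each column chunk is either
+  // masked, re-encoded (new codec and/or encryption), or copied as raw bytes.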
+  public void processBlocks() throws IOException {
+    PageReadStore store = reader.readNextRowGroup();
+    ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, createdBy);
+    Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
+            Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+
+    int blockId = 0;
+    while (store != null) {
+      writer.startBlock(store.getRowCount());
+
+      BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
+      List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
+
+      for (int i = 0, columnId = 0; i < columnsInOrder.size(); i += 1) {
+        ColumnChunkMetaData chunk = columnsInOrder.get(i);
+        ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+
+        // This column has been pruned.
+        if (descriptor == null) {
+          continue;
+        }
+
+        // If a column chunk is already encrypted, simply throw an exception.
+        // Later we can add a feature to trans-encrypt it with different keys.
+        if (chunk.isEncrypted()) {
+          throw new IOException("Column " + chunk.getPath().toDotString() + " is already encrypted");
+        }
+
+        reader.setStreamPosition(chunk.getStartingPos());
+        CompressionCodecName newCodecName = codecName == null ? chunk.getCodec() : codecName;
+        EncryptorRunTime encryptorRunTime = null;
+        if (this.encryptMode) {
+          encryptorRunTime = new EncryptorRunTime(writer.getEncryptor(), chunk, blockId, columnId);
+        }
+        boolean encryptColumn = encryptColumns != null && encryptColumns.contains(chunk.getPath());

Review Comment:
   Fixed
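
As a usage sketch (not part of this PR's diff): once a RewriteOptions instance has been built, driving the rewriter is construct / processBlocks / close. The driver below assumes only the public API quoted above and leaves out how the options object is obtained.

    import java.io.IOException;

    import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
    import org.apache.parquet.hadoop.rewrite.RewriteOptions;

    public class RewriteDriver {
      // ParquetRewriter implements Closeable; close() ends the output file
      // with the input file's key-value metadata.
      public static void rewrite(RewriteOptions options) throws IOException {
        try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
          rewriter.processBlocks();
        }
      }
    }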



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/MaskMode.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
+public enum MaskMode {
+  NULLIFY("nullify"),
+  HASH("hash"),
+  REDACT("redact");
+
+  private String mode;
+
+  MaskMode(String text) {
+    this.mode = text;

Review Comment:
   Fixed
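
For illustration (the column name is hypothetical): a caller populates the String-to-MaskMode map carried by RewriteOptions; per the rewriter above, only NULLIFY is acted on for now.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.parquet.hadoop.rewrite.MaskMode;

    public class MaskConfigExample {
      public static Map<String, MaskMode> maskConfig() {
        Map<String, MaskMode> maskColumns = new HashMap<>();
        // Nullify a (hypothetical) sensitive column addressed by its dot path.
        maskColumns.put("user.ssn", MaskMode.NULLIFY);
        return maskColumns;
      }
    }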



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -0,0 +1,733 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopCodecs;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+public class ParquetRewriter implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ParquetRewriter.class);
+  private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private final byte[] pageBuffer = new byte[pageBufferSize];
+  private TransParquetFileReader reader;
+  private ParquetFileWriter writer;
+  private ParquetMetadata meta;
+  private MessageType schema;
+  private String createdBy;
+  private CompressionCodecName codecName = null;
+  private List<String> pruneColumns = null;
+  private Map<ColumnPath, MaskMode> maskColumns = null;
+  private Set<ColumnPath> encryptColumns = null;
+  private boolean encryptMode = false;
+
+  public ParquetRewriter(RewriteOptions options) throws IOException {
+    Path inPath = options.getInputFile();
+    Path outPath = options.getOutputFile();
+    Configuration conf = options.getConf();
+
+    // TODO: set more member variables
+    codecName = options.getCodecName();
+    pruneColumns = options.getPruneColumns();
+
+    // Get file metadata and full schema from the input file
+    meta = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    schema = meta.getFileMetaData().getSchema();
+    createdBy = meta.getFileMetaData().getCreatedBy();
+
+    // Prune columns if specified
+    if (pruneColumns != null && !pruneColumns.isEmpty()) {
+      List<String> paths = new ArrayList<>();
+      getPaths(schema, paths, null);
+      for (String col : pruneColumns) {
+        if (!paths.contains(col)) {
+          LOG.warn("Input column name {} doesn't show up in the schema of file {}", col, inPath.getName());
+        }
+      }
+
+      Set<ColumnPath> prunePaths = convertToColumnPaths(pruneColumns);
+      schema = pruneColumnsInSchema(schema, prunePaths);
+    }
+
+    if (options.getMaskColumns() != null) {
+      this.maskColumns = new HashMap<>();
+      for (Map.Entry<String, MaskMode> col : options.getMaskColumns().entrySet()) {
+        maskColumns.put(ColumnPath.fromDotString(col.getKey()), col.getValue());
+      }
+    }
+
+    if (options.getEncryptColumns() != null && options.getFileEncryptionProperties() != null) {
+      this.encryptColumns = convertToColumnPaths(options.getEncryptColumns());
+      this.encryptMode = true;
+      // TODO: make sure options.getFileEncryptionProperties() is set
+    }
+
+    reader = new TransParquetFileReader(
+            HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build());
+
+    ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
+    writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, writerMode,
+            DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
+            DEFAULT_STATISTICS_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED,
+            options.getFileEncryptionProperties());
+    writer.start();
+  }
+
+  // Ctor for legacy CompressionConverter and ColumnMasker
+  public ParquetRewriter(TransParquetFileReader reader,
+                         ParquetFileWriter writer,
+                         ParquetMetadata meta,
+                         MessageType schema,
+                         String createdBy,
+                         CompressionCodecName codecName,
+                         List<String> maskColumns,
+                         MaskMode maskMode) {
+    this.reader = reader;
+    this.writer = writer;
+    this.meta = meta;
+    this.schema = schema;
+    this.createdBy = createdBy == null ? meta.getFileMetaData().getCreatedBy() : createdBy;
+    this.codecName = codecName;
+    if (maskColumns != null && maskMode != null) {
+      this.maskColumns = new HashMap<>();
+      for (String col : maskColumns) {
+        this.maskColumns.put(ColumnPath.fromDotString(col), maskMode);
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    writer.end(meta.getFileMetaData().getKeyValueMetaData());
+  }
+
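+  // Walk the input file row group by row group; each column chunk is either
+  // masked, re-encoded (new codec and/or encryption), or copied as raw bytes.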
+  public void processBlocks() throws IOException {
+    PageReadStore store = reader.readNextRowGroup();
+    ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, createdBy);
+    Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
+            Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+
+    int blockId = 0;
+    while (store != null) {
+      writer.startBlock(store.getRowCount());
+
+      BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
+      List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
+
+      for (int i = 0, columnId = 0; i < columnsInOrder.size(); i += 1) {
+        ColumnChunkMetaData chunk = columnsInOrder.get(i);
+        ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+
+        // This column has been pruned.
+        if (descriptor == null) {
+          continue;
+        }
+
+        // If a column chunk is already encrypted, simply throw an exception.
+        // Later we can add a feature to trans-encrypt it with different keys.
+        if (chunk.isEncrypted()) {
+          throw new IOException("Column " + chunk.getPath().toDotString() + " is already encrypted");
+        }
+
+        reader.setStreamPosition(chunk.getStartingPos());
+        CompressionCodecName newCodecName = codecName == null ? chunk.getCodec() : codecName;
+        EncryptorRunTime encryptorRunTime = null;
+        if (this.encryptMode) {
+          encryptorRunTime = new EncryptorRunTime(writer.getEncryptor(), chunk, blockId, columnId);
+        }
+        boolean encryptColumn = encryptColumns != null && encryptColumns.contains(chunk.getPath());
+
+        if (maskColumns != null && maskColumns.containsKey(chunk.getPath())) {
+          // Mask column and compress it again.
+          MaskMode maskMode = maskColumns.get(chunk.getPath());
+          if (maskMode.equals(MaskMode.NULLIFY)) {
+            Type.Repetition repetition = descriptor.getPrimitiveType().getRepetition();
+            if (repetition.equals(Type.Repetition.REQUIRED)) {
+              throw new IOException(
+                      "Required column [" + descriptor.getPrimitiveType().getName() + "] cannot be nullified");
+            }
+            nullifyColumn(descriptor, chunk, crStore, writer, schema, newCodecName, encryptorRunTime, encryptColumn);
+          } else {
+            throw new UnsupportedOperationException("Only nullify is supported for now");
+          }
+        } else if (encryptMode || codecName != null) {
+          // Translate compression and/or encryption
+          writer.startColumn(descriptor, crStore.getColumnReader(descriptor).getTotalValueCount(), newCodecName);
+          processChunk(chunk, newCodecName, encryptorRunTime, encryptColumn);
+          writer.endColumn();
+        } else {
+          // Nothing changed, simply copy the binary data.
+          BloomFilter bloomFilter = reader.readBloomFilter(chunk);
+          ColumnIndex columnIndex = reader.readColumnIndex(chunk);
+          OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+          writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex);
+        }
+
+        columnId++;
+      }
+
+      writer.endBlock();
+      store = reader.readNextRowGroup();
+      blockId++;
+    }
+  }
+
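+  // Rewrite one column chunk page by page: recompress when the codec changes
+  // and encrypt payloads and headers when the column is to be encrypted.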
+  private void processChunk(ColumnChunkMetaData chunk,
+                            CompressionCodecName newCodecName,
+                            EncryptorRunTime encryptorRunTime,
+                            boolean encryptColumn) throws IOException {
+    CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
+    CompressionCodecFactory.BytesInputDecompressor decompressor = null;
+    CompressionCodecFactory.BytesInputCompressor compressor = null;
+    if (!newCodecName.equals(chunk.getCodec())) {
+      // Re-compress only if a different codec has been specified
+      decompressor = codecFactory.getDecompressor(chunk.getCodec());
+      compressor = codecFactory.getCompressor(newCodecName);
+    }
+
+    // EncryptorRunTime is only provided when encryption is required
+    BlockCipher.Encryptor metaEncryptor = null;
+    BlockCipher.Encryptor dataEncryptor = null;
+    byte[] dictPageAAD = null;
+    byte[] dataPageAAD = null;
+    byte[] dictPageHeaderAAD = null;
+    byte[] dataPageHeaderAAD = null;
+    if (encryptorRunTime != null) {
+      metaEncryptor = encryptorRunTime.getMetaDataEncryptor();
+      dataEncryptor = encryptorRunTime.getDataEncryptor();
+      dictPageAAD = encryptorRunTime.getDictPageAAD();
+      dataPageAAD = encryptorRunTime.getDataPageAAD();
+      dictPageHeaderAAD = encryptorRunTime.getDictPageHeaderAAD();
+      dataPageHeaderAAD = encryptorRunTime.getDataPageHeaderAAD();
+    }
+
+    ColumnIndex columnIndex = reader.readColumnIndex(chunk);
+    OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+
+    reader.setStreamPosition(chunk.getStartingPos());
+    DictionaryPage dictionaryPage = null;
+    long readValues = 0;
+    Statistics statistics = null;
+    ParquetMetadataConverter converter = new ParquetMetadataConverter();
+    int pageOrdinal = 0;
+    long totalChunkValues = chunk.getValueCount();
+    while (readValues < totalChunkValues) {
+      PageHeader pageHeader = reader.readPageHeader();
+      int compressedPageSize = pageHeader.getCompressed_page_size();
+      byte[] pageLoad;
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          if (dictionaryPage != null) {
+            throw new IOException("has more than one dictionary page in column chunk");
+          }
+          // No quickUpdatePageAAD needed for the dictionary page
+          DictionaryPageHeader dictPageHeader = pageHeader.dictionary_page_header;
+          pageLoad = processPageLoad(reader,
+                  true,
+                  compressor,
+                  decompressor,
+                  pageHeader.getCompressed_page_size(),
+                  pageHeader.getUncompressed_page_size(),
+                  encryptColumn,
+                  dataEncryptor,
+                  dictPageAAD);
+          writer.writeDictionaryPage(new DictionaryPage(BytesInput.from(pageLoad),
+                  pageHeader.getUncompressed_page_size(),
+                  dictPageHeader.getNum_values(),
+                  converter.getEncoding(dictPageHeader.getEncoding())),
+                  metaEncryptor,
+                  dictPageHeaderAAD);
+          break;
+        case DATA_PAGE:
+          if (encryptColumn) {
+            AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+            AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
+          }
+          DataPageHeader headerV1 = pageHeader.data_page_header;
+          pageLoad = processPageLoad(reader,
+                  true,
+                  compressor,
+                  decompressor,
+                  pageHeader.getCompressed_page_size(),
+                  pageHeader.getUncompressed_page_size(),
+                  encryptColumn,
+                  dataEncryptor,
+                  dataPageAAD);
+          statistics = convertStatistics(
+                  createdBy, chunk.getPrimitiveType(), headerV1.getStatistics(), columnIndex, pageOrdinal, converter);
+          readValues += headerV1.getNum_values();
+          if (offsetIndex != null) {
+            long rowCount = 1 + offsetIndex.getLastRowIndex(
+                    pageOrdinal, totalChunkValues) - offsetIndex.getFirstRowIndex(pageOrdinal);
+            writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
+                    pageHeader.getUncompressed_page_size(),
+                    BytesInput.from(pageLoad),
+                    statistics,
+                    toIntWithCheck(rowCount),
+                    converter.getEncoding(headerV1.getRepetition_level_encoding()),
+                    converter.getEncoding(headerV1.getDefinition_level_encoding()),
+                    converter.getEncoding(headerV1.getEncoding()),
+                    metaEncryptor,
+                    dataPageHeaderAAD);
+          } else {
+            writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
+                    pageHeader.getUncompressed_page_size(),
+                    BytesInput.from(pageLoad),
+                    statistics,
+                    converter.getEncoding(headerV1.getRepetition_level_encoding()),
+                    converter.getEncoding(headerV1.getDefinition_level_encoding()),
+                    converter.getEncoding(headerV1.getEncoding()),
+                    metaEncryptor,
+                    dataPageHeaderAAD);
+          }
+          pageOrdinal++;
+          break;
+        case DATA_PAGE_V2:
+          if (encryptColumn) {
+            AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+            AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
+          }
+          DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
+          int rlLength = headerV2.getRepetition_levels_byte_length();
+          BytesInput rlLevels = readBlockAllocate(rlLength, reader);
+          int dlLength = headerV2.getDefinition_levels_byte_length();
+          BytesInput dlLevels = readBlockAllocate(dlLength, reader);
+          int payLoadLength = pageHeader.getCompressed_page_size() - rlLength - dlLength;
+          int rawDataLength = pageHeader.getUncompressed_page_size() - rlLength - dlLength;
+          pageLoad = processPageLoad(
+                  reader,
+                  headerV2.is_compressed,
+                  compressor,
+                  decompressor,
+                  payLoadLength,
+                  rawDataLength,
+                  encryptColumn,
+                  dataEncryptor,
+                  dataPageAAD);
+          statistics = convertStatistics(
+                  createdBy, chunk.getPrimitiveType(), headerV2.getStatistics(), columnIndex, pageOrdinal, converter);
+          readValues += headerV2.getNum_values();
+          writer.writeDataPageV2(headerV2.getNum_rows(),
+                  headerV2.getNum_nulls(),
+                  headerV2.getNum_values(),
+                  rlLevels,
+                  dlLevels,
+                  converter.getEncoding(headerV2.getEncoding()),
+                  BytesInput.from(pageLoad),
+                  rawDataLength,
+                  statistics);
+          pageOrdinal++;
+          break;
+        default:
+          LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize);
+          break;
+      }
+    }
+  }
+
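+  // Rebuild page statistics, preferring the column index when present and
+  // falling back to the statistics embedded in the page header.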
+  private Statistics convertStatistics(String createdBy,
+                                       PrimitiveType type,
+                                       org.apache.parquet.format.Statistics pageStatistics,
+                                       ColumnIndex columnIndex,
+                                       int pageIndex,
+                                       ParquetMetadataConverter converter) throws IOException {
+    if (columnIndex != null) {
+      if (columnIndex.getNullPages() == null) {
+        throw new IOException("columnIndex has null variable 'nullPages' which indicates corrupted data for type: " +
+                type.getName());
+      }
+      if (pageIndex >= columnIndex.getNullPages().size()) {
+        throw new IOException("Page index " + pageIndex + " is out of range; the columnIndex has only " +
+                columnIndex.getNullPages().size() + " pages");
+      }
+      org.apache.parquet.column.statistics.Statistics.Builder statsBuilder =
+              org.apache.parquet.column.statistics.Statistics.getBuilderForReading(type);
+      statsBuilder.withNumNulls(columnIndex.getNullCounts().get(pageIndex));
+
+      if (!columnIndex.getNullPages().get(pageIndex)) {
+        statsBuilder.withMin(columnIndex.getMinValues().get(pageIndex).array().clone());
+        statsBuilder.withMax(columnIndex.getMaxValues().get(pageIndex).array().clone());
+      }
+      return statsBuilder.build();
+    } else if (pageStatistics != null) {
+      return converter.fromParquetStatistics(createdBy, pageStatistics, type);
+    } else {
+      return null;
+    }
+  }
+
+  private byte[] processPageLoad(TransParquetFileReader reader,
+                                 boolean isCompressed,
+                                 CompressionCodecFactory.BytesInputCompressor compressor,
+                                 CompressionCodecFactory.BytesInputDecompressor decompressor,
+                                 int payloadLength,
+                                 int rawDataLength,
+                                 boolean encrypt,
+                                 BlockCipher.Encryptor dataEncryptor,
+                                 byte[] AAD) throws IOException {
+    BytesInput data = readBlock(payloadLength, reader);
+
+    // recompress page load
+    if (compressor != null) {
+      if (isCompressed) {
+        data = decompressor.decompress(data, rawDataLength);
+      }
+      data = compressor.compress(data);
+    }
+
+    if (!encrypt) {
+      return data.toByteArray();
+    }
+
+    // encrypt page load
+    return dataEncryptor.encrypt(data.toByteArray(), AAD);
+  }
+
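+  // Read 'length' bytes from the stream, reusing the shared page buffer
+  // whenever it is large enough, to avoid an extra allocation.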
+  public BytesInput readBlock(int length, TransParquetFileReader reader) throws IOException {
+    byte[] data;
+    if (length > pageBufferSize) {
+      data = new byte[length];
+    } else {
+      data = pageBuffer;
+    }
+    reader.blockRead(data, 0, length);
+    return BytesInput.from(data, 0, length);
+  }
+
+  public BytesInput readBlockAllocate(int length, TransParquetFileReader reader) throws IOException {
+    byte[] data = new byte[length];
+    reader.blockRead(data, 0, length);
+    return BytesInput.from(data, 0, length);
+  }
+
+  private int toIntWithCheck(long size) {
+    if ((int)size != size) {
+      throw new ParquetEncodingException("size is bigger than " + Integer.MAX_VALUE + " bytes: " + size);
+    }
+    return (int)size;
+  }
+
+  // We have to rewrite getPaths because MessageType only returns level-0 paths
+  private void getPaths(GroupType schema, List<String> paths, String parent) {
+    List<Type> fields = schema.getFields();
+    String prefix = (parent == null) ? "" : parent + ".";
+    for (Type field : fields) {
+      paths.add(prefix + field.getName());
+      if (field instanceof GroupType) {
+        getPaths(field.asGroupType(), paths, prefix + field.getName());
+      }
+    }
+  }
+
+  private MessageType pruneColumnsInSchema(MessageType schema, Set<ColumnPath> prunePaths) {
+    List<Type> fields = schema.getFields();
+    List<String> currentPath = new ArrayList<>();
+    List<Type> prunedFields = pruneColumnsInFields(fields, currentPath, prunePaths);
+    MessageType newSchema = new MessageType(schema.getName(), prunedFields);
+    return newSchema;
+  }
+
+  private List<Type> pruneColumnsInFields(List<Type> fields, List<String> currentPath, Set<ColumnPath> prunePaths) {
+    List<Type> prunedFields = new ArrayList<>();
+    for (Type childField : fields) {
+      Type prunedChildField = pruneColumnsInField(childField, currentPath, prunePaths);
+      if (prunedChildField != null) {
+        prunedFields.add(prunedChildField);
+      }
+    }
+    return prunedFields;
+  }
+
+  private Type pruneColumnsInField(Type field, List<String> currentPath, Set<ColumnPath> prunePaths) {
+    String fieldName = field.getName();
+    currentPath.add(fieldName);
+    ColumnPath path = ColumnPath.get(currentPath.toArray(new String[0]));
+    Type prunedField = null;
+    if (!prunePaths.contains(path)) {
+      if (field.isPrimitive()) {
+        prunedField = field;
+      } else {
+        List<Type> childFields = ((GroupType) field).getFields();
+        List<Type> prunedFields = pruneColumnsInFields(childFields, currentPath, prunePaths);
+        if (prunedFields.size() > 0) {
+          prunedField = ((GroupType) field).withNewFields(prunedFields);
+        }
+      }
+    }
+
+    currentPath.remove(fieldName);
+    return prunedField;
+  }
+
+  private Set<ColumnPath> convertToColumnPaths(List<String> cols) {
+    Set<ColumnPath> prunePaths = new HashSet<>();
+    for (String col : cols) {
+      prunePaths.add(ColumnPath.fromDotString(col));
+    }
+    return prunePaths;
+  }
+
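+  // Rewrite a column chunk with every value nullified, writing nulls at the
+  // appropriate definition level and flushing the result to the file writer.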
+  private void nullifyColumn(ColumnDescriptor descriptor,
+                             ColumnChunkMetaData chunk,
+                             ColumnReadStoreImpl crStore,
+                             ParquetFileWriter writer,
+                             MessageType schema,
+                             CompressionCodecName newCodecName,
+                             EncryptorRunTime encryptorRunTime,
+                             boolean encryptColumn) throws IOException {
+    // TODO: support encryption
+    if (encryptorRunTime != null) {
+      throw new RuntimeException("Nullifying and encrypting column is not implemented yet");
+    }
+    long totalChunkValues = chunk.getValueCount();
+    int dMax = descriptor.getMaxDefinitionLevel();
+    ColumnReader cReader = crStore.getColumnReader(descriptor);
+
+    ParquetProperties.WriterVersion writerVersion = chunk.getEncodingStats().usesV2Pages() ?
+            ParquetProperties.WriterVersion.PARQUET_2_0 : ParquetProperties.WriterVersion.PARQUET_1_0;
+    ParquetProperties props = ParquetProperties.builder()
+            .withWriterVersion(writerVersion)
+            .build();
+    CodecFactory codecFactory = new CodecFactory(new Configuration(), props.getPageSizeThreshold());
+    CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(newCodecName);
+
+    // Create new schema that only has the current column
+    MessageType newSchema = newSchema(schema, descriptor);
+    ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(
+            compressor, newSchema, props.getAllocator(), props.getColumnIndexTruncateLength());
+    ColumnWriteStore cStore = props.newColumnWriteStore(newSchema, cPageStore);
+    ColumnWriter cWriter = cStore.getColumnWriter(descriptor);
+
+    for (int i = 0; i < totalChunkValues; i++) {
+      int rlvl = cReader.getCurrentRepetitionLevel();
+      int dlvl = cReader.getCurrentDefinitionLevel();
+      if (dlvl == dMax) {
+        // since we checked either optional or repeated, dlvl should be > 0
+        if (dlvl == 0) {
+          throw new IOException("definition level is detected to be 0 for column " +
+                  chunk.getPath().toDotString() + " to be nullified");
+        }
+        // we just write one null for the whole list at the top level,
+        // instead of nullifying the elements in the list one by one
+        if (rlvl == 0) {
+          cWriter.writeNull(rlvl, dlvl - 1);
+        }
+      } else {
+        cWriter.writeNull(rlvl, dlvl);
+      }
+      cStore.endRecord();
+    }
+
+    cStore.flush();
+    cPageStore.flushToFileWriter(writer);
+
+    cStore.close();
+    cWriter.close();
+  }
+
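+  // Project the schema down to a message containing only the given column.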
+  private MessageType newSchema(MessageType schema, ColumnDescriptor descriptor) {
+    String[] path = descriptor.getPath();
+    Type type = schema.getType(path);
+    if (path.length == 1) {
+      return new MessageType(schema.getName(), type);
+    }
+
+    for (Type field : schema.getFields()) {
+      if (!field.isPrimitive()) {
+        Type newType = extractField(field.asGroupType(), type);
+        if (newType != null) {
+          return new MessageType(schema.getName(), newType);
+        }
+      }
+    }
+
+    // We should never hit this because 'type' is returned by schema.getType().
+    throw new RuntimeException("No field is found");
+  }
+
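+  // Search 'candidate' recursively for 'targetField'; when found, return it
+  // wrapped in its immediate enclosing group.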
+  private Type extractField(GroupType candidate, Type targetField) {
+    if (targetField.equals(candidate)) {
+      return targetField;
+    }
+
+    // In case 'targetField' is a descendant of the candidate
+    for (Type field : candidate.asGroupType().getFields()) {
+      if (field.isPrimitive()) {
+        if (field.equals(targetField)) {
+          return new GroupType(candidate.getRepetition(), candidate.getName(), targetField);
+        }
+      } else {
+        Type tempField = extractField(field.asGroupType(), targetField);
+        if (tempField != null) {
+          return tempField;
+        }
+      }
+    }
+
+    return null;
+  }
+
+  private static final class DummyGroupConverter extends GroupConverter {
+    @Override public void start() {}
+    @Override public void end() {}
+    @Override public Converter getConverter(int fieldIndex) { return new DummyConverter(); }
+  }
+
+  private static final class DummyConverter extends PrimitiveConverter {
+    @Override public GroupConverter asGroupConverter() { return new DummyGroupConverter(); }
+  }
+
+  private static class EncryptorRunTime {

Review Comment:
   Renamed it to ColumnChunkEncryptorRunTime
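
For context on what this run-time class carries: the per-module AADs are derived with parquet-mr's AES cipher helpers. A rough sketch, assuming AesCipher.createModuleAAD and quickUpdatePageAAD behave as in the crypto package imported above; the ordinals are the block and column ids tracked by processBlocks.

    import org.apache.parquet.crypto.AesCipher;
    import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;

    public class AadSketch {
      // Derive the data-page AAD for a (rowGroup, column) pair, then advance
      // it to a specific page ordinal, mirroring the DATA_PAGE branch above.
      public static byte[] dataPageAad(byte[] fileAAD, int rowGroupOrdinal,
                                       int columnOrdinal, int pageOrdinal) {
        byte[] aad = AesCipher.createModuleAAD(
            fileAAD, ModuleType.DataPage, rowGroupOrdinal, columnOrdinal, 0);
        AesCipher.quickUpdatePageAAD(aad, pageOrdinal);
        return aad;
      }
    }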



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -0,0 +1,733 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopCodecs;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+public class ParquetRewriter implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ParquetRewriter.class);
+  private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private final byte[] pageBuffer = new byte[pageBufferSize];
+  private TransParquetFileReader reader;
+  private ParquetFileWriter writer;
+  private ParquetMetadata meta;
+  private MessageType schema;
+  private String createdBy;
+  private CompressionCodecName codecName = null;
+  private List<String> pruneColumns = null;
+  private Map<ColumnPath, MaskMode> maskColumns = null;
+  private Set<ColumnPath> encryptColumns = null;
+  private boolean encryptMode = false;
+
+  public ParquetRewriter(RewriteOptions options) throws IOException {
+    Path inPath = options.getInputFile();
+    Path outPath = options.getOutputFile();
+    Configuration conf = options.getConf();
+
+    // TODO: set more member variables
+    codecName = options.getCodecName();
+    pruneColumns = options.getPruneColumns();
+
+    // Get file metadata and full schema from the input file
+    meta = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    schema = meta.getFileMetaData().getSchema();
+    createdBy = meta.getFileMetaData().getCreatedBy();
+
+    // Prune columns if specified
+    if (pruneColumns != null && !pruneColumns.isEmpty()) {
+      List<String> paths = new ArrayList<>();
+      getPaths(schema, paths, null);
+      for (String col : pruneColumns) {
+        if (!paths.contains(col)) {
+          LOG.warn("Input column name {} doesn't show up in the schema of file {}", col, inPath.getName());
+        }
+      }
+
+      Set<ColumnPath> prunePaths = convertToColumnPaths(pruneColumns);
+      schema = pruneColumnsInSchema(schema, prunePaths);
+    }
+
+    if (options.getMaskColumns() != null) {
+      this.maskColumns = new HashMap<>();
+      for (Map.Entry<String, MaskMode> col : options.getMaskColumns().entrySet()) {
+        maskColumns.put(ColumnPath.fromDotString(col.getKey()), col.getValue());
+      }
+    }
+
+    if (options.getEncryptColumns() != null && options.getFileEncryptionProperties() != null) {
+      this.encryptColumns = convertToColumnPaths(options.getEncryptColumns());
+      this.encryptMode = true;
+      // TODO: make sure options.getFileEncryptionProperties() is set
+    }
+
+    reader = new TransParquetFileReader(
+            HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build());
+
+    ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
+    writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, writerMode,
+            DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
+            DEFAULT_STATISTICS_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED,
+            options.getFileEncryptionProperties());
+    writer.start();
+  }
+
+  // Ctor for legacy CompressionConverter and ColumnMasker
+  public ParquetRewriter(TransParquetFileReader reader,
+                         ParquetFileWriter writer,
+                         ParquetMetadata meta,
+                         MessageType schema,
+                         String createdBy,
+                         CompressionCodecName codecName,
+                         List<String> maskColumns,
+                         MaskMode maskMode) {
+    this.reader = reader;
+    this.writer = writer;
+    this.meta = meta;
+    this.schema = schema;
+    this.createdBy = createdBy == null ? meta.getFileMetaData().getCreatedBy() : createdBy;
+    this.codecName = codecName;
+    if (maskColumns != null && maskMode != null) {
+      this.maskColumns = new HashMap<>();
+      for (String col : maskColumns) {
+        this.maskColumns.put(ColumnPath.fromDotString(col), maskMode);
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    writer.end(meta.getFileMetaData().getKeyValueMetaData());
+  }
+
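+  // Walk the input file row group by row group; each column chunk is either
+  // masked, re-encoded (new codec and/or encryption), or copied as raw bytes.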
+  public void processBlocks() throws IOException {
+    PageReadStore store = reader.readNextRowGroup();
+    ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, createdBy);
+    Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
+            Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+
+    int blockId = 0;
+    while (store != null) {
+      writer.startBlock(store.getRowCount());
+
+      BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
+      List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
+
+      for (int i = 0, columnId = 0; i < columnsInOrder.size(); i += 1) {
+        ColumnChunkMetaData chunk = columnsInOrder.get(i);
+        ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+
+        // This column has been pruned.
+        if (descriptor == null) {
+          continue;
+        }
+
+        // If a column chunk is already encrypted, simply throw an exception.
+        // Later we can add a feature to trans-encrypt it with different keys.
+        if (chunk.isEncrypted()) {
+          throw new IOException("Column " + chunk.getPath().toDotString() + " is already encrypted");
+        }
+
+        reader.setStreamPosition(chunk.getStartingPos());
+        CompressionCodecName newCodecName = codecName == null ? chunk.getCodec() : codecName;
+        EncryptorRunTime encryptorRunTime = null;
+        if (this.encryptMode) {
+          encryptorRunTime = new EncryptorRunTime(writer.getEncryptor(), chunk, blockId, columnId);
+        }
+        boolean encryptColumn = encryptColumns != null && encryptColumns.contains(chunk.getPath());
+
+        if (maskColumns != null && maskColumns.containsKey(chunk.getPath())) {
+          // Mask column and compress it again.
+          MaskMode maskMode = maskColumns.get(chunk.getPath());
+          if (maskMode.equals(MaskMode.NULLIFY)) {
+            Type.Repetition repetition = descriptor.getPrimitiveType().getRepetition();
+            if (repetition.equals(Type.Repetition.REQUIRED)) {
+              throw new IOException(
+                      "Required column [" + descriptor.getPrimitiveType().getName() + "] cannot be nullified");
+            }
+            nullifyColumn(descriptor, chunk, crStore, writer, schema, newCodecName, encryptorRunTime, encryptColumn);
+          } else {
+            throw new UnsupportedOperationException("Only nullify is supported for now");
+          }
+        } else if (encryptMode || codecName != null) {
+          // Translate compression and/or encryption
+          writer.startColumn(descriptor, crStore.getColumnReader(descriptor).getTotalValueCount(), newCodecName);
+          processChunk(chunk, newCodecName, encryptorRunTime, encryptColumn);
+          writer.endColumn();
+        } else {
+          // Nothing changed, simply copy the binary data.
+          BloomFilter bloomFilter = reader.readBloomFilter(chunk);
+          ColumnIndex columnIndex = reader.readColumnIndex(chunk);
+          OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+          writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex);
+        }
+
+        columnId++;
+      }
+
+      writer.endBlock();
+      store = reader.readNextRowGroup();
+      blockId++;
+    }
+  }
+
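+  // Rewrite one column chunk page by page: recompress when the codec changes
+  // and encrypt payloads and headers when the column is to be encrypted.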
+  private void processChunk(ColumnChunkMetaData chunk,
+                            CompressionCodecName newCodecName,
+                            EncryptorRunTime encryptorRunTime,
+                            boolean encryptColumn) throws IOException {
+    CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
+    CompressionCodecFactory.BytesInputDecompressor decompressor = null;
+    CompressionCodecFactory.BytesInputCompressor compressor = null;
+    if (!newCodecName.equals(chunk.getCodec())) {
+      // Re-compress only if a different codec has been specified
+      decompressor = codecFactory.getDecompressor(chunk.getCodec());
+      compressor = codecFactory.getCompressor(newCodecName);
+    }
+
+    // EncryptorRunTime is only provided when encryption is required
+    BlockCipher.Encryptor metaEncryptor = null;
+    BlockCipher.Encryptor dataEncryptor = null;
+    byte[] dictPageAAD = null;
+    byte[] dataPageAAD = null;
+    byte[] dictPageHeaderAAD = null;
+    byte[] dataPageHeaderAAD = null;
+    if (encryptorRunTime != null) {
+      metaEncryptor = encryptorRunTime.getMetaDataEncryptor();
+      dataEncryptor = encryptorRunTime.getDataEncryptor();
+      dictPageAAD = encryptorRunTime.getDictPageAAD();
+      dataPageAAD = encryptorRunTime.getDataPageAAD();
+      dictPageHeaderAAD = encryptorRunTime.getDictPageHeaderAAD();
+      dataPageHeaderAAD = encryptorRunTime.getDataPageHeaderAAD();
+    }
+
+    ColumnIndex columnIndex = reader.readColumnIndex(chunk);
+    OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+
+    reader.setStreamPosition(chunk.getStartingPos());
+    DictionaryPage dictionaryPage = null;
+    long readValues = 0;
+    Statistics statistics = null;
+    ParquetMetadataConverter converter = new ParquetMetadataConverter();
+    int pageOrdinal = 0;
+    long totalChunkValues = chunk.getValueCount();
+    while (readValues < totalChunkValues) {
+      PageHeader pageHeader = reader.readPageHeader();
+      int compressedPageSize = pageHeader.getCompressed_page_size();
+      byte[] pageLoad;
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          if (dictionaryPage != null) {
+            throw new IOException("has more than one dictionary page in column chunk");
+          }
+          // No quickUpdatePageAAD needed for the dictionary page
+          DictionaryPageHeader dictPageHeader = pageHeader.dictionary_page_header;
+          pageLoad = processPageLoad(reader,
+                  true,
+                  compressor,
+                  decompressor,
+                  pageHeader.getCompressed_page_size(),
+                  pageHeader.getUncompressed_page_size(),
+                  encryptColumn,
+                  dataEncryptor,
+                  dictPageAAD);
+          writer.writeDictionaryPage(new DictionaryPage(BytesInput.from(pageLoad),
+                  pageHeader.getUncompressed_page_size(),
+                  dictPageHeader.getNum_values(),
+                  converter.getEncoding(dictPageHeader.getEncoding())),
+                  metaEncryptor,
+                  dictPageHeaderAAD);
+          break;
+        case DATA_PAGE:
+          if (encryptColumn) {
+            AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+            AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
+          }
+          DataPageHeader headerV1 = pageHeader.data_page_header;
+          pageLoad = processPageLoad(reader,
+                  true,
+                  compressor,
+                  decompressor,
+                  pageHeader.getCompressed_page_size(),
+                  pageHeader.getUncompressed_page_size(),
+                  encryptColumn,
+                  dataEncryptor,
+                  dataPageAAD);
+          statistics = convertStatistics(
+                  createdBy, chunk.getPrimitiveType(), headerV1.getStatistics(), columnIndex, pageOrdinal, converter);
+          readValues += headerV1.getNum_values();
+          if (offsetIndex != null) {
+            long rowCount = 1 + offsetIndex.getLastRowIndex(
+                    pageOrdinal, totalChunkValues) - offsetIndex.getFirstRowIndex(pageOrdinal);
+            writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
+                    pageHeader.getUncompressed_page_size(),
+                    BytesInput.from(pageLoad),
+                    statistics,
+                    toIntWithCheck(rowCount),
+                    converter.getEncoding(headerV1.getRepetition_level_encoding()),
+                    converter.getEncoding(headerV1.getDefinition_level_encoding()),
+                    converter.getEncoding(headerV1.getEncoding()),
+                    metaEncryptor,
+                    dataPageHeaderAAD);
+          } else {
+            writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
+                    pageHeader.getUncompressed_page_size(),
+                    BytesInput.from(pageLoad),
+                    statistics,
+                    converter.getEncoding(headerV1.getRepetition_level_encoding()),
+                    converter.getEncoding(headerV1.getDefinition_level_encoding()),
+                    converter.getEncoding(headerV1.getEncoding()),
+                    metaEncryptor,
+                    dataPageHeaderAAD);
+          }
+          pageOrdinal++;
+          break;
+        case DATA_PAGE_V2:
+          if (encryptColumn) {
+            AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+            AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
+          }
+          DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
+          int rlLength = headerV2.getRepetition_levels_byte_length();
+          BytesInput rlLevels = readBlockAllocate(rlLength, reader);
+          int dlLength = headerV2.getDefinition_levels_byte_length();
+          BytesInput dlLevels = readBlockAllocate(dlLength, reader);
+          int payLoadLength = pageHeader.getCompressed_page_size() - rlLength - dlLength;
+          int rawDataLength = pageHeader.getUncompressed_page_size() - rlLength - dlLength;
+          pageLoad = processPageLoad(
+                  reader,
+                  headerV2.is_compressed,
+                  compressor,
+                  decompressor,
+                  payLoadLength,
+                  rawDataLength,
+                  encryptColumn,
+                  dataEncryptor,
+                  dataPageAAD);
+          statistics = convertStatistics(
+                  createdBy, chunk.getPrimitiveType(), headerV2.getStatistics(), columnIndex, pageOrdinal, converter);
+          readValues += headerV2.getNum_values();
+          writer.writeDataPageV2(headerV2.getNum_rows(),
+                  headerV2.getNum_nulls(),
+                  headerV2.getNum_values(),
+                  rlLevels,
+                  dlLevels,
+                  converter.getEncoding(headerV2.getEncoding()),
+                  BytesInput.from(pageLoad),
+                  rawDataLength,
+                  statistics);
+          pageOrdinal++;
+          break;
+        default:
+          LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize);
+          break;
+      }
+    }
+  }
+
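+  // Rebuilds page statistics for the output file. The column index, when
+  // available, is preferred because it carries exact per-page null counts and
+  // min/max values; otherwise fall back to the page header statistics.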
+  private Statistics convertStatistics(String createdBy,
+                                       PrimitiveType type,
+                                       org.apache.parquet.format.Statistics pageStatistics,
+                                       ColumnIndex columnIndex,
+                                       int pageIndex,
+                                       ParquetMetadataConverter converter) throws IOException {
+    if (columnIndex != null) {
+      if (columnIndex.getNullPages() == null) {
+        throw new IOException("columnIndex for type " + type.getName() +
+                " has a null 'nullPages' list, which indicates corrupted data");
+      }
+      if (pageIndex >= columnIndex.getNullPages().size()) {
+        throw new IOException("Page ordinal " + pageIndex + " is out of range: the columnIndex only covers " +
+                columnIndex.getNullPages().size() + " pages");
+      }
+      org.apache.parquet.column.statistics.Statistics.Builder statsBuilder =
+              org.apache.parquet.column.statistics.Statistics.getBuilderForReading(type);
+      statsBuilder.withNumNulls(columnIndex.getNullCounts().get(pageIndex));
+
+      if (!columnIndex.getNullPages().get(pageIndex)) {
+        statsBuilder.withMin(columnIndex.getMinValues().get(pageIndex).array().clone());
+        statsBuilder.withMax(columnIndex.getMaxValues().get(pageIndex).array().clone());
+      }
+      return statsBuilder.build();
+    } else if (pageStatistics != null) {
+      return converter.fromParquetStatistics(createdBy, pageStatistics, type);
+    } else {
+      return null;
+    }
+  }
+
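+  // Reads a page payload and applies the requested transformations in order:
+  // decompress with the source codec (if compressed), recompress with the
+  // target codec (if trans-compression is requested), then encrypt (if requested).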
+  private byte[] processPageLoad(TransParquetFileReader reader,
+                                 boolean isCompressed,
+                                 CompressionCodecFactory.BytesInputCompressor compressor,
+                                 CompressionCodecFactory.BytesInputDecompressor decompressor,
+                                 int payloadLength,
+                                 int rawDataLength,
+                                 boolean encrypt,
+                                 BlockCipher.Encryptor dataEncryptor,
+                                 byte[] aad) throws IOException {
+    BytesInput data = readBlock(payloadLength, reader);
+
+    // recompress the page payload with the new codec
+    if (compressor != null) {
+      if (isCompressed) {
+        data = decompressor.decompress(data, rawDataLength);
+      }
+      data = compressor.compress(data);
+    }
+
+    if (!encrypt) {
+      return data.toByteArray();
+    }
+
+    // encrypt the page payload
+    return dataEncryptor.encrypt(data.toByteArray(), aad);
+  }
+
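+  // Reads 'length' bytes, reusing the shared page buffer for reads that fit.
+  // The returned BytesInput may alias the shared buffer, so it must be
+  // consumed before the next call.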
+  public BytesInput readBlock(int length, TransParquetFileReader reader) throws IOException {
+    byte[] data;
+    if (length > pageBufferSize) {
+      data = new byte[length];
+    } else {
+      data = pageBuffer;
+    }
+    reader.blockRead(data, 0, length);
+    return BytesInput.from(data, 0, length);
+  }
+
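+  // Like readBlock, but always allocates a fresh buffer; used for level bytes
+  // that must stay valid while the rest of the page is being read.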
+  public BytesInput readBlockAllocate(int length, TransParquetFileReader reader) throws IOException {
+    byte[] data = new byte[length];
+    reader.blockRead(data, 0, length);
+    return BytesInput.from(data, 0, length);
+  }
+
+  private int toIntWithCheck(long size) {
+    if ((int)size != size) {
+      throw new ParquetEncodingException("size is bigger than " + Integer.MAX_VALUE + " bytes: " + size);
+    }
+    return (int)size;
+  }
+
+  // We have to rewrite getPaths because MessageType only returns the level-0 (top-level) paths
+  private void getPaths(GroupType schema, List<String> paths, String parent) {
+    List<Type> fields = schema.getFields();
+    String prefix = (parent == null) ? "" : parent + ".";
+    for (Type field : fields) {
+      paths.add(prefix + field.getName());
+      if (field instanceof GroupType) {
+        getPaths(field.asGroupType(), paths, prefix + field.getName());
+      }
+    }
+  }
+
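+  // Returns a copy of the schema with the pruned paths removed; group fields
+  // that lose all of their children are dropped as well.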
+  private MessageType pruneColumnsInSchema(MessageType schema, Set<ColumnPath> prunePaths) {
+    List<Type> fields = schema.getFields();
+    List<String> currentPath = new ArrayList<>();
+    List<Type> prunedFields = pruneColumnsInFields(fields, currentPath, prunePaths);
+    return new MessageType(schema.getName(), prunedFields);
+  }
+
+  private List<Type> pruneColumnsInFields(List<Type> fields, List<String> currentPath, Set<ColumnPath> prunePaths) {
+    List<Type> prunedFields = new ArrayList<>();
+    for (Type childField : fields) {
+      Type prunedChildField = pruneColumnsInField(childField, currentPath, prunePaths);
+      if (prunedChildField != null) {
+        prunedFields.add(prunedChildField);
+      }
+    }
+    return prunedFields;
+  }
+
+  private Type pruneColumnsInField(Type field, List<String> currentPath, Set<ColumnPath> prunePaths) {
+    String fieldName = field.getName();
+    currentPath.add(fieldName);
+    ColumnPath path = ColumnPath.get(currentPath.toArray(new String[0]));
+    Type prunedField = null;
+    if (!prunePaths.contains(path)) {
+      if (field.isPrimitive()) {
+        prunedField = field;
+      } else {
+        List<Type> childFields = ((GroupType) field).getFields();
+        List<Type> prunedFields = pruneColumnsInFields(childFields, currentPath, prunePaths);
+        if (!prunedFields.isEmpty()) {
+          prunedField = ((GroupType) field).withNewFields(prunedFields);
+        }
+      }
+    }
+
+    currentPath.remove(fieldName);
+    return prunedField;
+  }
+
+  private Set<ColumnPath> convertToColumnPaths(List<String> cols) {
+    Set<ColumnPath> prunePaths = new HashSet<>();
+    for (String col : cols) {
+      prunePaths.add(ColumnPath.fromDotString(col));
+    }
+    return prunePaths;
+  }
+
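+  // Rewrites the column chunk with all values replaced by nulls, replaying the
+  // repetition/definition levels through a fresh ColumnWriter and flushing the
+  // resulting pages to the output file.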
+  private void nullifyColumn(ColumnDescriptor descriptor,
+                             ColumnChunkMetaData chunk,
+                             ColumnReadStoreImpl crStore,
+                             ParquetFileWriter writer,
+                             MessageType schema,
+                             CompressionCodecName newCodecName,
+                             EncryptorRunTime encryptorRunTime,
+                             boolean encryptColumn) throws IOException {
+    // TODO: support encryption
+    if (encryptorRunTime != null) {
+      throw new RuntimeException("Nullifying and encrypting the same column is not yet implemented");
+    }
+    long totalChunkValues = chunk.getValueCount();
+    int dMax = descriptor.getMaxDefinitionLevel();
+    ColumnReader cReader = crStore.getColumnReader(descriptor);
+
+    ParquetProperties.WriterVersion writerVersion = chunk.getEncodingStats().usesV2Pages() ?
+            ParquetProperties.WriterVersion.PARQUET_2_0 : ParquetProperties.WriterVersion.PARQUET_1_0;
+    ParquetProperties props = ParquetProperties.builder()
+            .withWriterVersion(writerVersion)
+            .build();
+    CodecFactory codecFactory = new CodecFactory(new Configuration(), props.getPageSizeThreshold());
+    CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(newCodecName);
+
+    // Create new schema that only has the current column
+    MessageType newSchema = newSchema(schema, descriptor);
+    ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(
+            compressor, newSchema, props.getAllocator(), props.getColumnIndexTruncateLength());
+    ColumnWriteStore cStore = props.newColumnWriteStore(newSchema, cPageStore);
+    ColumnWriter cWriter = cStore.getColumnWriter(descriptor);
+
+    for (int i = 0; i < totalChunkValues; i++) {
+      int rlvl = cReader.getCurrentRepetitionLevel();
+      int dlvl = cReader.getCurrentDefinitionLevel();
+      if (dlvl == dMax) {
+        // since the column was checked to be either optional or repeated, dlvl should be > 0 here
+        if (dlvl == 0) {
+          throw new IOException("Unexpected definition level 0 for column to be nullified: " +
+                  chunk.getPath().toDotString());
+        }
+        // write a single null for the whole list at the top level,
+        // instead of nullifying the list elements one by one
+        if (rlvl == 0) {
+          cWriter.writeNull(rlvl, dlvl - 1);
+        }
+      } else {
+        cWriter.writeNull(rlvl, dlvl);
+      }
+      cStore.endRecord();
+    }
+
+    cStore.flush();
+    cPageStore.flushToFileWriter(writer);
+
+    cStore.close();
+    cWriter.close();
+  }
+
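+  // Builds a projection of 'schema' that contains only the column referenced
+  // by 'descriptor', preserving its enclosing group structure.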
+  private MessageType newSchema(MessageType schema, ColumnDescriptor descriptor) {
+    String[] path = descriptor.getPath();
+    Type type = schema.getType(path);
+    if (path.length == 1) {
+      return new MessageType(schema.getName(), type);
+    }
+
+    for (Type field : schema.getFields()) {
+      if (!field.isPrimitive()) {
+        Type newType = extractField(field.asGroupType(), type);
+        if (newType != null) {
+          return new MessageType(schema.getName(), newType);
+        }
+      }
+    }
+
+    // We should never hit this because 'type' is returned by schema.getType().
+    throw new RuntimeException("No field is found");
+  }
+
+  private Type extractField(GroupType candidate, Type targetField) {
+    if (targetField.equals(candidate)) {
+      return targetField;
+    }
+
+    // In case 'targetField' is a descendant of 'candidate'
+    for (Type field : candidate.asGroupType().getFields()) {
+      if (field.isPrimitive()) {
+        if (field.equals(targetField)) {
+          return new GroupType(candidate.getRepetition(), candidate.getName(), targetField);
+        }
+      } else {
+        Type tempField = extractField(field.asGroupType(), targetField);
+        if (tempField != null) {
+          return tempField;
+        }
+      }
+    }
+
+    return null;
+  }
+
+  private static final class DummyGroupConverter extends GroupConverter {
+    @Override public void start() {}
+    @Override public void end() {}
+    @Override public Converter getConverter(int fieldIndex) { return new DummyConverter(); }
+  }
+
+  private static final class DummyConverter extends PrimitiveConverter {
+    @Override public GroupConverter asGroupConverter() { return new DummyGroupConverter(); }
+  }
+
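+  // Holds the per-column encryptors and module AADs, created once per column
+  // chunk so that page processing only needs to update the page ordinal in the AAD.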
+  private static class EncryptorRunTime {
+    private final InternalColumnEncryptionSetup colEncrSetup;
+    private final BlockCipher.Encryptor dataEncryptor;
+    private final BlockCipher.Encryptor metaDataEncryptor;
+    private final byte[] fileAAD;
+
+    private byte[] dataPageHeaderAAD;
+    private byte[] dataPageAAD;
+    private byte[] dictPageHeaderAAD;
+    private byte[] dictPageAAD;
+
+    public EncryptorRunTime(InternalFileEncryptor fileEncryptor,
+                            ColumnChunkMetaData chunk,
+                            int blockId,
+                            int columnId) throws IOException  {
+      if (fileEncryptor == null) {
+        this.colEncrSetup = null;
+        this.dataEncryptor = null;
+        this.metaDataEncryptor = null;
+
+        this.fileAAD = null;
+        this.dataPageHeaderAAD = null;
+        this.dataPageAAD = null;
+        this.dictPageHeaderAAD = null;
+        this.dictPageAAD = null;
+      } else {
+        this.colEncrSetup = fileEncryptor.getColumnSetup(chunk.getPath(), true, columnId);
+        this.dataEncryptor = colEncrSetup.getDataEncryptor();
+        this.metaDataEncryptor = colEncrSetup.getMetaDataEncryptor();
+
+        this.fileAAD = fileEncryptor.getFileAAD();
+        this.dataPageHeaderAAD = createAAD(colEncrSetup, ModuleType.DataPageHeader, blockId, columnId);
+        this.dataPageAAD = createAAD(colEncrSetup, ModuleType.DataPage, blockId, columnId);
+        this.dictPageHeaderAAD = createAAD(colEncrSetup, ModuleType.DictionaryPageHeader, blockId, columnId);
+        this.dictPageAAD = createAAD(colEncrSetup, ModuleType.DictionaryPage, blockId, columnId);
+      }
+    }
+
+    private byte[] createAAD(InternalColumnEncryptionSetup colEncrSetup,
+                             ModuleType moduleType,
+                             int blockId,
+                             int columnId) {
+      if (colEncrSetup != null && colEncrSetup.isEncrypted()) {

Review Comment:
   Fixed it by checking `colEncrSetup` only once per creation.

> Unified Rewriter Tool  
> -----------------------
>
>                 Key: PARQUET-2075
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2075
>             Project: Parquet
>          Issue Type: New Feature
>            Reporter: Xinli Shang
>            Assignee: Gang Wu
>            Priority: Major
>
> During the discussion of PARQUET-2071, we came up with the idea of a universal tool that translates an existing file into a different state while skipping some steps, such as encoding/decoding, to gain speed. For example, it can decompress pages and then recompress them directly; for PARQUET-2071, it can decrypt and then re-encrypt directly. This will be useful for onboarding existing data to Parquet features like column encryption, zstd compression, etc.
> We already have tools such as trans-compression and column pruning. We will consolidate all of these tools into this universal tool.
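
For illustration, here is a minimal sketch of how the unified tool could be driven once this PR lands. The entry points shown (ParquetRewriter, RewriteOptions.Builder and its prune/transform/mask setters) reflect the draft API in this PR and may change before merge; the file paths are placeholders:

    import java.util.Arrays;
    import java.util.Collections;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.apache.parquet.hadoop.rewrite.MaskMode;
    import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
    import org.apache.parquet.hadoop.rewrite.RewriteOptions;

    public class RewriteExample {
      public static void main(String[] args) throws Exception {
        // Prune, trans-compress and mask in a single pass over the file
        RewriteOptions options = new RewriteOptions.Builder(
                new Configuration(),
                new Path("/tmp/in.parquet"),
                new Path("/tmp/out.parquet"))
            .prune(Arrays.asList("debug_info"))                       // drop a column
            .transform(CompressionCodecName.ZSTD)                     // recompress pages with zstd
            .mask(Collections.singletonMap("ssn", MaskMode.NULLIFY))  // nullify a sensitive column
            .build();

        ParquetRewriter rewriter = new ParquetRewriter(options);
        rewriter.processBlocks();
        rewriter.close();
      }
    }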

--
This message was sent by Atlassian Jira
(v8.20.10#820010)