Posted to dev@parquet.apache.org by GitBox <gi...@apache.org> on 2020/06/18 13:32:09 UTC

[GitHub] [parquet-mr] gszadovszky commented on a change in pull request #796: Parquet-1872: Add TransCompression command to parquet-tools

gszadovszky commented on a change in pull request #796:
URL: https://github.com/apache/parquet-mr/pull/796#discussion_r442225858



##########
File path: parquet-cli/src/test/java/org/apache/parquet/cli/commands/TransCompressionCommandTest.java
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cli.commands;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TransCompressionCommandTest extends ParquetFileTest  {
+
+  private Configuration conf = new Configuration();
+  private Map<String, String> extraMeta
+    = ImmutableMap.of("key1", "value1", "key2", "value2");
+
+  @Test
+  public void testTransCompression() throws Exception {
+    String[] codecs = {"UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD"};
+    for (int i = 0; i < codecs.length; i++) {
+      for (int j = 0; j <codecs.length; j++) {
+        // Same codec for both are considered as valid test case
+        testInternal(codecs[i], codecs[j], ParquetProperties.WriterVersion.PARQUET_1_0, ParquetProperties.DEFAULT_PAGE_SIZE);
+        testInternal(codecs[i], codecs[j], ParquetProperties.WriterVersion.PARQUET_2_0, ParquetProperties.DEFAULT_PAGE_SIZE);
+        testInternal(codecs[i], codecs[j], ParquetProperties.WriterVersion.PARQUET_1_0, 64);
+        testInternal(codecs[i], codecs[j], ParquetProperties.WriterVersion.PARQUET_1_0, ParquetProperties.DEFAULT_PAGE_SIZE * 100);
+      }
+    }
+  }
+
+  @Test
+  public void testSpeed() throws Exception {
+    String inputFile = createParquetFile("input", "GZIP", 100000,
+    ParquetProperties.WriterVersion.PARQUET_1_0, ParquetProperties.DEFAULT_PAGE_SIZE);
+    String outputFile = createTempFile("output_trans");
+
+    long start = System.currentTimeMillis();
+    TransCompressionCommand command = new TransCompressionCommand(createLogger());
+    command.setConf(new Configuration());
+    command.input = inputFile;
+    command.output = outputFile;
+    command.codec = "ZSTD";
+    command.run();
+    long durationTrans = System.currentTimeMillis() - start;
+
+    outputFile = createTempFile("output_record");
+    start = System.currentTimeMillis();
+    convertRecordByRecord(CompressionCodecName.valueOf("ZSTD"), new Path(inputFile), new Path(outputFile));
+    long durationRecord = System.currentTimeMillis() - start;
+
+    // The TransCompressionCommand is ~5 times faster than translating record by record
+    Assert.assertTrue(durationTrans < durationRecord);
+  }
+
+  private void testInternal(String srcCodec, String destCodec, ParquetProperties.WriterVersion writerVersion, int pageSize) throws Exception {
+    int numRecord = 1000;
+    String inputFile = createParquetFile("input", srcCodec, numRecord, writerVersion, pageSize);
+    String outputFile = createTempFile("output_trans");
+
+    TransCompressionCommand command = new TransCompressionCommand(createLogger());
+    command.setConf(new Configuration());
+    command.input = inputFile;
+    command.output = outputFile;
+    command.codec = destCodec;
+    command.run();
+
+    validateColumns(inputFile, numRecord);

Review comment:
       I think you should check the output file instead.
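
       In concrete terms (sketch only, reusing the helpers and variables already present in `testInternal`), the assertions would run against the file the command just wrote:

```java
// Assert against the transcoded output rather than the source file
validateColumns(outputFile, numRecord);
```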

##########
File path: parquet-cli/src/main/java/org/apache/parquet/cli/commands/TransCompressionCommand.java
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopCodecs;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+
+@Parameters(commandDescription="Translate the compression from one to another")
+public class TransCompressionCommand extends BaseCommand {
+
+  public TransCompressionCommand(Logger console) {
+    super(console);
+  }
+
+  @Parameter(description = "<input parquet file path>")
+  String input;
+
+  @Parameter(description = "<output parquet file path>")
+  String output;
+
+  @Parameter(description = "<new compression codec")
+  String codec;
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public int run() throws IOException {
+    Preconditions.checkArgument(input != null && output != null,
+      "Both input and output parquet file paths are required.");
+
+    Preconditions.checkArgument(codec != null,
+      "The codec cannot be null");
+
+    Path inPath = new Path(input);
+    Path outPath = new Path(output);
+    CompressionCodecName codecName = CompressionCodecName.valueOf(codec);
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(getConf(), inPath, NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+
+    try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, getConf()), HadoopReadOptions.builder(getConf()).build())) {
+      ParquetFileWriter writer = new ParquetFileWriter(getConf(), schema, outPath, ParquetFileWriter.Mode.CREATE);
+      writer.start();
+      processBlocks(reader, writer, metaData, schema, metaData.getFileMetaData().getCreatedBy(), codecName);
+      writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+    }
+    return 0;
+  }
+
+  @Override
+  public List<String> getExamples() {
+    return Lists.newArrayList(
+        "# Translate the compression from one to another",
+        " input.parquet output.parquet ZSTD"
+    );
+  }
+  private void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta, MessageType schema,
+                             String createdBy, CompressionCodecName codecName) throws IOException {
+    int blockIndex = 0;
+    PageReadStore store = reader.readNextRowGroup();
+    while (store != null) {
+      writer.startBlock(store.getRowCount());
+      BlockMetaData blockMetaData = meta.getBlocks().get(blockIndex);
+      List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
+      Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
+        Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+      for (int i = 0; i < columnsInOrder.size(); i += 1) {
+        ColumnChunkMetaData chunk = columnsInOrder.get(i);
+        ColumnReadStoreImpl crstore = new ColumnReadStoreImpl(store, new DumpGroupConverter(), schema, createdBy);
+        ColumnDescriptor columnDescriptor = descriptorsMap.get(chunk.getPath());
+        writer.startColumn(columnDescriptor, crstore.getColumnReader(columnDescriptor).getTotalValueCount(), codecName);
+        processChunk(reader, writer, chunk, createdBy, codecName);
+        writer.endColumn();
+      }
+      writer.endBlock();
+      store = reader.readNextRowGroup();
+      blockIndex++;
+    }
+  }
+
+  private void processChunk(TransParquetFileReader reader, ParquetFileWriter writer, ColumnChunkMetaData chunk,
+                            String createdBy, CompressionCodecName codecName) throws IOException {
+    CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
+    BytesInputDecompressor decompressor = codecFactory.getDecompressor(chunk.getCodec());
+    BytesInputCompressor compressor = codecFactory.getCompressor(codecName);
+    ColumnIndex columnIndex = reader.readColumnIndex(chunk);
+    OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+
+    reader.setStreamPosition(chunk.getStartingPos());
+    DictionaryPage dictionaryPage = null;
+    long readValues = 0;
+    Statistics statistics = null;
+    ParquetMetadataConverter converter = new ParquetMetadataConverter();
+    int pageIndex = 0;
+    long totalChunkValues = chunk.getValueCount();
+    while (readValues < totalChunkValues) {
+      PageHeader pageHeader = reader.readPageHeader();
+      byte[] pageLoad;
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          if (dictionaryPage != null) {
+            throw new IOException("has more than one dictionary page in column chunk");
+          }
+          DictionaryPageHeader dictPageHeader = pageHeader.dictionary_page_header;
+          pageLoad = translatePageLoad(reader, true, compressor, decompressor, pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size());
+          writer.writeDictionaryPage(new DictionaryPage(BytesInput.from(pageLoad),
+            pageHeader.getUncompressed_page_size(),
+            converter.getEncoding(dictPageHeader.getEncoding())));
+          break;
+        case DATA_PAGE:
+          DataPageHeader headerV1 = pageHeader.data_page_header;
+          pageLoad = translatePageLoad(reader, true, compressor, decompressor, pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size());
+          statistics = convertStatistics(createdBy, chunk.getPrimitiveType(), headerV1.getStatistics(), columnIndex, pageIndex, converter);
+          readValues += headerV1.getNum_values();
+          if (offsetIndex != null) {
+            long rowCount = 1 + offsetIndex.getLastRowIndex(pageIndex, totalChunkValues) - offsetIndex.getFirstRowIndex(pageIndex);
+            writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
+              pageHeader.uncompressed_page_size,
+              BytesInput.from(pageLoad),
+              statistics,
+              toIntWithCheck(rowCount),
+              converter.getEncoding(headerV1.getRepetition_level_encoding()),
+              converter.getEncoding(headerV1.getDefinition_level_encoding()),
+              converter.getEncoding(headerV1.getEncoding()));
+          } else {
+            writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
+              pageHeader.uncompressed_page_size,
+              BytesInput.from(pageLoad),
+              statistics,
+              converter.getEncoding(headerV1.getRepetition_level_encoding()),
+              converter.getEncoding(headerV1.getDefinition_level_encoding()),
+              converter.getEncoding(headerV1.getEncoding()));
+          }
+          pageIndex++;
+          break;
+        case DATA_PAGE_V2:
+          DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
+          int rlLength = headerV2.getRepetition_levels_byte_length();
+          BytesInput rlLevels = readBlock(rlLength, reader);
+          int dlLength = headerV2.getDefinition_levels_byte_length();
+          BytesInput dlLevels = readBlock(dlLength, reader);
+          int payLoadLength = pageHeader.getCompressed_page_size() - rlLength - dlLength;
+          int rawDataLength = pageHeader.getUncompressed_page_size() - rlLength - dlLength;
+          pageLoad = translatePageLoad(reader, headerV2.is_compressed, compressor, decompressor, payLoadLength, rawDataLength);
+          statistics = convertStatistics(createdBy, chunk.getPrimitiveType(), headerV2.getStatistics(), columnIndex, pageIndex, converter);
+          readValues += headerV2.getNum_values();
+          writer.writeDataPageV2(headerV2.getNum_rows(),
+            headerV2.getNum_nulls(),
+            headerV2.getNum_values(),
+            rlLevels,
+            dlLevels,
+            converter.getEncoding(headerV2.getEncoding()),
+            BytesInput.from(pageLoad),
+            pageHeader.uncompressed_page_size - rlLength - dlLength,
+            statistics);
+          pageIndex++;
+          break;
+        default:
+          break;
+      }
+    }
+  }
+
+  private Statistics convertStatistics(String createdBy, PrimitiveType type, org.apache.parquet.format.Statistics pageStatistics,
+                                       ColumnIndex columnIndex, int pageIndex, ParquetMetadataConverter converter) throws IOException {
+    if (pageStatistics != null) {
+      return converter.fromParquetStatistics(createdBy, pageStatistics, type);
+    } else if (columnIndex != null) {
+      if (columnIndex.getNullPages() == null) {
+        throw new IOException("columnIndex has null variable 'nullPages' which indicates corrupted data for type: " +  type.getName());
+      }
+      if (pageIndex > columnIndex.getNullPages().size()) {
+        throw new IOException("There are more pages " + pageIndex + " found in the column than in the columnIndex " + columnIndex.getNullPages().size());
+      }
+      org.apache.parquet.column.statistics.Statistics.Builder statsBuilder = org.apache.parquet.column.statistics.Statistics.getBuilderForReading(type);
+      statsBuilder.withNumNulls(columnIndex.getNullCounts().get(pageIndex));
+
+      if (!columnIndex.getNullPages().get(pageIndex)) {
+        statsBuilder.withMin(columnIndex.getMinValues().get(pageIndex).array().clone());
+        statsBuilder.withMax(columnIndex.getMaxValues().get(pageIndex).array().clone());
+      }
+      return statsBuilder.build();

Review comment:
       We stopped writing page statistics on purpose. They are never used in any of the implementations I am aware of, and using them would require reading every page header, which does not perform well. That's why we introduced _column indexes_. So I would suggest writing page-level statistics only if the original file has them.
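
       A minimal sketch of that suggestion (shape only, not the final implementation): translate statistics when the source page header carries them, and skip the reconstruction from the column index otherwise.

```java
private Statistics convertStatistics(String createdBy, PrimitiveType type,
    org.apache.parquet.format.Statistics pageStatistics, ParquetMetadataConverter converter) {
  // Only carry over statistics the source file already wrote into the page header;
  // do not rebuild them from the column index.
  if (pageStatistics == null) {
    return null;
  }
  return converter.fromParquetStatistics(createdBy, pageStatistics, type);
}
```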

##########
File path: parquet-cli/src/test/java/org/apache/parquet/cli/commands/TransCompressionCommandTest.java
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cli.commands;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TransCompressionCommandTest extends ParquetFileTest  {
+
+  private Configuration conf = new Configuration();
+  private Map<String, String> extraMeta
+    = ImmutableMap.of("key1", "value1", "key2", "value2");
+
+  @Test
+  public void testTransCompression() throws Exception {
+    String[] codecs = {"UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD"};
+    for (int i = 0; i < codecs.length; i++) {
+      for (int j = 0; j <codecs.length; j++) {
+        // Same codec for both are considered as valid test case
+        testInternal(codecs[i], codecs[j], ParquetProperties.WriterVersion.PARQUET_1_0, ParquetProperties.DEFAULT_PAGE_SIZE);
+        testInternal(codecs[i], codecs[j], ParquetProperties.WriterVersion.PARQUET_2_0, ParquetProperties.DEFAULT_PAGE_SIZE);
+        testInternal(codecs[i], codecs[j], ParquetProperties.WriterVersion.PARQUET_1_0, 64);
+        testInternal(codecs[i], codecs[j], ParquetProperties.WriterVersion.PARQUET_1_0, ParquetProperties.DEFAULT_PAGE_SIZE * 100);
+      }
+    }
+  }
+
+  @Test
+  public void testSpeed() throws Exception {
+    String inputFile = createParquetFile("input", "GZIP", 100000,
+    ParquetProperties.WriterVersion.PARQUET_1_0, ParquetProperties.DEFAULT_PAGE_SIZE);
+    String outputFile = createTempFile("output_trans");
+
+    long start = System.currentTimeMillis();
+    TransCompressionCommand command = new TransCompressionCommand(createLogger());
+    command.setConf(new Configuration());
+    command.input = inputFile;
+    command.output = outputFile;
+    command.codec = "ZSTD";
+    command.run();
+    long durationTrans = System.currentTimeMillis() - start;
+
+    outputFile = createTempFile("output_record");
+    start = System.currentTimeMillis();
+    convertRecordByRecord(CompressionCodecName.valueOf("ZSTD"), new Path(inputFile), new Path(outputFile));
+    long durationRecord = System.currentTimeMillis() - start;
+
+    // The TransCompressionCommand is ~5 times faster than translating record by record
+    Assert.assertTrue(durationTrans < durationRecord);
+  }
+
+  private void testInternal(String srcCodec, String destCodec, ParquetProperties.WriterVersion writerVersion, int pageSize) throws Exception {
+    int numRecord = 1000;
+    String inputFile = createParquetFile("input", srcCodec, numRecord, writerVersion, pageSize);
+    String outputFile = createTempFile("output_trans");
+
+    TransCompressionCommand command = new TransCompressionCommand(createLogger());
+    command.setConf(new Configuration());
+    command.input = inputFile;
+    command.output = outputFile;
+    command.codec = destCodec;
+    command.run();
+
+    validateColumns(inputFile, numRecord);
+    validMeta(inputFile, outputFile);
+    validColumnIndex(inputFile, outputFile);
+  }
+
+  private void convertRecordByRecord(CompressionCodecName codecName, Path inpath, Path outpath) throws Exception {
+    ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inpath, NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+    HadoopInputFile inputFile = HadoopInputFile.fromPath(inpath, conf);
+    ParquetReadOptions readOptions = HadoopReadOptions.builder(conf).build();
+
+    conf.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString());
+    ExampleParquetWriter.Builder builder = ExampleParquetWriter.builder(outpath).withConf(conf).withCompressionCodec(codecName);
+
+    ParquetWriter parquetWriter = builder.build();
+
+    PageReadStore pages;
+    ParquetFileReader reader = new ParquetFileReader(inputFile, readOptions);
+
+    while ((pages = reader.readNextRowGroup()) != null) {
+      long rows = pages.getRowCount();
+      MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
+      RecordReader recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
+
+      for (int i = 0; i < rows; i++) {
+        SimpleGroup simpleGroup = (SimpleGroup) recordReader.read();
+        parquetWriter.write(simpleGroup);
+      }
+    }
+
+    parquetWriter.close();
+  }
+
+  private void validateColumns(String inputFile, int numRecord) throws IOException {
+    ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(inputFile)).withConf(conf).build();
+    for (int i = 0; i < numRecord; i++) {
+      Group group = reader.read();
+      assertTrue(group.getLong("DocId", 0) < 1000);
+      assertEquals(group.getBinary("Name", 0).length(), 100);
+      assertEquals(group.getBinary("Gender", 0).length(), 100);
+      Group subGroup = group.getGroup("Links", 0);
+      assertEquals(subGroup.getBinary("Backward", 0).length(), 100);
+      assertEquals(subGroup.getBinary("Forward", 0).length(), 100);

Review comment:
       I don't like these tests. You should test for exact values. You may either keep the generated data in memory so you can match the file content against it, or read both the source and the target file at the same time and match the values.
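
       A rough sketch of the second option, reading the source and the target in parallel and comparing values (the helper name `compareFiles` is illustrative; the field names and the `conf` field are the ones already used in this test):

```java
private void compareFiles(String srcFile, String dstFile, int numRecord) throws IOException {
  try (ParquetReader<Group> expectedReader = ParquetReader.builder(new GroupReadSupport(), new Path(srcFile)).withConf(conf).build();
       ParquetReader<Group> actualReader = ParquetReader.builder(new GroupReadSupport(), new Path(dstFile)).withConf(conf).build()) {
    for (int i = 0; i < numRecord; i++) {
      Group expected = expectedReader.read();
      Group actual = actualReader.read();
      // Compare exact values instead of only checking lengths
      assertEquals(expected.getLong("DocId", 0), actual.getLong("DocId", 0));
      assertEquals(expected.getBinary("Name", 0), actual.getBinary("Name", 0));
      assertEquals(expected.getBinary("Gender", 0), actual.getBinary("Gender", 0));
    }
  }
}
```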

##########
File path: parquet-cli/src/main/java/org/apache/parquet/cli/commands/TransCompressionCommand.java
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopCodecs;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+
+@Parameters(commandDescription="Translate the compression from one to another")
+public class TransCompressionCommand extends BaseCommand {
+
+  public TransCompressionCommand(Logger console) {
+    super(console);
+  }
+
+  @Parameter(description = "<input parquet file path>")
+  String input;
+
+  @Parameter(description = "<output parquet file path>")
+  String output;
+
+  @Parameter(description = "<new compression codec")
+  String codec;
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public int run() throws IOException {
+    Preconditions.checkArgument(input != null && output != null,
+      "Both input and output parquet file paths are required.");
+
+    Preconditions.checkArgument(codec != null,
+      "The codec cannot be null");
+
+    Path inPath = new Path(input);
+    Path outPath = new Path(output);
+    CompressionCodecName codecName = CompressionCodecName.valueOf(codec);
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(getConf(), inPath, NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+
+    try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, getConf()), HadoopReadOptions.builder(getConf()).build())) {
+      ParquetFileWriter writer = new ParquetFileWriter(getConf(), schema, outPath, ParquetFileWriter.Mode.CREATE);

Review comment:
       You may add the writer to the resource-opening part of the try statement, right next to the reader (separating them with `';'`).
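
       For illustration, the body of `run()` could then look roughly like this, assuming `ParquetFileWriter` is auto-closeable in the targeted parquet-mr version (if it is not, the writer still needs to be handled separately):

```java
try (TransParquetFileReader reader = new TransParquetFileReader(
         HadoopInputFile.fromPath(inPath, getConf()), HadoopReadOptions.builder(getConf()).build());
     // Assumption: ParquetFileWriter implements AutoCloseable here
     ParquetFileWriter writer = new ParquetFileWriter(getConf(), schema, outPath, ParquetFileWriter.Mode.CREATE)) {
  writer.start();
  processBlocks(reader, writer, metaData, schema, metaData.getFileMetaData().getCreatedBy(), codecName);
  writer.end(metaData.getFileMetaData().getKeyValueMetaData());
}
```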

##########
File path: parquet-cli/src/main/java/org/apache/parquet/cli/commands/TransCompressionCommand.java
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopCodecs;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+
+@Parameters(commandDescription="Translate the compression from one to another")
+public class TransCompressionCommand extends BaseCommand {
+
+  public TransCompressionCommand(Logger console) {
+    super(console);
+  }
+
+  @Parameter(description = "<input parquet file path>")
+  String input;
+
+  @Parameter(description = "<output parquet file path>")
+  String output;
+
+  @Parameter(description = "<new compression codec")
+  String codec;
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public int run() throws IOException {
+    Preconditions.checkArgument(input != null && output != null,
+      "Both input and output parquet file paths are required.");
+
+    Preconditions.checkArgument(codec != null,
+      "The codec cannot be null");
+
+    Path inPath = new Path(input);
+    Path outPath = new Path(output);
+    CompressionCodecName codecName = CompressionCodecName.valueOf(codec);
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(getConf(), inPath, NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+
+    try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, getConf()), HadoopReadOptions.builder(getConf()).build())) {
+      ParquetFileWriter writer = new ParquetFileWriter(getConf(), schema, outPath, ParquetFileWriter.Mode.CREATE);
+      writer.start();
+      processBlocks(reader, writer, metaData, schema, metaData.getFileMetaData().getCreatedBy(), codecName);
+      writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+    }
+    return 0;
+  }
+
+  @Override
+  public List<String> getExamples() {
+    return Lists.newArrayList(
+        "# Translate the compression from one to another",
+        " input.parquet output.parquet ZSTD"
+    );
+  }
+  private void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta, MessageType schema,
+                             String createdBy, CompressionCodecName codecName) throws IOException {
+    int blockIndex = 0;
+    PageReadStore store = reader.readNextRowGroup();
+    while (store != null) {
+      writer.startBlock(store.getRowCount());
+      BlockMetaData blockMetaData = meta.getBlocks().get(blockIndex);
+      List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
+      Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
+        Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+      for (int i = 0; i < columnsInOrder.size(); i += 1) {
+        ColumnChunkMetaData chunk = columnsInOrder.get(i);
+        ColumnReadStoreImpl crstore = new ColumnReadStoreImpl(store, new DumpGroupConverter(), schema, createdBy);
+        ColumnDescriptor columnDescriptor = descriptorsMap.get(chunk.getPath());
+        writer.startColumn(columnDescriptor, crstore.getColumnReader(columnDescriptor).getTotalValueCount(), codecName);
+        processChunk(reader, writer, chunk, createdBy, codecName);
+        writer.endColumn();
+      }
+      writer.endBlock();
+      store = reader.readNextRowGroup();
+      blockIndex++;
+    }
+  }
+
+  private void processChunk(TransParquetFileReader reader, ParquetFileWriter writer, ColumnChunkMetaData chunk,
+                            String createdBy, CompressionCodecName codecName) throws IOException {
+    CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
+    BytesInputDecompressor decompressor = codecFactory.getDecompressor(chunk.getCodec());
+    BytesInputCompressor compressor = codecFactory.getCompressor(codecName);
+    ColumnIndex columnIndex = reader.readColumnIndex(chunk);
+    OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+
+    reader.setStreamPosition(chunk.getStartingPos());
+    DictionaryPage dictionaryPage = null;
+    long readValues = 0;
+    Statistics statistics = null;
+    ParquetMetadataConverter converter = new ParquetMetadataConverter();
+    int pageIndex = 0;
+    long totalChunkValues = chunk.getValueCount();
+    while (readValues < totalChunkValues) {
+      PageHeader pageHeader = reader.readPageHeader();
+      byte[] pageLoad;
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          if (dictionaryPage != null) {
+            throw new IOException("has more than one dictionary page in column chunk");
+          }
+          DictionaryPageHeader dictPageHeader = pageHeader.dictionary_page_header;
+          pageLoad = translatePageLoad(reader, true, compressor, decompressor, pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size());
+          writer.writeDictionaryPage(new DictionaryPage(BytesInput.from(pageLoad),
+            pageHeader.getUncompressed_page_size(),
+            converter.getEncoding(dictPageHeader.getEncoding())));
+          break;
+        case DATA_PAGE:
+          DataPageHeader headerV1 = pageHeader.data_page_header;
+          pageLoad = translatePageLoad(reader, true, compressor, decompressor, pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size());
+          statistics = convertStatistics(createdBy, chunk.getPrimitiveType(), headerV1.getStatistics(), columnIndex, pageIndex, converter);
+          readValues += headerV1.getNum_values();
+          if (offsetIndex != null) {
+            long rowCount = 1 + offsetIndex.getLastRowIndex(pageIndex, totalChunkValues) - offsetIndex.getFirstRowIndex(pageIndex);
+            writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
+              pageHeader.uncompressed_page_size,
+              BytesInput.from(pageLoad),
+              statistics,
+              toIntWithCheck(rowCount),
+              converter.getEncoding(headerV1.getRepetition_level_encoding()),
+              converter.getEncoding(headerV1.getDefinition_level_encoding()),
+              converter.getEncoding(headerV1.getEncoding()));
+          } else {
+            writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
+              pageHeader.uncompressed_page_size,
+              BytesInput.from(pageLoad),
+              statistics,
+              converter.getEncoding(headerV1.getRepetition_level_encoding()),
+              converter.getEncoding(headerV1.getDefinition_level_encoding()),
+              converter.getEncoding(headerV1.getEncoding()));
+          }
+          pageIndex++;
+          break;
+        case DATA_PAGE_V2:
+          DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
+          int rlLength = headerV2.getRepetition_levels_byte_length();
+          BytesInput rlLevels = readBlock(rlLength, reader);
+          int dlLength = headerV2.getDefinition_levels_byte_length();
+          BytesInput dlLevels = readBlock(dlLength, reader);
+          int payLoadLength = pageHeader.getCompressed_page_size() - rlLength - dlLength;
+          int rawDataLength = pageHeader.getUncompressed_page_size() - rlLength - dlLength;
+          pageLoad = translatePageLoad(reader, headerV2.is_compressed, compressor, decompressor, payLoadLength, rawDataLength);
+          statistics = convertStatistics(createdBy, chunk.getPrimitiveType(), headerV2.getStatistics(), columnIndex, pageIndex, converter);
+          readValues += headerV2.getNum_values();
+          writer.writeDataPageV2(headerV2.getNum_rows(),
+            headerV2.getNum_nulls(),
+            headerV2.getNum_values(),
+            rlLevels,
+            dlLevels,
+            converter.getEncoding(headerV2.getEncoding()),
+            BytesInput.from(pageLoad),
+            pageHeader.uncompressed_page_size - rlLength - dlLength,
+            statistics);
+          pageIndex++;
+          break;
+        default:
+          break;
+      }
+    }
+  }
+
+  private Statistics convertStatistics(String createdBy, PrimitiveType type, org.apache.parquet.format.Statistics pageStatistics,
+                                       ColumnIndex columnIndex, int pageIndex, ParquetMetadataConverter converter) throws IOException {
+    if (pageStatistics != null) {
+      return converter.fromParquetStatistics(createdBy, pageStatistics, type);
+    } else if (columnIndex != null) {
+      if (columnIndex.getNullPages() == null) {
+        throw new IOException("columnIndex has null variable 'nullPages' which indicates corrupted data for type: " +  type.getName());
+      }
+      if (pageIndex > columnIndex.getNullPages().size()) {
+        throw new IOException("There are more pages " + pageIndex + " found in the column than in the columnIndex " + columnIndex.getNullPages().size());
+      }
+      org.apache.parquet.column.statistics.Statistics.Builder statsBuilder = org.apache.parquet.column.statistics.Statistics.getBuilderForReading(type);
+      statsBuilder.withNumNulls(columnIndex.getNullCounts().get(pageIndex));
+
+      if (!columnIndex.getNullPages().get(pageIndex)) {
+        statsBuilder.withMin(columnIndex.getMinValues().get(pageIndex).array().clone());
+        statsBuilder.withMax(columnIndex.getMaxValues().get(pageIndex).array().clone());
+      }
+      return statsBuilder.build();
+    } else {
+      return null;
+    }
+  }
+
+  private byte[] translatePageLoad(TransParquetFileReader reader, boolean isCompressed, BytesInputCompressor compressor,
+                                   BytesInputDecompressor decompressor, int payloadLength, int rawDataLength) throws IOException {
+    BytesInput data = readBlock(payloadLength, reader);
+    if (isCompressed) {
+      data = decompressor.decompress(data, rawDataLength);
+    }
+    BytesInput newCompressedData = compressor.compress(data);
+    return newCompressedData.toByteArray();
+  }
+
+  private BytesInput readBlock(int length, TransParquetFileReader reader) throws IOException {
+    byte[] data = new byte[length];
+    reader.blockRead(data);
+    return BytesInput.from(data);
+  }
+
+  private int toIntWithCheck(long size) {
+    if ((int)size != size) {
+      throw new ParquetEncodingException("size is bigger than " + Integer.MAX_VALUE + " bytes: " + size);
+    }
+    return (int)size;
+  }
+
+  private static final class DumpGroupConverter extends GroupConverter {
+    @Override public void start() {}
+    @Override public void end() {}
+    @Override public Converter getConverter(int fieldIndex) { return new DumpConverter(); }
+  }
+
+  private static final class DumpConverter extends PrimitiveConverter {
+    @Override public GroupConverter asGroupConverter() { return new DumpGroupConverter(); }
+  }

Review comment:
       I think `Dummy` would be a better name than `Dump`.

##########
File path: parquet-cli/src/main/java/org/apache/parquet/cli/commands/TransCompressionCommand.java
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopCodecs;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+
+@Parameters(commandDescription="Translate the compression from one to another")
+public class TransCompressionCommand extends BaseCommand {
+
+  public TransCompressionCommand(Logger console) {
+    super(console);
+  }
+
+  @Parameter(description = "<input parquet file path>")
+  String input;
+
+  @Parameter(description = "<output parquet file path>")
+  String output;
+
+  @Parameter(description = "<new compression codec")
+  String codec;
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public int run() throws IOException {
+    Preconditions.checkArgument(input != null && output != null,
+      "Both input and output parquet file paths are required.");
+
+    Preconditions.checkArgument(codec != null,
+      "The codec cannot be null");
+
+    Path inPath = new Path(input);
+    Path outPath = new Path(output);
+    CompressionCodecName codecName = CompressionCodecName.valueOf(codec);
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(getConf(), inPath, NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+
+    try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, getConf()), HadoopReadOptions.builder(getConf()).build())) {
+      ParquetFileWriter writer = new ParquetFileWriter(getConf(), schema, outPath, ParquetFileWriter.Mode.CREATE);
+      writer.start();
+      processBlocks(reader, writer, metaData, schema, metaData.getFileMetaData().getCreatedBy(), codecName);
+      writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+    }
+    return 0;
+  }
+
+  @Override
+  public List<String> getExamples() {
+    return Lists.newArrayList(
+        "# Translate the compression from one to another",
+        " input.parquet output.parquet ZSTD"
+    );
+  }
+  private void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta, MessageType schema,
+                             String createdBy, CompressionCodecName codecName) throws IOException {
+    int blockIndex = 0;
+    PageReadStore store = reader.readNextRowGroup();
+    while (store != null) {
+      writer.startBlock(store.getRowCount());
+      BlockMetaData blockMetaData = meta.getBlocks().get(blockIndex);
+      List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
+      Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
+        Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+      for (int i = 0; i < columnsInOrder.size(); i += 1) {
+        ColumnChunkMetaData chunk = columnsInOrder.get(i);
+        ColumnReadStoreImpl crstore = new ColumnReadStoreImpl(store, new DumpGroupConverter(), schema, createdBy);
+        ColumnDescriptor columnDescriptor = descriptorsMap.get(chunk.getPath());
+        writer.startColumn(columnDescriptor, crstore.getColumnReader(columnDescriptor).getTotalValueCount(), codecName);
+        processChunk(reader, writer, chunk, createdBy, codecName);
+        writer.endColumn();
+      }
+      writer.endBlock();
+      store = reader.readNextRowGroup();
+      blockIndex++;
+    }
+  }
+
+  private void processChunk(TransParquetFileReader reader, ParquetFileWriter writer, ColumnChunkMetaData chunk,
+                            String createdBy, CompressionCodecName codecName) throws IOException {
+    CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
+    BytesInputDecompressor decompressor = codecFactory.getDecompressor(chunk.getCodec());
+    BytesInputCompressor compressor = codecFactory.getCompressor(codecName);
+    ColumnIndex columnIndex = reader.readColumnIndex(chunk);
+    OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+
+    reader.setStreamPosition(chunk.getStartingPos());
+    DictionaryPage dictionaryPage = null;
+    long readValues = 0;
+    Statistics statistics = null;
+    ParquetMetadataConverter converter = new ParquetMetadataConverter();
+    int pageIndex = 0;
+    long totalChunkValues = chunk.getValueCount();
+    while (readValues < totalChunkValues) {
+      PageHeader pageHeader = reader.readPageHeader();
+      byte[] pageLoad;
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          if (dictionaryPage != null) {
+            throw new IOException("has more than one dictionary page in column chunk");
+          }
+          DictionaryPageHeader dictPageHeader = pageHeader.dictionary_page_header;
+          pageLoad = translatePageLoad(reader, true, compressor, decompressor, pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size());
+          writer.writeDictionaryPage(new DictionaryPage(BytesInput.from(pageLoad),
+            pageHeader.getUncompressed_page_size(),
+            converter.getEncoding(dictPageHeader.getEncoding())));
+          break;
+        case DATA_PAGE:
+          DataPageHeader headerV1 = pageHeader.data_page_header;
+          pageLoad = translatePageLoad(reader, true, compressor, decompressor, pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size());
+          statistics = convertStatistics(createdBy, chunk.getPrimitiveType(), headerV1.getStatistics(), columnIndex, pageIndex, converter);
+          readValues += headerV1.getNum_values();
+          if (offsetIndex != null) {
+            long rowCount = 1 + offsetIndex.getLastRowIndex(pageIndex, totalChunkValues) - offsetIndex.getFirstRowIndex(pageIndex);
+            writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
+              pageHeader.uncompressed_page_size,
+              BytesInput.from(pageLoad),
+              statistics,
+              toIntWithCheck(rowCount),
+              converter.getEncoding(headerV1.getRepetition_level_encoding()),
+              converter.getEncoding(headerV1.getDefinition_level_encoding()),
+              converter.getEncoding(headerV1.getEncoding()));
+          } else {
+            writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
+              pageHeader.uncompressed_page_size,
+              BytesInput.from(pageLoad),
+              statistics,
+              converter.getEncoding(headerV1.getRepetition_level_encoding()),
+              converter.getEncoding(headerV1.getDefinition_level_encoding()),
+              converter.getEncoding(headerV1.getEncoding()));
+          }
+          pageIndex++;
+          break;
+        case DATA_PAGE_V2:
+          DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
+          int rlLength = headerV2.getRepetition_levels_byte_length();
+          BytesInput rlLevels = readBlock(rlLength, reader);
+          int dlLength = headerV2.getDefinition_levels_byte_length();
+          BytesInput dlLevels = readBlock(dlLength, reader);
+          int payLoadLength = pageHeader.getCompressed_page_size() - rlLength - dlLength;
+          int rawDataLength = pageHeader.getUncompressed_page_size() - rlLength - dlLength;
+          pageLoad = translatePageLoad(reader, headerV2.is_compressed, compressor, decompressor, payLoadLength, rawDataLength);
+          statistics = convertStatistics(createdBy, chunk.getPrimitiveType(), headerV2.getStatistics(), columnIndex, pageIndex, converter);
+          readValues += headerV2.getNum_values();
+          writer.writeDataPageV2(headerV2.getNum_rows(),
+            headerV2.getNum_nulls(),
+            headerV2.getNum_values(),
+            rlLevels,
+            dlLevels,
+            converter.getEncoding(headerV2.getEncoding()),
+            BytesInput.from(pageLoad),
+            rawDataLength,
+            statistics);
+          pageIndex++;
+          break;
+        default:
+          break;
+      }
+    }
+  }
+
+  private Statistics convertStatistics(String createdBy, PrimitiveType type, org.apache.parquet.format.Statistics pageStatistics,
+                                       ColumnIndex columnIndex, int pageIndex, ParquetMetadataConverter converter) throws IOException {
+    if (pageStatistics != null) {
+      return converter.fromParquetStatistics(createdBy, pageStatistics, type);
+    } else if (columnIndex != null) {
+      if (columnIndex.getNullPages() == null) {
+        throw new IOException("columnIndex has no null-pages information, which indicates corrupted data for column: " + type.getName());
+      }
+      if (pageIndex >= columnIndex.getNullPages().size()) {
+        throw new IOException("Found page index " + pageIndex + " in the column chunk, but the columnIndex only describes " + columnIndex.getNullPages().size() + " pages");
+      }
+      org.apache.parquet.column.statistics.Statistics.Builder statsBuilder = org.apache.parquet.column.statistics.Statistics.getBuilderForReading(type);
+      statsBuilder.withNumNulls(columnIndex.getNullCounts().get(pageIndex));
+
+      if (!columnIndex.getNullPages().get(pageIndex)) {
+        statsBuilder.withMin(columnIndex.getMinValues().get(pageIndex).array().clone());
+        statsBuilder.withMax(columnIndex.getMaxValues().get(pageIndex).array().clone());
+      }
+      return statsBuilder.build();
+    } else {
+      return null;
+    }
+  }
+
+  private byte[] translatePageLoad(TransParquetFileReader reader, boolean isCompressed, BytesInputCompressor compressor,
+                                   BytesInputDecompressor decompressor, int payloadLength, int rawDataLength) throws IOException {
+    BytesInput data = readBlock(payloadLength, reader);
+    if (isCompressed) {
+      data = decompressor.decompress(data, rawDataLength);
+    }
+    BytesInput newCompressedData = compressor.compress(data);
+    return newCompressedData.toByteArray();
+  }
+
+  private BytesInput readBlock(int length, TransParquetFileReader reader) throws IOException {
+    byte[] data = new byte[length];

Review comment:
       Creating a new byte array for every block can be expensive when there are many pages. I would suggest maintaining a cached array that is reused and reallocated only when a block is larger than the cache.
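       For example, something like the following sketch (the `pageBuffer` field and the `blockRead` helper are only illustrative of whatever raw-read call the reader actually exposes; this is not a drop-in implementation):

       ```java
       // Cached buffer reused across calls; reallocated only when a block does not fit.
       private byte[] pageBuffer = new byte[0];

       private BytesInput readBlock(int length, TransParquetFileReader reader) throws IOException {
         if (length > pageBuffer.length) {
           // Grow the cache only when a block is larger than anything seen so far.
           pageBuffer = new byte[length];
         }
         // Assumed helper that reads 'length' raw bytes from the stream into the buffer.
         reader.blockRead(pageBuffer, 0, length);
         // Wrapping the shared buffer is safe here because the bytes are fully consumed
         // (decompressed and recompressed) before the next readBlock call reuses it.
         return BytesInput.from(pageBuffer, 0, length);
       }
       ```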

##########
File path: parquet-cli/src/test/java/org/apache/parquet/cli/commands/TransCompressionCommandTest.java
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cli.commands;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TransCompressionCommandTest extends ParquetFileTest  {
+
+  private Configuration conf = new Configuration();
+  private Map<String, String> extraMeta
+    = ImmutableMap.of("key1", "value1", "key2", "value2");
+
+  @Test
+  public void testTransCompression() throws Exception {
+    String[] codecs = {"UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD"};
+    for (int i = 0; i < codecs.length; i++) {
+      for (int j = 0; j <codecs.length; j++) {
+        // Same codec for both are considered as valid test case
+        testInternal(codecs[i], codecs[j], ParquetProperties.WriterVersion.PARQUET_1_0, ParquetProperties.DEFAULT_PAGE_SIZE);
+        testInternal(codecs[i], codecs[j], ParquetProperties.WriterVersion.PARQUET_2_0, ParquetProperties.DEFAULT_PAGE_SIZE);
+        testInternal(codecs[i], codecs[j], ParquetProperties.WriterVersion.PARQUET_1_0, 64);
+        testInternal(codecs[i], codecs[j], ParquetProperties.WriterVersion.PARQUET_1_0, ParquetProperties.DEFAULT_PAGE_SIZE * 100);
+      }
+    }
+  }
+
+  @Test
+  public void testSpeed() throws Exception {
+    String inputFile = createParquetFile("input", "GZIP", 100000,
+      ParquetProperties.WriterVersion.PARQUET_1_0, ParquetProperties.DEFAULT_PAGE_SIZE);
+    String outputFile = createTempFile("output_trans");
+
+    long start = System.currentTimeMillis();
+    TransCompressionCommand command = new TransCompressionCommand(createLogger());
+    command.setConf(new Configuration());
+    command.input = inputFile;
+    command.output = outputFile;
+    command.codec = "ZSTD";
+    command.run();
+    long durationTrans = System.currentTimeMillis() - start;
+
+    outputFile = createTempFile("output_record");
+    start = System.currentTimeMillis();
+    convertRecordByRecord(CompressionCodecName.valueOf("ZSTD"), new Path(inputFile), new Path(outputFile));
+    long durationRecord = System.currentTimeMillis() - start;
+
+    // TransCompressionCommand is roughly 5x faster than rewriting record by record;
+    // assert only that it is faster to keep the test stable across environments.
+    Assert.assertTrue(durationTrans < durationRecord);
+  }
+
+  private void testInternal(String srcCodec, String destCodec, ParquetProperties.WriterVersion writerVersion, int pageSize) throws Exception {
+    int numRecord = 1000;
+    String inputFile = createParquetFile("input", srcCodec, numRecord, writerVersion, pageSize);
+    String outputFile = createTempFile("output_trans");
+
+    TransCompressionCommand command = new TransCompressionCommand(createLogger());
+    command.setConf(new Configuration());
+    command.input = inputFile;
+    command.output = outputFile;
+    command.codec = destCodec;
+    command.run();
+
+    validateColumns(outputFile, numRecord);
+    validMeta(inputFile, outputFile);
+    validColumnIndex(inputFile, outputFile);
+  }
+
+  private void convertRecordByRecord(CompressionCodecName codecName, Path inpath, Path outpath) throws Exception {
+    ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inpath, NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+    HadoopInputFile inputFile = HadoopInputFile.fromPath(inpath, conf);
+    ParquetReadOptions readOptions = HadoopReadOptions.builder(conf).build();
+
+    conf.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString());
+    ExampleParquetWriter.Builder builder = ExampleParquetWriter.builder(outpath).withConf(conf).withCompressionCodec(codecName);
+
+    ParquetWriter<Group> parquetWriter = builder.build();
+
+    PageReadStore pages;
+    ParquetFileReader reader = new ParquetFileReader(inputFile, readOptions);
+
+    while ((pages = reader.readNextRowGroup()) != null) {
+      long rows = pages.getRowCount();
+      MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
+      RecordReader recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
+
+      for (int i = 0; i < rows; i++) {
+        SimpleGroup simpleGroup = (SimpleGroup) recordReader.read();
+        parquetWriter.write(simpleGroup);
+      }
+    }
+
+    parquetWriter.close();
+  }
+
+  private void validateColumns(String inputFile, int numRecord) throws IOException {
+    ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(inputFile)).withConf(conf).build();
+    for (int i = 0; i < numRecord; i++) {
+      Group group = reader.read();
+      assertTrue(group.getLong("DocId", 0) < 1000);
+      assertEquals(group.getBinary("Name", 0).length(), 100);
+      assertEquals(group.getBinary("Gender", 0).length(), 100);
+      Group subGroup = group.getGroup("Links", 0);
+      assertEquals(subGroup.getBinary("Backward", 0).length(), 100);
+      assertEquals(subGroup.getBinary("Forward", 0).length(), 100);
+    }
+    reader.close();
+  }
+
+  private void validMeta(String inputFile, String outFile) throws Exception {
+    ParquetMetadata inMetaData = ParquetFileReader.readFooter(conf, new Path(inputFile), NO_FILTER);
+    ParquetMetadata outMetaData = ParquetFileReader.readFooter(conf, new Path(outFile), NO_FILTER);
+    Assert.assertEquals(inMetaData.getFileMetaData().getSchema(), outMetaData.getFileMetaData().getSchema());
+    Assert.assertEquals(inMetaData.getFileMetaData().getKeyValueMetaData(), outMetaData.getFileMetaData().getKeyValueMetaData());
+  }
+
+  private void validColumnIndex(String inputFile, String outFile) throws Exception {
+    ParquetMetadata inMetaData = ParquetFileReader.readFooter(conf, new Path(inputFile), NO_FILTER);
+    ParquetMetadata outMetaData = ParquetFileReader.readFooter(conf, new Path(outFile), NO_FILTER);
+    Assert.assertEquals(inMetaData.getBlocks().size(), outMetaData.getBlocks().size());
+    try (ParquetFileReader inReader = new ParquetFileReader(HadoopInputFile.fromPath(new Path(inputFile), conf), HadoopReadOptions.builder(conf).build());
+         ParquetFileReader outReader = new ParquetFileReader(HadoopInputFile.fromPath(new Path(outFile), conf), HadoopReadOptions.builder(conf).build())) {
+      for (int i = 0; i < inMetaData.getBlocks().size(); i++) {
+        BlockMetaData inBlockMetaData = inMetaData.getBlocks().get(i);
+        BlockMetaData outBlockMetaData = outMetaData.getBlocks().get(i);
+        Assert.assertEquals(inBlockMetaData.getColumns().size(), outBlockMetaData.getColumns().size());
+        for (int j = 0; j < inBlockMetaData.getColumns().size(); j++) {
+          ColumnChunkMetaData inChunk = inBlockMetaData.getColumns().get(j);
+          ColumnIndex inColumnIndex = inReader.readColumnIndex(inChunk);
+          OffsetIndex inOffsetIndex = inReader.readOffsetIndex(inChunk);
+          ColumnChunkMetaData outChunk = outBlockMetaData.getColumns().get(j);
+          ColumnIndex outColumnIndex = outReader.readColumnIndex(outChunk);
+          OffsetIndex outOffsetIndex = outReader.readOffsetIndex(outChunk);
+          if (inColumnIndex != null) {
+            Assert.assertEquals(inColumnIndex.getBoundaryOrder(), outColumnIndex.getBoundaryOrder());
+            Assert.assertEquals(inColumnIndex.getMaxValues(), outColumnIndex.getMaxValues());
+            Assert.assertEquals(inColumnIndex.getMinValues(), outColumnIndex.getMinValues());
+            Assert.assertEquals(inColumnIndex.getNullCounts(), outColumnIndex.getNullCounts());
+          }
+          if (inOffsetIndex != null) {
+            Assert.assertEquals(inOffsetIndex.getPageCount(), outOffsetIndex.getPageCount());
+            for (int k = 0; k < inOffsetIndex.getPageCount(); k++) {
+              Assert.assertEquals(inOffsetIndex.getFirstRowIndex(k), outOffsetIndex.getFirstRowIndex(k));
+              Assert.assertEquals(inOffsetIndex.getLastRowIndex(k, inChunk.getValueCount()),
+                outOffsetIndex.getLastRowIndex(k, outChunk.getValueCount()));
+            }

Review comment:
       You are not testing `OffsetIndex.getOffset(int)`, which is a big problem. These offsets reference the exact locations of the pages in the file. It is not trivial to test, but if you did, the test would fail because the offsets are not rewritten in the new file. Every other value in the column/offset indexes shall be identical in the source and target files, but the offsets shall be recalculated: the page sizes change when compressed with a different codec, so the page positions shift in the file.
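       Just to illustrate, here is a rough sketch of a minimal check on the rewritten offsets. It only asserts that the output offsets are internally consistent with the output column chunk (monotonically increasing and inside the chunk); it does not verify that they point at actual page headers, and it reuses the variable names already present in this test:

       ```java
       if (outOffsetIndex != null) {
         long previousEnd = outChunk.getStartingPos();
         for (int k = 0; k < outOffsetIndex.getPageCount(); k++) {
           long offset = outOffsetIndex.getOffset(k);
           // Every page must start at or after the end of the previous page.
           assertTrue(offset >= previousEnd);
           previousEnd = offset + outOffsetIndex.getCompressedPageSize(k);
         }
         // The last page must end inside the rewritten column chunk.
         assertTrue(previousEnd <= outChunk.getStartingPos() + outChunk.getTotalSize());
       }
       ```

       Offsets copied verbatim from the source file would likely violate these bounds for most column chunks once the page sizes change under the new codec.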




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org