Posted to dev@parquet.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/09/11 11:58:00 UTC

[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

    [ https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16610495#comment-16610495 ] 

ASF GitHub Bot commented on PARQUET-1381:
-----------------------------------------

zivanfi closed pull request #512: PARQUET-1381: Add merge blocks command to parquet-tools
URL: https://github.com/apache/parquet-mr/pull/512

This PR was merged from a forked repository. Because GitHub hides the
original diff of a foreign pull request on merge, it is reproduced
below for the sake of provenance:

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
index 37845961a..e582908ca 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
@@ -75,7 +75,7 @@ public ColumnReader getColumnReader(ColumnDescriptor path) {
     return newMemColumnReader(path, pageReadStore.getPageReader(path));
   }
 
-  private ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) {
+  public ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) {
     PrimitiveConverter converter = getPrimitiveConverter(path);
     return new ColumnReaderImpl(path, pageReader, converter, writerVersion);
   }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index 82c288fe4..5349dc28a 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -262,4 +262,9 @@ public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
     }
   }
 
+  void flushToFileWriter(ColumnDescriptor path, ParquetFileWriter writer) throws IOException {
+    ColumnChunkPageWriter pageWriter = writers.get(path);
+    pageWriter.writeToFileWriter(writer);
+  }
+
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 15fe592db..527c8313b 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -40,6 +40,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -56,6 +57,7 @@
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.RowGroupFilter;
@@ -1160,27 +1162,8 @@ public void addChunk(ChunkDescriptor descriptor) {
      * @throws IOException if there is an error while reading from the stream
      */
     public List<Chunk> readAll(SeekableInputStream f) throws IOException {
-      List<Chunk> result = new ArrayList<Chunk>(chunks.size());
-      f.seek(offset);
-
-      int fullAllocations = length / options.getMaxAllocationSize();
-      int lastAllocationSize = length % options.getMaxAllocationSize();
-
-      int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
-      List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
-
-      for (int i = 0; i < fullAllocations; i += 1) {
-        buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
-      }
-
-      if (lastAllocationSize > 0) {
-        buffers.add(options.getAllocator().allocate(lastAllocationSize));
-      }
-
-      for (ByteBuffer buffer : buffers) {
-        f.readFully(buffer);
-        buffer.flip();
-      }
+      List<Chunk> result = new ArrayList<>(chunks.size());
+      List<ByteBuffer> buffers = readBlocks(f, offset, length);
 
       // report in a counter the data we just scanned
       BenchmarkCounter.incrementBytesRead(length);
@@ -1206,4 +1189,72 @@ public long endPos() {
 
   }
 
+  /**
+   * @param f file to read the blocks from
+   * @return the ByteBuffer blocks
+   * @throws IOException if there is an error while reading from the stream
+   */
+  List<ByteBuffer> readBlocks(SeekableInputStream f, long offset, int length) throws IOException {
+    f.seek(offset);
+
+    int fullAllocations = length / options.getMaxAllocationSize();
+    int lastAllocationSize = length % options.getMaxAllocationSize();
+
+    int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
+    List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
+
+    for (int i = 0; i < fullAllocations; i++) {
+      buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
+    }
+
+    if (lastAllocationSize > 0) {
+      buffers.add(options.getAllocator().allocate(lastAllocationSize));
+    }
+
+    for (ByteBuffer buffer : buffers) {
+      f.readFully(buffer);
+      buffer.flip();
+    }
+    return buffers;
+  }
+
+  Optional<PageReader> readColumnInBlock(int blockIndex, ColumnDescriptor columnDescriptor) {
+    BlockMetaData block = blocks.get(blockIndex);
+    if (block.getRowCount() == 0) {
+      throw new RuntimeException("Illegal row group of 0 rows");
+    }
+    Optional<ColumnChunkMetaData> mc = findColumnByPath(block, columnDescriptor.getPath());
+
+    return mc.map(column -> new ChunkDescriptor(columnDescriptor, column, column.getStartingPos(), (int) column.getTotalSize()))
+      .map(chunk -> readChunk(f, chunk));
+  }
+
+  private ColumnChunkPageReader readChunk(SeekableInputStream f, ChunkDescriptor descriptor) {
+    try {
+      List<ByteBuffer> buffers = readBlocks(f, descriptor.fileOffset, descriptor.size);
+      ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffers);
+      Chunk chunk = new WorkaroundChunk(descriptor, stream.sliceBuffers(descriptor.size), f);
+      return chunk.readAllPages();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Optional<ColumnChunkMetaData> findColumnByPath(BlockMetaData block, String[] path) {
+    for (ColumnChunkMetaData column : block.getColumns()) {
+      if (Arrays.equals(column.getPath().toArray(), path)) {
+        return Optional.of(column);
+      }
+    }
+    return Optional.empty();
+  }
+
+  public int blocksCount() {
+    return blocks.size();
+  }
+
+  public BlockMetaData getBlockMetaData(int blockIndex) {
+    return blocks.get(blockIndex);
+  }
+
 }
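
The readBlocks helper extracted above splits one read of `length` bytes into
buffers that never exceed the configured maximum allocation size. A minimal
standalone sketch of just that arithmetic follows. It is an illustration, not
the parquet-mr code: the class name AllocationSketch is hypothetical, and
ByteBuffer.allocate stands in for options.getAllocator().allocate(...).

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; not part of parquet-mr.
class AllocationSketch {

  // Splits `length` bytes into buffers of at most `maxAllocationSize` bytes:
  // as many full-size buffers as fit, plus one smaller buffer for the remainder.
  static List<ByteBuffer> allocate(int length, int maxAllocationSize) {
    int fullAllocations = length / maxAllocationSize;
    int lastAllocationSize = length % maxAllocationSize;

    int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
    List<ByteBuffer> buffers = new ArrayList<>(numAllocations);

    for (int i = 0; i < fullAllocations; i++) {
      // Assumption: plain heap buffers replace the configurable allocator.
      buffers.add(ByteBuffer.allocate(maxAllocationSize));
    }
    if (lastAllocationSize > 0) {
      buffers.add(ByteBuffer.allocate(lastAllocationSize));
    }
    return buffers;
  }

  public static void main(String[] args) {
    // 10 bytes with a 4-byte cap yield capacities 4, 4 and 2.
    for (ByteBuffer buffer : allocate(10, 4)) {
      System.out.println(buffer.capacity());
    }
  }
}
```

In the real method each buffer is then filled with f.readFully(buffer) and
flipped, so the list can be wrapped in a ByteBufferInputStream.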
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index c98c24796..b944e9707 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -25,12 +25,15 @@
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
@@ -41,13 +44,22 @@
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.Strings;
 import org.apache.parquet.Version;
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.EncodingStats;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.impl.ColumnWriteStoreV1;
 import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.example.DummyRecordConverter;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
@@ -57,6 +69,7 @@
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.GlobalMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.BlocksCombiner;
 import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.apache.parquet.hadoop.util.HadoopStreams;
 import org.apache.parquet.io.InputFile;
@@ -519,6 +532,108 @@ public void appendFile(InputFile file) throws IOException {
     ParquetFileReader.open(file).appendTo(this);
   }
 
+  public int merge(List<InputFile> inputFiles, CodecFactory.BytesCompressor compressor, String createdBy, long maxBlockSize) throws IOException {
+    List<ParquetFileReader> readers = getReaders(inputFiles);
+    try {
+      ByteBufferAllocator allocator = new HeapByteBufferAllocator();
+      ColumnReadStoreImpl columnReadStore = new ColumnReadStoreImpl(null, new DummyRecordConverter(schema).getRootConverter(), schema, createdBy);
+      this.start();
+      List<BlocksCombiner.SmallBlocksUnion> largeBlocks = BlocksCombiner.combineLargeBlocks(readers, maxBlockSize);
+      for (BlocksCombiner.SmallBlocksUnion smallBlocks : largeBlocks) {
+        for (int columnIndex = 0; columnIndex < schema.getColumns().size(); columnIndex++) {
+          ColumnDescriptor path = schema.getColumns().get(columnIndex);
+          ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor, schema, allocator);
+          ColumnWriteStoreV1 columnWriteStoreV1 = new ColumnWriteStoreV1(store, ParquetProperties.builder().build());
+          for (BlocksCombiner.SmallBlock smallBlock : smallBlocks.getBlocks()) {
+            ParquetFileReader parquetFileReader = smallBlock.getReader();
+            try {
+              Optional<PageReader> columnChunkPageReader = parquetFileReader.readColumnInBlock(smallBlock.getBlockIndex(), path);
+              ColumnWriter columnWriter = columnWriteStoreV1.getColumnWriter(path);
+              if (columnChunkPageReader.isPresent()) {
+                ColumnReader columnReader = columnReadStore.newMemColumnReader(path, columnChunkPageReader.get());
+                for (int i = 0; i < columnReader.getTotalValueCount(); i++) {
+                  consumeTriplet(columnWriter, columnReader);
+                }
+              } else {
+                MessageType inputFileSchema = parquetFileReader.getFileMetaData().getSchema();
+                String[] parentPath = getExistingParentPath(path, inputFileSchema);
+                int def = inputFileSchema.getMaxDefinitionLevel(parentPath);
+                int rep = inputFileSchema.getMaxRepetitionLevel(parentPath);
+                for (int i = 0; i < parquetFileReader.getBlockMetaData(smallBlock.getBlockIndex()).getRowCount(); i++) {
+                  columnWriter.writeNull(rep, def);
+                }
+              }
+            } catch (Exception e) {
+              LOG.error("File {} is not readable", parquetFileReader.getFile(), e);
+            }
+          }
+          if (columnIndex == 0) {
+            this.startBlock(smallBlocks.getRowCount());
+          }
+          columnWriteStoreV1.flush();
+          store.flushToFileWriter(path, this);
+        }
+        this.endBlock();
+      }
+      this.end(Collections.emptyMap());
+    } finally {
+      BlocksCombiner.closeReaders(readers);
+    }
+    return 0;
+  }
+
+  private String[] getExistingParentPath(ColumnDescriptor path, MessageType inputFileSchema) {
+    List<String> parentPath = Arrays.asList(path.getPath());
+    while (parentPath.size() > 0 && !inputFileSchema.containsPath(parentPath.toArray(new String[parentPath.size()]))) {
+      parentPath = parentPath.subList(0, parentPath.size() - 1);
+    }
+    return parentPath.toArray(new String[parentPath.size()]);
+  }
+
+  private List<ParquetFileReader> getReaders(List<InputFile> inputFiles) throws IOException {
+    List<ParquetFileReader> readers = new ArrayList<>(inputFiles.size());
+    for (InputFile inputFile : inputFiles) {
+      readers.add(ParquetFileReader.open(inputFile));
+    }
+    return readers;
+  }
+
+  private void consumeTriplet(ColumnWriter columnWriter, ColumnReader columnReader) {
+    int definitionLevel = columnReader.getCurrentDefinitionLevel();
+    int repetitionLevel = columnReader.getCurrentRepetitionLevel();
+    ColumnDescriptor column = columnReader.getDescriptor();
+    PrimitiveType type = column.getPrimitiveType();
+    if (definitionLevel < column.getMaxDefinitionLevel()) {
+      columnWriter.writeNull(repetitionLevel, definitionLevel);
+    } else {
+      switch (type.getPrimitiveTypeName()) {
+        case INT32:
+          columnWriter.write(columnReader.getInteger(), repetitionLevel, definitionLevel);
+          break;
+        case INT64:
+          columnWriter.write(columnReader.getLong(), repetitionLevel, definitionLevel);
+          break;
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+        case INT96:
+          columnWriter.write(columnReader.getBinary(), repetitionLevel, definitionLevel);
+          break;
+        case BOOLEAN:
+          columnWriter.write(columnReader.getBoolean(), repetitionLevel, definitionLevel);
+          break;
+        case FLOAT:
+          columnWriter.write(columnReader.getFloat(), repetitionLevel, definitionLevel);
+          break;
+        case DOUBLE:
+          columnWriter.write(columnReader.getDouble(), repetitionLevel, definitionLevel);
+          break;
+        default:
+          throw new IllegalArgumentException("Unknown primitive type " + type);
+      }
+    }
+    columnReader.consume();
+  }
+
   /**
    * @param file a file stream to read from
    * @param rowGroups row groups to copy
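
When an input file lacks a column that the output schema requires, merge()
writes nulls at the levels of the deepest ancestor that the input schema does
contain. The lookup trims trailing path components until a known path is found.
A rough standalone sketch of that trimming follows. It rests on one assumption:
the hypothetical knownPaths set replaces the MessageType.containsPath check
used in the diff, and ParentPathSketch is likewise an illustrative name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration class; not part of parquet-mr.
class ParentPathSketch {

  // Drops trailing components of columnPath until the remaining prefix is
  // a known path. Returns an empty list when no prefix is known.
  static List<String> existingParentPath(List<String> columnPath, Set<List<String>> knownPaths) {
    List<String> parent = columnPath;
    while (!parent.isEmpty() && !knownPaths.contains(parent)) {
      parent = parent.subList(0, parent.size() - 1);
    }
    // Copy so the caller does not hold a view of the input list.
    return new ArrayList<>(parent);
  }

  public static void main(String[] args) {
    // Assumption: the input file only knows the group "a" and its child "a.b".
    Set<List<String>> knownPaths = new HashSet<>();
    knownPaths.add(Arrays.asList("a"));
    knownPaths.add(Arrays.asList("a", "b"));

    // "a.b.c" is missing, so the search falls back to "a.b".
    System.out.println(existingParentPath(Arrays.asList("a", "b", "c"), knownPaths));
    // Nothing under "x" is known, so the result is the empty path.
    System.out.println(existingParentPath(Arrays.asList("x", "y"), knownPaths));
  }
}
```

An empty result corresponds to the schema root, which is why the loop in the
diff stops once the path has no components left.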
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/BlocksCombiner.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/BlocksCombiner.java
new file mode 100644
index 000000000..02dadc7f5
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/BlocksCombiner.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Collections.unmodifiableList;
+
+public class BlocksCombiner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BlocksCombiner.class);
+
+  public static List<SmallBlocksUnion> combineLargeBlocks(List<ParquetFileReader> readers, long maxBlockSize) {
+    List<SmallBlocksUnion> blocks = new ArrayList<>();
+    long largeBlockSize = 0;
+    long largeBlockRecords = 0;
+    List<SmallBlock> smallBlocks = new ArrayList<>();
+    for (ParquetFileReader reader : readers) {
+      for (int blockIndex = 0; blockIndex < reader.blocksCount(); blockIndex++) {
+        BlockMetaData block = reader.getBlockMetaData(blockIndex);
+        if (!smallBlocks.isEmpty() && largeBlockSize + block.getTotalByteSize() > maxBlockSize) {
+          blocks.add(new SmallBlocksUnion(smallBlocks, largeBlockRecords));
+          smallBlocks = new ArrayList<>();
+          largeBlockSize = 0;
+          largeBlockRecords = 0;
+        }
+        largeBlockSize += block.getTotalByteSize();
+        largeBlockRecords += block.getRowCount();
+        smallBlocks.add(new SmallBlock(reader, blockIndex));
+      }
+    }
+    if (!smallBlocks.isEmpty()) {
+      blocks.add(new SmallBlocksUnion(smallBlocks, largeBlockRecords));
+    }
+    return unmodifiableList(blocks);
+  }
+
+  public static void closeReaders(List<ParquetFileReader> readers) {
+    readers.forEach(r -> {
+      try {
+        r.close();
+      } catch (IOException e) {
+        LOG.error("Error closing reader {}", r.getFile(), e);
+      }
+    });
+  }
+
+  public static class SmallBlocksUnion {
+    private final List<SmallBlock> blocks;
+    private final long rowCount;
+
+    public SmallBlocksUnion(List<SmallBlock> blocks, long rowCount) {
+      this.blocks = blocks;
+      this.rowCount = rowCount;
+    }
+
+    public List<SmallBlock> getBlocks() {
+      return blocks;
+    }
+
+    public long getRowCount() {
+      return rowCount;
+    }
+  }
+
+  public static class SmallBlock {
+    private final ParquetFileReader reader;
+    private final int blockIndex;
+
+    public SmallBlock(ParquetFileReader reader, int blockIndex) {
+      this.reader = reader;
+      this.blockIndex = blockIndex;
+    }
+
+    public ParquetFileReader getReader() {
+      return reader;
+    }
+
+    public int getBlockIndex() {
+      return blockIndex;
+    }
+  }
+}
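
BlocksCombiner.combineLargeBlocks is a greedy, order-preserving grouping: it
keeps adding row groups to the current union and starts a new one as soon as
the next row group would push the total past maxBlockSize. The sketch below
reproduces that loop on plain byte sizes. GroupingSketch and its use of Long
values in place of SmallBlock objects are illustrative assumptions, not
parquet-mr API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration class; not part of parquet-mr.
class GroupingSketch {

  // Groups adjacent block sizes so that each group's total stays within
  // maxBlockSize, except that a single oversized block forms its own group.
  static List<List<Long>> group(List<Long> blockSizes, long maxBlockSize) {
    List<List<Long>> groups = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentSize = 0;

    for (long size : blockSizes) {
      // Close the current group before it would exceed the limit.
      if (!current.isEmpty() && currentSize + size > maxBlockSize) {
        groups.add(current);
        current = new ArrayList<>();
        currentSize = 0;
      }
      currentSize += size;
      current.add(size);
    }
    if (!current.isEmpty()) {
      groups.add(current);
    }
    return groups;
  }

  public static void main(String[] args) {
    // With a limit of 100, the 200-byte block cannot be split and stands alone.
    System.out.println(group(Arrays.asList(40L, 50L, 30L, 200L, 10L), 100L));
    // prints [[40, 50], [30], [200], [10]]
  }
}
```

Because the grouping never reorders or splits row groups, a row group larger
than the limit is copied as a union of one, and the merged file can still
contain small row groups next to it.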
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterMergeBlocks.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterMergeBlocks.java
new file mode 100644
index 000000000..a972238cb
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterMergeBlocks.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static java.util.Arrays.asList;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
+import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+
+public class TestParquetWriterMergeBlocks {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  public static final int FILE_SIZE = 10000;
+  public static final Configuration CONF = new Configuration();
+  public static final Map<String, String> EMPTY_METADATA =
+    new HashMap<String, String>();
+  public static final MessageType FILE_SCHEMA = Types.buildMessage()
+    .required(INT32).named("id")
+    .required(BINARY).as(UTF8).named("string")
+    .named("AppendTest");
+  public static final SimpleGroupFactory GROUP_FACTORY =
+    new SimpleGroupFactory(FILE_SCHEMA);
+
+  public Path file1;
+  public List<Group> file1content = new ArrayList<Group>();
+  public Path file2;
+  public List<Group> file2content = new ArrayList<Group>();
+
+  @Before
+  public void createSourceData() throws IOException {
+    this.file1 = newTemp();
+    this.file2 = newTemp();
+
+    ParquetWriter<Group> writer1 = ExampleParquetWriter.builder(file1)
+      .withType(FILE_SCHEMA)
+      .build();
+    ParquetWriter<Group> writer2 = ExampleParquetWriter.builder(file2)
+      .withType(FILE_SCHEMA)
+      .build();
+
+    for (int i = 0; i < FILE_SIZE; i += 1) {
+      Group group1 = GROUP_FACTORY.newGroup();
+      group1.add("id", i);
+      group1.add("string", UUID.randomUUID().toString());
+      writer1.write(group1);
+      file1content.add(group1);
+
+      Group group2 = GROUP_FACTORY.newGroup();
+      group2.add("id", FILE_SIZE+i);
+      group2.add("string", UUID.randomUUID().toString());
+      writer2.write(group2);
+      file2content.add(group2);
+    }
+
+    writer1.close();
+    writer2.close();
+  }
+
+  @Test
+  public void testBasicBehavior() throws IOException {
+    Path combinedFile = newTemp();
+    ParquetFileWriter writer = new ParquetFileWriter(
+      CONF, FILE_SCHEMA, combinedFile);
+
+    // Merge schema and extraMeta
+    List<Path> inputFiles = asList(file1, file2);
+    FileMetaData mergedMeta = ParquetFileWriter.mergeMetadataFiles(inputFiles, CONF).getFileMetaData();
+    List<InputFile> inputFileList = toInputFiles(inputFiles);
+    CodecFactory.BytesCompressor compressor = new CodecFactory(CONF, DEFAULT_PAGE_SIZE).getCompressor(CompressionCodecName.SNAPPY);
+
+    writer.merge(inputFileList, compressor, mergedMeta.getCreatedBy(), 128 * 1024 * 1024);
+
+    LinkedList<Group> expected = new LinkedList<>();
+    expected.addAll(file1content);
+    expected.addAll(file2content);
+
+    ParquetReader<Group> reader = ParquetReader
+      .builder(new GroupReadSupport(), combinedFile)
+      .build();
+
+    Group next;
+    while ((next = reader.read()) != null) {
+      Group expectedNext = expected.removeFirst();
+      // check each value; equals is not supported for simple records
+      Assert.assertEquals("Each id should match",
+        expectedNext.getInteger("id", 0), next.getInteger("id", 0));
+      Assert.assertEquals("Each string should match",
+        expectedNext.getString("string", 0), next.getString("string", 0));
+    }
+
+    Assert.assertEquals("All records should be present", 0, expected.size());
+  }
+
+  private List<InputFile> toInputFiles(List<Path> inputFiles) {
+    return inputFiles.stream()
+      .map(input -> {
+        try {
+          return HadoopInputFile.fromPath(input, CONF);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }).collect(Collectors.toList());
+  }
+
+  @Test
+  public void testMergedMetadata() throws IOException {
+    Path combinedFile = newTemp();
+    ParquetFileWriter writer = new ParquetFileWriter(
+      CONF, FILE_SCHEMA, combinedFile);
+
+    // Merge schema and extraMeta
+    List<Path> inputFiles = asList(file1, file2);
+    FileMetaData mergedMeta = ParquetFileWriter.mergeMetadataFiles(inputFiles, CONF).getFileMetaData();
+    List<InputFile> inputFileList = toInputFiles(inputFiles);
+    CompressionCodecName codecName = CompressionCodecName.GZIP;
+    CodecFactory.BytesCompressor compressor = new CodecFactory(CONF, DEFAULT_PAGE_SIZE).getCompressor(codecName);
+    writer.merge(inputFileList, compressor, mergedMeta.getCreatedBy(), 128 * 1024 * 1024);
+
+    ParquetMetadata combinedFooter = ParquetFileReader.readFooter(
+      CONF, combinedFile, NO_FILTER);
+    ParquetMetadata f1Footer = ParquetFileReader.readFooter(
+      CONF, file1, NO_FILTER);
+    ParquetMetadata f2Footer = ParquetFileReader.readFooter(
+      CONF, file2, NO_FILTER);
+
+    LinkedList<BlockMetaData> expectedRowGroups = new LinkedList<>();
+    expectedRowGroups.addAll(f1Footer.getBlocks());
+    expectedRowGroups.addAll(f2Footer.getBlocks());
+    long totalRowCount = expectedRowGroups.stream().mapToLong(BlockMetaData::getRowCount).sum();
+    Assert.assertEquals("Combined should have a single row group",
+      1,
+      combinedFooter.getBlocks().size());
+
+    BlockMetaData rowGroup = combinedFooter.getBlocks().get(0);
+    Assert.assertEquals("Row count should match",
+      totalRowCount, rowGroup.getRowCount());
+    assertColumnsEquivalent(f1Footer.getBlocks().get(0).getColumns(), rowGroup.getColumns(), codecName);
+  }
+
+  public void assertColumnsEquivalent(List<ColumnChunkMetaData> expected,
+                                      List<ColumnChunkMetaData> actual,
+                                      CompressionCodecName codecName) {
+    Assert.assertEquals("Should have the expected columns",
+      expected.size(), actual.size());
+    for (int i = 0; i < actual.size(); i += 1) {
+      long numNulls = 0;
+      long valueCount = 0;
+      ColumnChunkMetaData current = actual.get(i);
+      Statistics statistics = current.getStatistics();
+      numNulls += statistics.getNumNulls();
+      valueCount += current.getValueCount();
+      if (i != 0) {
+        ColumnChunkMetaData previous = actual.get(i - 1);
+        long expectedStart = previous.getStartingPos() + previous.getTotalSize();
+        Assert.assertEquals("Should start after the previous column",
+          expectedStart, current.getStartingPos());
+      }
+
+      assertColumnMetadataEquivalent(expected.get(i), current, codecName, numNulls, valueCount);
+    }
+  }
+
+  public void assertColumnMetadataEquivalent(ColumnChunkMetaData expected,
+                                             ColumnChunkMetaData actual,
+                                             CompressionCodecName codecName,
+                                             long numNulls,
+                                             long valueCount) {
+    Assert.assertEquals("Should be the expected column",
+      expected.getPath(), actual.getPath());
+    Assert.assertEquals("Primitive type should not change",
+      expected.getType(), actual.getType());
+    Assert.assertEquals("Compression codec should not change",
+      codecName, actual.getCodec());
+    Assert.assertEquals("Data encodings should not change",
+      expected.getEncodings(), actual.getEncodings());
+    Assert.assertEquals("Statistics should not change",
+      numNulls, actual.getStatistics().getNumNulls());
+    Assert.assertEquals("Number of values should not change",
+      valueCount, actual.getValueCount());
+
+  }
+
+  @Test
+  public void testAllowDroppingColumns() throws IOException {
+    MessageType droppedColumnSchema = Types.buildMessage()
+      .required(BINARY).as(UTF8).named("string")
+      .named("AppendTest");
+
+    Path droppedColumnFile = newTemp();
+    List<Path> inputFiles = asList(file1, file2);
+    ParquetFileWriter writer = new ParquetFileWriter(
+      CONF, droppedColumnSchema, droppedColumnFile);
+    List<InputFile> inputFileList = toInputFiles(inputFiles);
+    CompressionCodecName codecName = CompressionCodecName.GZIP;
+    CodecFactory.BytesCompressor compressor = new CodecFactory(CONF, DEFAULT_PAGE_SIZE).getCompressor(codecName);
+    writer.merge(inputFileList, compressor, "", 128*1024*1024);
+
+    LinkedList<Group> expected = new LinkedList<Group>();
+    expected.addAll(file1content);
+    expected.addAll(file2content);
+
+    ParquetMetadata footer = ParquetFileReader.readFooter(
+      CONF, droppedColumnFile, NO_FILTER);
+    for (BlockMetaData rowGroup : footer.getBlocks()) {
+      Assert.assertEquals("Should have only the string column",
+        1, rowGroup.getColumns().size());
+    }
+
+    ParquetReader<Group> reader = ParquetReader
+      .builder(new GroupReadSupport(), droppedColumnFile)
+      .build();
+
+    Group next;
+    while ((next = reader.read()) != null) {
+      Group expectedNext = expected.removeFirst();
+      Assert.assertEquals("Each string should match",
+        expectedNext.getString("string", 0), next.getString("string", 0));
+    }
+
+    Assert.assertEquals("All records should be present", 0, expected.size());
+  }
+
+  private Path newTemp() throws IOException {
+    File file = temp.newFile();
+    Preconditions.checkArgument(file.delete(), "Could not remove temp file");
+    return new Path(file.toString());
+  }
+}
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
index fe6458756..6d5b31380 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
@@ -19,20 +19,29 @@
 package org.apache.parquet.tools.command;
 
 import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.hadoop.util.HiddenFileFilter;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.InputFile;
 import org.apache.parquet.tools.Main;
 
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
 
 public class MergeCommand extends ArgsOnlyCommand {
   public static final String[] USAGE = new String[] {
@@ -49,12 +58,43 @@
 
   private Configuration conf;
 
+  private static final Options OPTIONS;
+  static {
+    OPTIONS = new Options();
+
+    Option block = Option.builder("b")
+      .longOpt("block")
+      .desc("Merge adjacent blocks into one up to upper bound size limit default to 128 MB")
+      .build();
+
+    Option limit = Option.builder("l")
+      .longOpt("limit")
+      .desc("Upper bound for merged block size in megabytes. Default: 128 MB")
+      .hasArg()
+      .build();
+
+    Option codec = Option.builder("c")
+      .longOpt("codec")
+      .desc("Compression codec name. Default: SNAPPY. Valid values: UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD")
+      .hasArg()
+      .build();
+
+    OPTIONS.addOption(limit);
+    OPTIONS.addOption(block);
+    OPTIONS.addOption(codec);
+  }
+
   public MergeCommand() {
     super(2, MAX_FILE_NUM + 1);
 
     conf = new Configuration();
   }
 
+  @Override
+  public Options getOptions() {
+    return OPTIONS;
+  }
+
   @Override
   public String[] getUsageDescription() {
     return USAGE;
@@ -63,18 +103,32 @@ public MergeCommand() {
   @Override
   public String getCommandDescription() {
     return "Merges multiple Parquet files into one. " +
-      "The command doesn't merge row groups, just places one after the other. " +
+      "Without -b option the command doesn't merge row groups, just places one after the other. " +
       "When used to merge many small files, the resulting file will still contain small row groups, " +
-      "which usually leads to bad query performance.";
+      "which usually leads to bad query performance. " +
+      "To have adjacent small blocks merged together use -b option. " +
+      "Blocks will be grouped into larger one until the upper bound is reached. " +
+      "Default block upper bound 128 MB and default compression SNAPPY can be customized using -l and -c options";
   }
 
   @Override
   public void execute(CommandLine options) throws Exception {
+    boolean mergeBlocks = options.hasOption('b');
+    int maxBlockSize = options.hasOption('l')? Integer.parseInt(options.getOptionValue('l')) * 1024 * 1024 : DEFAULT_BLOCK_SIZE;
+    CompressionCodecName compressionCodec = options.hasOption('c') ? CompressionCodecName.valueOf(options.getOptionValue('c')) : CompressionCodecName.SNAPPY;
     // Prepare arguments
     List<String> args = options.getArgList();
     List<Path> inputFiles = getInputFiles(args.subList(0, args.size() - 1));
     Path outputFile = new Path(args.get(args.size() - 1));
+    if (mergeBlocks) {
+      CodecFactory.BytesCompressor compressor = new CodecFactory(conf, DEFAULT_PAGE_SIZE).getCompressor(compressionCodec);
+      mergeBlocks(maxBlockSize, compressor, inputFiles, outputFile);
+    } else {
+      mergeFiles(inputFiles, outputFile);
+    }
+  }
 
+  private void mergeFiles(List<Path> inputFiles, Path outputFile) throws IOException {
     // Merge schema and extraMeta
     FileMetaData mergedMeta = mergedMetadata(inputFiles);
     PrintWriter out = new PrintWriter(Main.out, true);
@@ -103,6 +157,23 @@ public void execute(CommandLine options) throws Exception {
     writer.end(mergedMeta.getKeyValueMetaData());
   }
 
+  private void mergeBlocks(int maxBlockSize, CodecFactory.BytesCompressor compressor, List<Path> inputFiles, Path outputFile) throws IOException {
+    // Merge schema and extraMeta
+    FileMetaData mergedMeta = mergedMetadata(inputFiles);
+
+    // Merge data
+    ParquetFileWriter writer = new ParquetFileWriter(conf, mergedMeta.getSchema(), outputFile, ParquetFileWriter.Mode.CREATE);
+    List<InputFile> inputFileList = inputFiles.stream()
+      .map(input -> {
+        try {
+          return HadoopInputFile.fromPath(input, conf);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }).collect(Collectors.toList());
+    writer.merge(inputFileList, compressor, mergedMeta.getCreatedBy(), maxBlockSize);
+  }
+
   private FileMetaData mergedMetadata(List<Path> inputFiles) throws IOException {
     return ParquetFileWriter.mergeMetadataFiles(inputFiles, conf).getFileMetaData();
   }
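
One detail in the `execute` change above: `Integer.parseInt(...) * 1024 * 1024` is plain `int` arithmetic, so a `-l` value of 2048 or more wraps around silently. A minimal sketch of a checked conversion, assuming the limit still has to fit in an `int`; `LimitParsingSketch` and `limitBytes` are hypothetical names for illustration and are not part of this pull request:

```java
// Hypothetical helper for illustration only; not part of the PR.
final class LimitParsingSketch {
  private static final int BYTES_PER_MB = 1024 * 1024;

  // Converts a megabyte count given as text into a byte count.
  // Throws IllegalArgumentException if the value is not a positive integer
  // or if the byte count does not fit in an int.
  static int limitBytes(String megabytes) {
    int mb = Integer.parseInt(megabytes.trim()); // NumberFormatException is an IllegalArgumentException
    if (mb <= 0) {
      throw new IllegalArgumentException("Block size limit must be positive: " + mb);
    }
    try {
      // multiplyExact throws instead of wrapping on overflow
      return Math.multiplyExact(mb, BYTES_PER_MB);
    } catch (ArithmeticException e) {
      throw new IllegalArgumentException("Block size limit too large: " + mb + " MB", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(limitBytes("256"));
  }
}
```

With this shape, an out-of-range limit fails with a clear message before any file is opened, rather than passing a negative size to the writer.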


 



> Add merge blocks command to parquet-tools
> -----------------------------------------
>
>                 Key: PARQUET-1381
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1381
>             Project: Parquet
>          Issue Type: New Feature
>          Components: parquet-mr
>    Affects Versions: 1.10.0
>            Reporter: Ekaterina Galieva
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.10.1
>
>
> The current implementation of the merge command in parquet-tools doesn't merge row groups; it just places one after the other. Add an API and a command option to merge small blocks into larger ones, up to a specified size limit.
> h6. Implementation details:
> Blocks are not reordered, so that any predicate pushdown optimizations that depend on the initial order are not broken.
> Blocks are not divided to fit the upper bound perfectly.
> This is an intentional performance optimization:
> it makes it possible to form new blocks by copying the full content of smaller blocks column by column, not row by row.
> h6. Examples:
>  # Input files with block sizes (in MB):
> {code:java}
> [128 | 35], [128 | 40], [120]{code}
> Expected output file block sizes:
> {{merge}}
> {code:java}
> [128 | 35 | 128 | 40 | 120]
> {code}
> {{merge -b}}
> {code:java}
> [128 | 35 | 128 | 40 | 120]
> {code}
> {{merge -b -l 256}}
> {code:java}
> [163 | 168 | 120]
> {code}
>  # Input files with block sizes (in MB):
> {code:java}
> [128 | 35], [40], [120], [6] {code}
> Expected output file block sizes:
> {{merge}}
> {code:java}
> [128 | 35 | 40 | 120 | 6] 
> {code}
> {{merge -b}}
> {code:java}
> [128 | 75 | 126] 
> {code}
> {{merge -b -l 256}}
> {code:java}
> [203 | 126]{code}
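
The expected outputs in both examples are consistent with a single greedy pass over the blocks in their original order. A minimal sketch of that grouping, assuming sizes in MB, an inclusive upper bound, and that a block larger than the limit is kept whole; `BlockGroupingSketch` and `group` are hypothetical names for illustration, not the API added by the pull request:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; this is not the parquet-mr implementation.
final class BlockGroupingSketch {

  // Greedily sums adjacent block sizes in their original order.
  // Blocks are never reordered and never split.
  // Assumption: a block joins the current group while the total stays <= limit.
  static List<Long> group(List<Long> blockSizes, long limit) {
    List<Long> merged = new ArrayList<>();
    long current = 0;
    boolean open = false; // true once the current group holds at least one block
    for (long size : blockSizes) {
      if (open && current + size > limit) {
        // Adding this block would exceed the limit: close the current group.
        merged.add(current);
        current = 0;
      }
      current += size;
      open = true;
    }
    if (open) {
      merged.add(current);
    }
    return merged;
  }

  public static void main(String[] args) {
    // Second example from the description: [128 | 35], [40], [120], [6]
    List<Long> blocks = Arrays.asList(128L, 35L, 40L, 120L, 6L);
    System.out.println(group(blocks, 128)); // [128, 75, 126]
    System.out.println(group(blocks, 256)); // [203, 126]
  }
}
```

File boundaries play no role in this sketch: the input is the flat sequence of block sizes across all input files, which matches how the examples treat `[128 | 35], [40]` as `128, 35, 40`.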


