You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ga...@apache.org on 2023/11/08 15:45:11 UTC

(parquet-mr) branch master updated: PARQUET-2372: Avoid unnecessary reading of RowGroup data during rewriting (#1183)

This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a066d8a5 PARQUET-2372: Avoid unnecessary reading of RowGroup data during rewriting (#1183)
0a066d8a5 is described below

commit 0a066d8a5c71386e56dee7bd7a21170b27e4283a
Author: Xianyang Liu <li...@hotmail.com>
AuthorDate: Wed Nov 8 23:45:06 2023 +0800

    PARQUET-2372: Avoid unnecessary reading of RowGroup data during rewriting (#1183)
---
 .../parquet/hadoop/rewrite/ParquetRewriter.java    | 22 ++++++++--------------
 1 file changed, 8 insertions(+), 14 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index bf2155e28..004a1d135 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -265,14 +265,9 @@ public class ParquetRewriter implements Closeable {
   }
 
   private void processBlocksFromReader(IndexCache indexCache) throws IOException {
-    PageReadStore store = reader.readNextRowGroup();
-    ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
-
-    int blockId = 0;
-    while (store != null) {
-      writer.startBlock(store.getRowCount());
-
+    for (int blockId = 0; blockId < meta.getBlocks().size(); blockId ++) {
       BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
+      writer.startBlock(blockMetaData.getRowCount());
       indexCache.setBlockMetadata(blockMetaData);
       List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
       for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
@@ -304,9 +299,9 @@ public class ParquetRewriter implements Closeable {
                       "Required column [" + descriptor.getPrimitiveType().getName() + "] cannot be nullified");
             }
             nullifyColumn(
+                    blockId,
                     descriptor,
                     chunk,
-                    crStore,
                     writer,
                     schema,
                     newCodecName,
@@ -323,7 +318,7 @@ public class ParquetRewriter implements Closeable {
           }
 
           // Translate compression and/or encryption
-          writer.startColumn(descriptor, crStore.getColumnReader(descriptor).getTotalValueCount(), newCodecName);
+          writer.startColumn(descriptor, chunk.getValueCount(), newCodecName);
           processChunk(
                   chunk,
                   newCodecName,
@@ -345,9 +340,6 @@ public class ParquetRewriter implements Closeable {
       }
 
       writer.endBlock();
-      store = reader.readNextRowGroup();
-      crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
-      blockId++;
       numBlocksRewritten++;
     }
   }
@@ -675,9 +667,9 @@ public class ParquetRewriter implements Closeable {
     return prunePaths;
   }
 
-  private void nullifyColumn(ColumnDescriptor descriptor,
+  private void nullifyColumn(int blockIndex,
+                             ColumnDescriptor descriptor,
                              ColumnChunkMetaData chunk,
-                             ColumnReadStoreImpl crStore,
                              ParquetFileWriter writer,
                              MessageType schema,
                              CompressionCodecName newCodecName,
@@ -688,6 +680,8 @@ public class ParquetRewriter implements Closeable {
 
     long totalChunkValues = chunk.getValueCount();
     int dMax = descriptor.getMaxDefinitionLevel();
+    PageReadStore pageReadStore = reader.readRowGroup(blockIndex);
+    ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(pageReadStore, new DummyGroupConverter(), schema, originalCreatedBy);
     ColumnReader cReader = crStore.getColumnReader(descriptor);
 
     ParquetProperties.WriterVersion writerVersion = chunk.getEncodingStats().usesV2Pages() ?