Posted to commits@parquet.apache.org by ga...@apache.org on 2023/11/08 15:45:11 UTC
(parquet-mr) branch master updated: PARQUET-2372: Avoid unnecessary reading of RowGroup data during rewriting (#1183)
This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 0a066d8a5 PARQUET-2372: Avoid unnecessary reading of RowGroup data during rewriting (#1183)
0a066d8a5 is described below
commit 0a066d8a5c71386e56dee7bd7a21170b27e4283a
Author: Xianyang Liu <li...@hotmail.com>
AuthorDate: Wed Nov 8 23:45:06 2023 +0800
PARQUET-2372: Avoid unnecessary reading of RowGroup data during rewriting (#1183)
---
.../parquet/hadoop/rewrite/ParquetRewriter.java | 22 ++++++++--------------
1 file changed, 8 insertions(+), 14 deletions(-)
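In short, the rewriter previously materialized every row group's page data up front via reader.readNextRowGroup(), even when the rewrite path only needed footer metadata; after this change it iterates over the footer's BlockMetaData and defers reading page data to reader.readRowGroup(blockIndex) inside nullifyColumn, where the values are actually consumed. The following is a minimal sketch (not part of the commit) contrasting the two access patterns; the class and method names are illustrative, and the loop bodies are reduced to the calls that matter:

    import java.io.IOException;
    import java.util.List;

    import org.apache.parquet.column.page.PageReadStore;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;
    import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;

    class RowGroupAccessSketch {

      // Before: readNextRowGroup() reads and decompresses the pages of each
      // row group eagerly, whether or not the rewrite consumes the values.
      static void eager(ParquetFileReader reader) throws IOException {
        PageReadStore store = reader.readNextRowGroup();
        while (store != null) {
          long rowCount = store.getRowCount();  // row count came from page data
          // ... per-block rewrite work ...
          store = reader.readNextRowGroup();
        }
      }

      // After: drive the loop from footer metadata alone. Row counts and value
      // counts come from BlockMetaData/ColumnChunkMetaData, so page data is
      // read only on the path that needs it (nullifyColumn calls
      // reader.readRowGroup(blockId) on demand).
      static void lazy(ParquetFileReader reader, ParquetMetadata meta) throws IOException {
        List<BlockMetaData> blocks = meta.getBlocks();
        for (int blockId = 0; blockId < blocks.size(); blockId++) {
          BlockMetaData block = blocks.get(blockId);
          long rowCount = block.getRowCount();       // replaces store.getRowCount()
          for (ColumnChunkMetaData chunk : block.getColumns()) {
            long valueCount = chunk.getValueCount(); // replaces crStore lookup
            // ... translate compression/encryption from metadata ...
          }
        }
      }
    }

For rewrites that merely translate compression or encryption, this skips decompressing column data entirely; the trade-off is an extra readRowGroup(blockIndex) seek when a column is nullified, which is the rarer path.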
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index bf2155e28..004a1d135 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -265,14 +265,9 @@ public class ParquetRewriter implements Closeable {
}
private void processBlocksFromReader(IndexCache indexCache) throws IOException {
- PageReadStore store = reader.readNextRowGroup();
- ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
-
- int blockId = 0;
- while (store != null) {
- writer.startBlock(store.getRowCount());
-
+ for (int blockId = 0; blockId < meta.getBlocks().size(); blockId ++) {
BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
+ writer.startBlock(blockMetaData.getRowCount());
indexCache.setBlockMetadata(blockMetaData);
List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
@@ -304,9 +299,9 @@ public class ParquetRewriter implements Closeable {
"Required column [" + descriptor.getPrimitiveType().getName() + "] cannot be nullified");
}
nullifyColumn(
+ blockId,
descriptor,
chunk,
- crStore,
writer,
schema,
newCodecName,
@@ -323,7 +318,7 @@ public class ParquetRewriter implements Closeable {
}
// Translate compression and/or encryption
- writer.startColumn(descriptor, crStore.getColumnReader(descriptor).getTotalValueCount(), newCodecName);
+ writer.startColumn(descriptor, chunk.getValueCount(), newCodecName);
processChunk(
chunk,
newCodecName,
@@ -345,9 +340,6 @@ public class ParquetRewriter implements Closeable {
}
writer.endBlock();
- store = reader.readNextRowGroup();
- crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
- blockId++;
numBlocksRewritten++;
}
}
@@ -675,9 +667,9 @@ public class ParquetRewriter implements Closeable {
return prunePaths;
}
- private void nullifyColumn(ColumnDescriptor descriptor,
+ private void nullifyColumn(int blockIndex,
+ ColumnDescriptor descriptor,
ColumnChunkMetaData chunk,
- ColumnReadStoreImpl crStore,
ParquetFileWriter writer,
MessageType schema,
CompressionCodecName newCodecName,
@@ -688,6 +680,8 @@ public class ParquetRewriter implements Closeable {
long totalChunkValues = chunk.getValueCount();
int dMax = descriptor.getMaxDefinitionLevel();
+ PageReadStore pageReadStore = reader.readRowGroup(blockIndex);
+ ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(pageReadStore, new DummyGroupConverter(), schema, originalCreatedBy);
ColumnReader cReader = crStore.getColumnReader(descriptor);
ParquetProperties.WriterVersion writerVersion = chunk.getEncodingStats().usesV2Pages() ?