You are viewing a plain text version of this content; the canonical hyperlink was not preserved in this export (see the repository and commit hash below for the authoritative source).
Posted to commits@parquet.apache.org by ga...@apache.org on 2018/09/28 07:52:20 UTC
[parquet-mr] 02/02: PARQUET-1381: Fix missing endRecord after
merging columnIndex
This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch column-indexes
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
commit c215f1ff042cfb66b492749aed10999dd7bf1aa7
Author: Gabor Szadovszky <ga...@apache.org>
AuthorDate: Mon Sep 24 17:08:24 2018 +0200
PARQUET-1381: Fix missing endRecord after merging columnIndex
---
.../java/org/apache/parquet/hadoop/ParquetFileWriter.java | 13 +++++++++++--
1 file changed, 11 insertions(+), 2 deletions(-)
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 7dd6e80..a8cd686 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -51,6 +51,7 @@ import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ColumnWriter;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.EncodingStats;
@@ -687,7 +688,7 @@ public class ParquetFileWriter {
if (columnChunkPageReader.isPresent()) {
ColumnReader columnReader = columnReadStore.newMemColumnReader(path, columnChunkPageReader.get());
for (int i = 0; i < columnReader.getTotalValueCount(); i++) {
- consumeTriplet(columnWriter, columnReader);
+ consumeTriplet(columnWriteStoreV1, columnWriter, columnReader);
}
} else {
MessageType inputFileSchema = parquetFileReader.getFileMetaData().getSchema();
@@ -696,6 +697,10 @@ public class ParquetFileWriter {
int rep = parquetFileReader.getFileMetaData().getSchema().getMaxRepetitionLevel(parentPath);
for (int i = 0; i < parquetFileReader.getBlockMetaData(smallBlock.getBlockIndex()).getRowCount(); i++) {
columnWriter.writeNull(rep, def);
+ if (def == 0) {
+ // V1 pages also respect record boundaries so we have to mark them
+ columnWriteStoreV1.endRecord();
+ }
}
}
} catch (Exception e) {
@@ -733,7 +738,7 @@ public class ParquetFileWriter {
return readers;
}
- private void consumeTriplet(ColumnWriter columnWriter, ColumnReader columnReader) {
+ private void consumeTriplet(ColumnWriteStore columnWriteStore, ColumnWriter columnWriter, ColumnReader columnReader) {
int definitionLevel = columnReader.getCurrentDefinitionLevel();
int repetitionLevel = columnReader.getCurrentRepetitionLevel();
ColumnDescriptor column = columnReader.getDescriptor();
@@ -767,6 +772,10 @@ public class ParquetFileWriter {
}
}
columnReader.consume();
+ if (repetitionLevel == 0) {
+ // V1 pages also respect record boundaries so we have to mark them
+ columnWriteStore.endRecord();
+ }
}
/**