You are viewing a plain-text version of this content. (The original page linked to the canonical version here; that hyperlink was not preserved in this text copy.)
Posted to commits@parquet.apache.org by ga...@apache.org on 2018/09/28 07:52:20 UTC

[parquet-mr] 02/02: PARQUET-1381: Fix missing endRecord after merging columnIndex

This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch column-indexes
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git

commit c215f1ff042cfb66b492749aed10999dd7bf1aa7
Author: Gabor Szadovszky <ga...@apache.org>
AuthorDate: Mon Sep 24 17:08:24 2018 +0200

    PARQUET-1381: Fix missing endRecord after merging columnIndex
---
 .../java/org/apache/parquet/hadoop/ParquetFileWriter.java   | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 7dd6e80..a8cd686 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -51,6 +51,7 @@ import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.EncodingStats;
@@ -687,7 +688,7 @@ public class ParquetFileWriter {
               if (columnChunkPageReader.isPresent()) {
                 ColumnReader columnReader = columnReadStore.newMemColumnReader(path, columnChunkPageReader.get());
                 for (int i = 0; i < columnReader.getTotalValueCount(); i++) {
-                  consumeTriplet(columnWriter, columnReader);
+                  consumeTriplet(columnWriteStoreV1, columnWriter, columnReader);
                 }
               } else {
                 MessageType inputFileSchema = parquetFileReader.getFileMetaData().getSchema();
@@ -696,6 +697,10 @@ public class ParquetFileWriter {
                 int rep = parquetFileReader.getFileMetaData().getSchema().getMaxRepetitionLevel(parentPath);
                 for (int i = 0; i < parquetFileReader.getBlockMetaData(smallBlock.getBlockIndex()).getRowCount(); i++) {
                   columnWriter.writeNull(rep, def);
+                  if (def == 0) {
+                    // V1 pages also respect record boundaries so we have to mark them
+                    columnWriteStoreV1.endRecord();
+                  }
                 }
               }
             } catch (Exception e) {
@@ -733,7 +738,7 @@ public class ParquetFileWriter {
     return readers;
   }
 
-  private void consumeTriplet(ColumnWriter columnWriter, ColumnReader columnReader) {
+  private void consumeTriplet(ColumnWriteStore columnWriteStore, ColumnWriter columnWriter, ColumnReader columnReader) {
     int definitionLevel = columnReader.getCurrentDefinitionLevel();
     int repetitionLevel = columnReader.getCurrentRepetitionLevel();
     ColumnDescriptor column = columnReader.getDescriptor();
@@ -767,6 +772,10 @@ public class ParquetFileWriter {
       }
     }
     columnReader.consume();
+    if (repetitionLevel == 0) {
+      // V1 pages also respect record boundaries so we have to mark them
+      columnWriteStore.endRecord();
+    }
   }
 
   /**