You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ga...@apache.org on 2018/08/03 05:39:31 UTC

[parquet-mr] branch column-indexes updated: PARQUET-1364: Invalid row indexes for pages starting with nulls (#507)

This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch column-indexes
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/column-indexes by this push:
     new 43ac3e1  PARQUET-1364: Invalid row indexes for pages starting with nulls (#507)
43ac3e1 is described below

commit 43ac3e18aeb10a421657480da9d9bd2a862fdd3c
Author: Gabor Szadovszky <ga...@apache.org>
AuthorDate: Fri Aug 3 07:39:28 2018 +0200

    PARQUET-1364: Invalid row indexes for pages starting with nulls (#507)
---
 .../apache/parquet/column/impl/ColumnWriteStoreBase.java  |  4 ++--
 .../org/apache/parquet/column/impl/ColumnWriterBase.java  | 15 ++++++++-------
 .../apache/parquet/column/impl/TestColumnReaderImpl.java  |  8 ++++----
 .../java/org/apache/parquet/column/mem/TestMemColumn.java |  5 ++++-
 4 files changed, 18 insertions(+), 14 deletions(-)

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
index d04192f..5cd7d87 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
@@ -149,7 +149,7 @@ abstract class ColumnWriteStoreBase implements ColumnWriteStore {
     for (ColumnWriterBase memColumn : columns.values()) {
       long rows = rowCount - memColumn.getRowsWrittenSoFar();
       if (rows > 0) {
-        memColumn.writePage(rowCount);
+        memColumn.writePage();
       }
       memColumn.finalizeColumnChunk();
     }
@@ -195,7 +195,7 @@ abstract class ColumnWriteStoreBase implements ColumnWriteStore {
       long rows = rowCount - writer.getRowsWrittenSoFar();
       long remainingMem = props.getPageSizeThreshold() - usedMem;
       if (remainingMem <= thresholdTolerance) {
-        writer.writePage(rowCount);
+        writer.writePage();
         remainingMem = props.getPageSizeThreshold();
       }
       long rowsToFillPage =
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
index 16085bb..3788c82 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -20,7 +20,6 @@ package org.apache.parquet.column.impl;
 
 import java.io.IOException;
 
-import org.apache.parquet.Ints;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.ParquetProperties;
@@ -52,6 +51,7 @@ abstract class ColumnWriterBase implements ColumnWriter {
 
   private Statistics<?> statistics;
   private long rowsWrittenSoFar = 0;
+  private int pageRowCount;
 
   ColumnWriterBase(
       ColumnDescriptor path,
@@ -84,6 +84,10 @@ abstract class ColumnWriterBase implements ColumnWriter {
 
   private void repetitionLevel(int repetitionLevel) {
     repetitionLevelColumn.writeInteger(repetitionLevel);
+    assert pageRowCount == 0 ? repetitionLevel == 0 : true : "Every page shall start on record boundaries";
+    if (repetitionLevel == 0) {
+      ++pageRowCount;
+    }
   }
 
   /**
@@ -299,13 +303,9 @@ abstract class ColumnWriterBase implements ColumnWriter {
 
   /**
    * Writes the current data to a new page in the page store
-   *
-   * @param rowCount
-   *          how many rows have been written so far
    */
-  void writePage(long rowCount) {
-    int pageRowCount = Ints.checkedCast(rowCount - rowsWrittenSoFar);
-    this.rowsWrittenSoFar = rowCount;
+  void writePage() {
+    this.rowsWrittenSoFar += pageRowCount;
     if (DEBUG)
       LOG.debug("write page");
     try {
@@ -318,6 +318,7 @@ abstract class ColumnWriterBase implements ColumnWriter {
     dataColumn.reset();
     valueCount = 0;
     resetStatistics();
+    pageRowCount = 0;
   }
 
   abstract void writePage(int rowCount, int valueCount, Statistics<?> statistics, ValuesWriter repetitionLevels,
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
index d2d78c4..35fddaf 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
@@ -65,10 +65,10 @@ public class TestColumnReaderImpl {
     for (int i = 0; i < rows; i++) {
       columnWriterV2.write(Binary.fromString("bar" + i % 10), 0, 0);
       if ((i + 1) % 1000 == 0) {
-        columnWriterV2.writePage(i);
+        columnWriterV2.writePage();
       }
     }
-    columnWriterV2.writePage(rows);
+    columnWriterV2.writePage();
     columnWriterV2.finalizeColumnChunk();
     List<DataPage> pages = pageWriter.getPages();
     int valueCount = 0;
@@ -103,10 +103,10 @@ public class TestColumnReaderImpl {
     for (int i = 0; i < rows; i++) {
       columnWriterV2.writeNull(0, 0);
       if ((i + 1) % 1000 == 0) {
-        columnWriterV2.writePage(i);
+        columnWriterV2.writePage();
       }
     }
-    columnWriterV2.writePage(rows);
+    columnWriterV2.writePage();
     columnWriterV2.finalizeColumnChunk();
     List<DataPage> pages = pageWriter.getPages();
     int valueCount = 0;
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
index c28649e..e5db38c 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
@@ -138,13 +138,16 @@ public class TestMemColumn {
       int r = rs[i % rs.length];
       int d = ds[i % ds.length];
       LOG.debug("write i: {}", i);
+      if (i != 0 && r == 0) {
+        memColumnsStore.endRecord();
+      }
       if (d == 2) {
         columnWriter.write((long)i, r, d);
       } else {
         columnWriter.writeNull(r, d);
       }
-      memColumnsStore.endRecord();
     }
+    memColumnsStore.endRecord();
     memColumnsStore.flush();
 
     ColumnReader columnReader = getColumnReader(memPageStore, path, mt);