You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@parquet.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/08/03 05:40:00 UTC

[jira] [Commented] (PARQUET-1364) Column Indexes: Invalid row indexes for pages starting with nulls

    [ https://issues.apache.org/jira/browse/PARQUET-1364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567804#comment-16567804 ] 

ASF GitHub Bot commented on PARQUET-1364:
-----------------------------------------

gszadovszky closed pull request #507: PARQUET-1364: Invalid row indexes for pages starting with nulls
URL: https://github.com/apache/parquet-mr/pull/507
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
index d04192fbf..5cd7d876e 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
@@ -149,7 +149,7 @@ public void flush() {
     for (ColumnWriterBase memColumn : columns.values()) {
       long rows = rowCount - memColumn.getRowsWrittenSoFar();
       if (rows > 0) {
-        memColumn.writePage(rowCount);
+        memColumn.writePage();
       }
       memColumn.finalizeColumnChunk();
     }
@@ -195,7 +195,7 @@ private void sizeCheck() {
       long rows = rowCount - writer.getRowsWrittenSoFar();
       long remainingMem = props.getPageSizeThreshold() - usedMem;
       if (remainingMem <= thresholdTolerance) {
-        writer.writePage(rowCount);
+        writer.writePage();
         remainingMem = props.getPageSizeThreshold();
       }
       long rowsToFillPage =
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
index 16085bb80..3788c82e4 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -20,7 +20,6 @@
 
 import java.io.IOException;
 
-import org.apache.parquet.Ints;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.ParquetProperties;
@@ -52,6 +51,7 @@
 
   private Statistics<?> statistics;
   private long rowsWrittenSoFar = 0;
+  private int pageRowCount;
 
   ColumnWriterBase(
       ColumnDescriptor path,
@@ -84,6 +84,10 @@ private void definitionLevel(int definitionLevel) {
 
   private void repetitionLevel(int repetitionLevel) {
     repetitionLevelColumn.writeInteger(repetitionLevel);
+    assert pageRowCount == 0 ? repetitionLevel == 0 : true : "Every page shall start on record boundaries";
+    if (repetitionLevel == 0) {
+      ++pageRowCount;
+    }
   }
 
   /**
@@ -299,13 +303,9 @@ long getRowsWrittenSoFar() {
 
   /**
    * Writes the current data to a new page in the page store
-   *
-   * @param rowCount
-   *          how many rows have been written so far
    */
-  void writePage(long rowCount) {
-    int pageRowCount = Ints.checkedCast(rowCount - rowsWrittenSoFar);
-    this.rowsWrittenSoFar = rowCount;
+  void writePage() {
+    this.rowsWrittenSoFar += pageRowCount;
     if (DEBUG)
       LOG.debug("write page");
     try {
@@ -318,6 +318,7 @@ void writePage(long rowCount) {
     dataColumn.reset();
     valueCount = 0;
     resetStatistics();
+    pageRowCount = 0;
   }
 
   abstract void writePage(int rowCount, int valueCount, Statistics<?> statistics, ValuesWriter repetitionLevels,
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
index d2d78c43d..35fddaf0b 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
@@ -65,10 +65,10 @@ public void test() throws Exception {
     for (int i = 0; i < rows; i++) {
       columnWriterV2.write(Binary.fromString("bar" + i % 10), 0, 0);
       if ((i + 1) % 1000 == 0) {
-        columnWriterV2.writePage(i);
+        columnWriterV2.writePage();
       }
     }
-    columnWriterV2.writePage(rows);
+    columnWriterV2.writePage();
     columnWriterV2.finalizeColumnChunk();
     List<DataPage> pages = pageWriter.getPages();
     int valueCount = 0;
@@ -103,10 +103,10 @@ public void testOptional() throws Exception {
     for (int i = 0; i < rows; i++) {
       columnWriterV2.writeNull(0, 0);
       if ((i + 1) % 1000 == 0) {
-        columnWriterV2.writePage(i);
+        columnWriterV2.writePage();
       }
     }
-    columnWriterV2.writePage(rows);
+    columnWriterV2.writePage();
     columnWriterV2.finalizeColumnChunk();
     List<DataPage> pages = pageWriter.getPages();
     int valueCount = 0;
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
index c28649eef..e5db38c94 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
@@ -138,13 +138,16 @@ public void testMemColumnSeveralPagesRepeated() throws Exception {
       int r = rs[i % rs.length];
       int d = ds[i % ds.length];
       LOG.debug("write i: {}", i);
+      if (i != 0 && r == 0) {
+        memColumnsStore.endRecord();
+      }
       if (d == 2) {
         columnWriter.write((long)i, r, d);
       } else {
         columnWriter.writeNull(r, d);
       }
-      memColumnsStore.endRecord();
     }
+    memColumnsStore.endRecord();
     memColumnsStore.flush();
 
     ColumnReader columnReader = getColumnReader(memPageStore, path, mt);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Column Indexes: Invalid row indexes for pages starting with nulls
> -----------------------------------------------------------------
>
>                 Key: PARQUET-1364
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1364
>             Project: Parquet
>          Issue Type: Sub-task
>            Reporter: Gabor Szadovszky
>            Assignee: Gabor Szadovszky
>            Priority: Major
>              Labels: pull-request-available
>
> The current implementation for managing row indexes when writing pages is not reliable. There is logic in [MessageColumnIO|https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java#L153] which caches null values and flushes them just *before* opening a new group. This logic might cause pages to start with these cached nulls, which are not correctly counted in the written rows, so the rowIndexes are incorrect. It does not cause any issues if all the pages are read continuously, but it is a huge problem for column index based filtering.
> The implementation described above is really complicated, and I would not like to redesign it because of the mentioned issue. It is easier to simply count the {{0}} repetition levels as record boundaries at the column writer level.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)