Posted to commits@spark.apache.org by we...@apache.org on 2021/07/14 10:15:21 UTC

[spark] branch branch-3.2 updated: [SPARK-36123][SQL] Parquet vectorized reader doesn't skip null values correctly

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 4cc7d9b  [SPARK-36123][SQL] Parquet vectorized reader doesn't skip null values correctly
4cc7d9b is described below

commit 4cc7d9b8f1bb8fd6041ff906f3c454b3699ee534
Author: Chao Sun <su...@apple.com>
AuthorDate: Wed Jul 14 18:14:17 2021 +0800

    [SPARK-36123][SQL] Parquet vectorized reader doesn't skip null values correctly
    
    ### What changes were proposed in this pull request?
    
    Fix the value-skipping logic in the Parquet vectorized reader when column index filtering is effective, by taking nulls into account and only calling `ParquetVectorUpdater.skipValues` for non-null values.
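    
    Concretely, each skip site in `VectorizedRleValuesReader` changes from the unconditional pair below to a definition-level-aware helper (a condensed view of the change; the full diff follows):
    ```java
    // Before: always consumes `n` values from the value reader, even though some of
    // the `n` skipped rows may be nulls that have no stored value at all.
    updater.skipValues(n, valueReader);
    advance(n);
    
    // After: walk `n` definition levels and only skip a value from the value reader
    // for rows whose definition level marks them as non-null.
    skipValues(n, state, valueReader, updater);
    ```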
    
    ### Why are the changes needed?
    
    Currently, the Parquet vectorized reader may not work correctly when column index filtering is effective and a data page contains null values. For instance, suppose we have two columns `c1: BIGINT` and `c2: STRING` with the following page layout:
    ```
       * c1        500       500       500       500
       *  |---------|---------|---------|---------|
       *  |-------|-----|-----|---|---|---|---|---|
       * c2     400   300   300 200 200 200 200 200
    ```
    
    and suppose we have a query like the following:
    ```sql
    SELECT * FROM t WHERE c1 = 500
    ```
    
    This will create a Parquet row range `[500, 1000)` which, when applied to `c2`, requires skipping all the rows in `[400, 500)`. However, the current skip logic goes through `updater.skipValues(n, valueReader)`, which is incorrect because it skips the next `n` non-null values; when the skipped range contains nulls, the value reader ends up at the wrong position and subsequent reads return misaligned values.
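    
    Using the numbers from the new test (rows `[400, 450)` of `c2` are null), the mismatch looks like this:
    ```
    rows to skip before the matching range : [400, 500) -> 100 rows
    null rows among them                   : [400, 450) ->  50 rows
    non-null values that should be skipped
    from the value reader                  : [450, 500) ->  50 values
    ```
    The old `updater.skipValues(100, valueReader)` call instead consumes the next 100 non-null values, i.e. those belonging to rows `[450, 550)`, so the value returned for row 500 actually belongs to row 550 and everything after it is shifted.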
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added a new test in `ParquetColumnIndexSuite`.
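    
    For reference, a rough standalone way to exercise the same scenario outside the suite helper (a sketch only: the page-size setting and path are illustrative, and whether the written pages end up unaligned exactly as in the diagram depends on the writer configuration; the committed test goes through `checkUnalignedPages` instead):
    ```scala
    import org.apache.spark.sql.SparkSession
    
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    
    // Small Parquet data pages so column index filtering can produce row ranges
    // that start in the middle of a page (illustrative value).
    spark.sparkContext.hadoopConfiguration.set("parquet.page.size", "1024")
    
    // Same data shape as the new test: 50 null strings in rows [400, 450).
    val df = spark.range(0, 2000).selectExpr(
      "id",
      "if(id >= 400 and id < 450, null, " +
        "concat(cast(id as string), ':', repeat('o', cast(id / 100 as int)))) as s")
    
    val path = "/tmp/spark-36123-repro"  // illustrative path
    df.coalesce(1).write.mode("overwrite").parquet(path)
    
    // With the buggy skip logic, `s` can come back shifted relative to `id` for the
    // filtered rows; after the fix every value lines up with its id again.
    val aligned = spark.read.parquet(path).where("id >= 500").collect()
      .forall(r => r.getString(1).startsWith(s"${r.getLong(0)}:"))
    println(s"values aligned with ids: $aligned")
    ```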
    
    Closes #33330 from sunchao/SPARK-36123-skip-nulls.
    
    Authored-by: Chao Sun <su...@apple.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit e980c7a8404ab290998c2dec0e2e2437d675d67c)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../parquet/VectorizedRleValuesReader.java         | 62 +++++++++++++++-------
 .../parquet/ParquetColumnIndexSuite.scala          | 12 ++++-
 2 files changed, 54 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index 9d88039..af739a5 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -203,8 +203,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
       long rangeEnd = state.currentRangeEnd();
 
       if (rowId + n < rangeStart) {
-        updater.skipValues(n, valueReader);
-        advance(n);
+        skipValues(n, state, valueReader, updater);
         rowId += n;
         leftInPage -= n;
       } else if (rowId > rangeEnd) {
@@ -217,8 +216,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
         // skip the part [rowId, start)
         int toSkip = (int) (start - rowId);
         if (toSkip > 0) {
-          updater.skipValues(toSkip, valueReader);
-          advance(toSkip);
+          skipValues(toSkip, state, valueReader, updater);
           rowId += toSkip;
           leftInPage -= toSkip;
         }
@@ -255,6 +253,39 @@ public final class VectorizedRleValuesReader extends ValuesReader
     state.advanceOffsetAndRowId(offset, rowId);
   }
 
+  /**
+   * Skip the next `n` values (either null or non-null) from this definition level reader and
+   * `valueReader`.
+   */
+  private void skipValues(
+      int n,
+      ParquetReadState state,
+      VectorizedValuesReader valuesReader,
+      ParquetVectorUpdater updater) {
+    while (n > 0) {
+      if (this.currentCount == 0) this.readNextGroup();
+      int num = Math.min(n, this.currentCount);
+      switch (mode) {
+        case RLE:
+          // we only need to skip non-null values from `valuesReader` since nulls are represented
+          // via definition levels which are skipped here via decrementing `currentCount`.
+          if (currentValue == state.maxDefinitionLevel) {
+            updater.skipValues(num, valuesReader);
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < num; ++i) {
+            // same as above, only skip non-null values from `valuesReader`
+            if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) {
+              updater.skipValues(1, valuesReader);
+            }
+          }
+          break;
+      }
+      currentCount -= num;
+      n -= num;
+    }
+  }
 
   // The RLE reader implements the vectorized decoding interface when used to decode dictionary
   // IDs. This is different than the above APIs that decodes definitions levels along with values.
@@ -358,7 +389,14 @@ public final class VectorizedRleValuesReader extends ValuesReader
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
       int n = Math.min(left, this.currentCount);
-      advance(n);
+      switch (mode) {
+        case RLE:
+          break;
+        case PACKED:
+          currentBufferIdx += n;
+          break;
+      }
+      currentCount -= n;
       left -= n;
     }
   }
@@ -404,20 +442,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
   }
 
   /**
-   * Advance and skip the next `n` values in the current block. `n` MUST be <= `currentCount`.
-   */
-  private void advance(int n) {
-    switch (mode) {
-      case RLE:
-        break;
-      case PACKED:
-        currentBufferIdx += n;
-        break;
-    }
-    currentCount -= n;
-  }
-
-  /**
    * Reads the next varint encoded int.
    */
   private int readUnsignedVarInt() throws IOException {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala
index 8a217fe..5f1c5b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala
@@ -38,7 +38,7 @@ class ParquetColumnIndexSuite extends QueryTest with ParquetTest with SharedSpar
    * col_1     500       500       500       500
    *  |---------|---------|---------|---------|
    *  |-------|-----|-----|---|---|---|---|---|
-   * col_2   400   300   200 200 200 200 200 200
+   * col_2   400   300   300 200 200 200 200 200
    */
   def checkUnalignedPages(df: DataFrame)(actions: (DataFrame => DataFrame)*): Unit = {
     Seq(true, false).foreach { enableDictionary =>
@@ -92,4 +92,14 @@ class ParquetColumnIndexSuite extends QueryTest with ParquetTest with SharedSpar
     )
     checkUnalignedPages(df)(actions: _*)
   }
+
+  test("SPARK-36123: reading from unaligned pages - test filters with nulls") {
+    // insert 50 null values in [400, 450) to verify that they are skipped during processing row
+    // range [500, 1000) against the second page of col_2 [400, 800)
+    var df = spark.range(0, 2000).map { i =>
+      val strVal = if (i >= 400 && i < 450) null else i + ":" + "o" * (i / 100).toInt
+      (i, strVal)
+    }.toDF()
+    checkUnalignedPages(df)(actions: _*)
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org