You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/07/12 16:05:31 UTC

[spark] branch branch-3.2 updated: [SPARK-36056][SQL] Combine readBatch and readIntegers in VectorizedRleValuesReader

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 5b2f191  [SPARK-36056][SQL] Combine readBatch and readIntegers in VectorizedRleValuesReader
5b2f191 is described below

commit 5b2f1912280e7a5afb92a96b894a7bc5f263aa6e
Author: Chao Sun <su...@apple.com>
AuthorDate: Mon Jul 12 22:30:21 2021 +0800

    [SPARK-36056][SQL] Combine readBatch and readIntegers in VectorizedRleValuesReader
    
    ### What changes were proposed in this pull request?
    
    Combine `readBatch` and `readIntegers` in `VectorizedRleValuesReader` by having them share the same `readBatchInternal` method.
    
    ### Why are the changes needed?
    
    `readBatch` and `readIntegers` share a similar code path, and this Jira aims to combine them into one method for easier maintenance.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests, as this is just a refactoring.
    
    Closes #33271 from sunchao/SPARK-35743-read-integers.
    
    Authored-by: Chao Sun <su...@apple.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 5edbbd1711402735623fa1fc9b86ff41c28996e9)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../parquet/ParquetVectorUpdaterFactory.java       |  2 +-
 .../parquet/VectorizedRleValuesReader.java         | 90 +++++-----------------
 2 files changed, 20 insertions(+), 72 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 2282dc7..39de909 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -216,7 +216,7 @@ public class ParquetVectorUpdaterFactory {
     }
   }
 
-  private static class IntegerUpdater implements ParquetVectorUpdater {
+  static class IntegerUpdater implements ParquetVectorUpdater {
     @Override
     public void readValues(
         int total,
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index 03bda0f..9d88039 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -167,70 +167,8 @@ public final class VectorizedRleValuesReader extends ValuesReader
       ParquetReadState state,
       WritableColumnVector values,
       VectorizedValuesReader valueReader,
-      ParquetVectorUpdater updater) throws IOException {
-    int offset = state.offset;
-    long rowId = state.rowId;
-    int leftInBatch = state.valuesToReadInBatch;
-    int leftInPage = state.valuesToReadInPage;
-
-    while (leftInBatch > 0 && leftInPage > 0) {
-      if (this.currentCount == 0) this.readNextGroup();
-      int n = Math.min(leftInBatch, Math.min(leftInPage, this.currentCount));
-
-      long rangeStart = state.currentRangeStart();
-      long rangeEnd = state.currentRangeEnd();
-
-      if (rowId + n < rangeStart) {
-        updater.skipValues(n, valueReader);
-        advance(n);
-        rowId += n;
-        leftInPage -= n;
-      } else if (rowId > rangeEnd) {
-        state.nextRange();
-      } else {
-        // the range [rowId, rowId + n) overlaps with the current row range in state
-        long start = Math.max(rangeStart, rowId);
-        long end = Math.min(rangeEnd, rowId + n - 1);
-
-        // skip the part [rowId, start)
-        int toSkip = (int) (start - rowId);
-        if (toSkip > 0) {
-          updater.skipValues(toSkip, valueReader);
-          advance(toSkip);
-          rowId += toSkip;
-          leftInPage -= toSkip;
-        }
-
-        // read the part [start, end]
-        n = (int) (end - start + 1);
-
-        switch (mode) {
-          case RLE:
-            if (currentValue == state.maxDefinitionLevel) {
-              updater.readValues(n, offset, values, valueReader);
-            } else {
-              values.putNulls(offset, n);
-            }
-            break;
-          case PACKED:
-            for (int i = 0; i < n; ++i) {
-              if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) {
-                updater.readValue(offset + i, values, valueReader);
-              } else {
-                values.putNull(offset + i);
-              }
-            }
-            break;
-        }
-        offset += n;
-        leftInBatch -= n;
-        rowId += n;
-        leftInPage -= n;
-        currentCount -= n;
-      }
-    }
-
-    state.advanceOffsetAndRowId(offset, rowId);
+      ParquetVectorUpdater updater) {
+    readBatchInternal(state, values, values, valueReader, updater);
   }
 
   /**
@@ -241,7 +179,17 @@ public final class VectorizedRleValuesReader extends ValuesReader
       ParquetReadState state,
       WritableColumnVector values,
       WritableColumnVector nulls,
-      VectorizedValuesReader data) throws IOException {
+      VectorizedValuesReader data) {
+    readBatchInternal(state, values, nulls, data, new ParquetVectorUpdaterFactory.IntegerUpdater());
+  }
+
+  private void readBatchInternal(
+      ParquetReadState state,
+      WritableColumnVector values,
+      WritableColumnVector nulls,
+      VectorizedValuesReader valueReader,
+      ParquetVectorUpdater updater) {
+
     int offset = state.offset;
     long rowId = state.rowId;
     int leftInBatch = state.valuesToReadInBatch;
@@ -255,7 +203,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
       long rangeEnd = state.currentRangeEnd();
 
       if (rowId + n < rangeStart) {
-        data.skipIntegers(n);
+        updater.skipValues(n, valueReader);
         advance(n);
         rowId += n;
         leftInPage -= n;
@@ -269,7 +217,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
         // skip the part [rowId, start)
         int toSkip = (int) (start - rowId);
         if (toSkip > 0) {
-          data.skipIntegers(toSkip);
+          updater.skipValues(toSkip, valueReader);
           advance(toSkip);
           rowId += toSkip;
           leftInPage -= toSkip;
@@ -281,7 +229,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
         switch (mode) {
           case RLE:
             if (currentValue == state.maxDefinitionLevel) {
-              data.readIntegers(n, values, offset);
+              updater.readValues(n, offset, values, valueReader);
             } else {
               nulls.putNulls(offset, n);
             }
@@ -289,17 +237,17 @@ public final class VectorizedRleValuesReader extends ValuesReader
           case PACKED:
             for (int i = 0; i < n; ++i) {
               if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) {
-                values.putInt(offset + i, data.readInteger());
+                updater.readValue(offset + i, values, valueReader);
               } else {
                 nulls.putNull(offset + i);
               }
             }
             break;
         }
-        rowId += n;
-        leftInPage -= n;
         offset += n;
         leftInBatch -= n;
+        rowId += n;
+        leftInPage -= n;
         currentCount -= n;
       }
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org