You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/07/12 16:05:31 UTC
[spark] branch branch-3.2 updated: [SPARK-36056][SQL] Combine
readBatch and readIntegers in VectorizedRleValuesReader
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 5b2f191 [SPARK-36056][SQL] Combine readBatch and readIntegers in VectorizedRleValuesReader
5b2f191 is described below
commit 5b2f1912280e7a5afb92a96b894a7bc5f263aa6e
Author: Chao Sun <su...@apple.com>
AuthorDate: Mon Jul 12 22:30:21 2021 +0800
[SPARK-36056][SQL] Combine readBatch and readIntegers in VectorizedRleValuesReader
### What changes were proposed in this pull request?
Combine `readBatch` and `readIntegers` in `VectorizedRleValuesReader` by having them share the same `readBatchInternal` method.
### Why are the changes needed?
`readBatch` and `readIntegers` share a similar code path, and this JIRA aims to combine them into one method for easier maintenance.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests, as this is just a refactoring.
Closes #33271 from sunchao/SPARK-35743-read-integers.
Authored-by: Chao Sun <su...@apple.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 5edbbd1711402735623fa1fc9b86ff41c28996e9)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../parquet/ParquetVectorUpdaterFactory.java | 2 +-
.../parquet/VectorizedRleValuesReader.java | 90 +++++-----------------
2 files changed, 20 insertions(+), 72 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 2282dc7..39de909 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -216,7 +216,7 @@ public class ParquetVectorUpdaterFactory {
}
}
- private static class IntegerUpdater implements ParquetVectorUpdater {
+ static class IntegerUpdater implements ParquetVectorUpdater {
@Override
public void readValues(
int total,
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index 03bda0f..9d88039 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -167,70 +167,8 @@ public final class VectorizedRleValuesReader extends ValuesReader
ParquetReadState state,
WritableColumnVector values,
VectorizedValuesReader valueReader,
- ParquetVectorUpdater updater) throws IOException {
- int offset = state.offset;
- long rowId = state.rowId;
- int leftInBatch = state.valuesToReadInBatch;
- int leftInPage = state.valuesToReadInPage;
-
- while (leftInBatch > 0 && leftInPage > 0) {
- if (this.currentCount == 0) this.readNextGroup();
- int n = Math.min(leftInBatch, Math.min(leftInPage, this.currentCount));
-
- long rangeStart = state.currentRangeStart();
- long rangeEnd = state.currentRangeEnd();
-
- if (rowId + n < rangeStart) {
- updater.skipValues(n, valueReader);
- advance(n);
- rowId += n;
- leftInPage -= n;
- } else if (rowId > rangeEnd) {
- state.nextRange();
- } else {
- // the range [rowId, rowId + n) overlaps with the current row range in state
- long start = Math.max(rangeStart, rowId);
- long end = Math.min(rangeEnd, rowId + n - 1);
-
- // skip the part [rowId, start)
- int toSkip = (int) (start - rowId);
- if (toSkip > 0) {
- updater.skipValues(toSkip, valueReader);
- advance(toSkip);
- rowId += toSkip;
- leftInPage -= toSkip;
- }
-
- // read the part [start, end]
- n = (int) (end - start + 1);
-
- switch (mode) {
- case RLE:
- if (currentValue == state.maxDefinitionLevel) {
- updater.readValues(n, offset, values, valueReader);
- } else {
- values.putNulls(offset, n);
- }
- break;
- case PACKED:
- for (int i = 0; i < n; ++i) {
- if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) {
- updater.readValue(offset + i, values, valueReader);
- } else {
- values.putNull(offset + i);
- }
- }
- break;
- }
- offset += n;
- leftInBatch -= n;
- rowId += n;
- leftInPage -= n;
- currentCount -= n;
- }
- }
-
- state.advanceOffsetAndRowId(offset, rowId);
+ ParquetVectorUpdater updater) {
+ readBatchInternal(state, values, values, valueReader, updater);
}
/**
@@ -241,7 +179,17 @@ public final class VectorizedRleValuesReader extends ValuesReader
ParquetReadState state,
WritableColumnVector values,
WritableColumnVector nulls,
- VectorizedValuesReader data) throws IOException {
+ VectorizedValuesReader data) {
+ readBatchInternal(state, values, nulls, data, new ParquetVectorUpdaterFactory.IntegerUpdater());
+ }
+
+ private void readBatchInternal(
+ ParquetReadState state,
+ WritableColumnVector values,
+ WritableColumnVector nulls,
+ VectorizedValuesReader valueReader,
+ ParquetVectorUpdater updater) {
+
int offset = state.offset;
long rowId = state.rowId;
int leftInBatch = state.valuesToReadInBatch;
@@ -255,7 +203,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
long rangeEnd = state.currentRangeEnd();
if (rowId + n < rangeStart) {
- data.skipIntegers(n);
+ updater.skipValues(n, valueReader);
advance(n);
rowId += n;
leftInPage -= n;
@@ -269,7 +217,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
// skip the part [rowId, start)
int toSkip = (int) (start - rowId);
if (toSkip > 0) {
- data.skipIntegers(toSkip);
+ updater.skipValues(toSkip, valueReader);
advance(toSkip);
rowId += toSkip;
leftInPage -= toSkip;
@@ -281,7 +229,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
switch (mode) {
case RLE:
if (currentValue == state.maxDefinitionLevel) {
- data.readIntegers(n, values, offset);
+ updater.readValues(n, offset, values, valueReader);
} else {
nulls.putNulls(offset, n);
}
@@ -289,17 +237,17 @@ public final class VectorizedRleValuesReader extends ValuesReader
case PACKED:
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) {
- values.putInt(offset + i, data.readInteger());
+ updater.readValue(offset + i, values, valueReader);
} else {
nulls.putNull(offset + i);
}
}
break;
}
- rowId += n;
- leftInPage -= n;
offset += n;
leftInBatch -= n;
+ rowId += n;
+ leftInPage -= n;
currentCount -= n;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org