You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/27 20:41:37 UTC
spark git commit: [SPARK-13530][SQL] Add ShortType support to
UnsafeRowParquetRecordReader
Repository: spark
Updated Branches:
refs/heads/master ec0cc75e1 -> 3814d0bcf
[SPARK-13530][SQL] Add ShortType support to UnsafeRowParquetRecordReader
JIRA: https://issues.apache.org/jira/browse/SPARK-13530
## What changes were proposed in this pull request?
By enabling the vectorized parquet scanner by default, the unit test `ParquetHadoopFsRelationSuite` based on `HadoopFsRelationTest` will fail due to the lack of short type support in `UnsafeRowParquetRecordReader`. We should fix it.
The error exception:
[info] ParquetHadoopFsRelationSuite:
[info] - test all data types - StringType (499 milliseconds)
[info] - test all data types - BinaryType (447 milliseconds)
[info] - test all data types - BooleanType (520 milliseconds)
[info] - test all data types - ByteType (418 milliseconds)
00:22:58.920 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 124.0 (TID 1949)
org.apache.commons.lang.NotImplementedException: Unimplemented type: ShortType
at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader$ColumnReader.readIntBatch(UnsafeRowParquetRecordReader.java:769)
at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader$ColumnReader.readBatch(UnsafeRowParquetRecordReader.java:640)
at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader$ColumnReader.access$000(UnsafeRowParquetRecordReader.java:461)
at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader.nextBatch(UnsafeRowParquetRecordReader.java:224)
## How was this patch tested?
The unit test `ParquetHadoopFsRelationSuite` based on `HadoopFsRelationTest` will [fail](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/52110/consoleFull) due to the lack of short type support in UnsafeRowParquetRecordReader. By adding this support, the test can be passed.
Author: Liang-Chi Hsieh <vi...@gmail.com>
Closes #11412 from viirya/add-shorttype-support.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3814d0bc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3814d0bc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3814d0bc
Branch: refs/heads/master
Commit: 3814d0bcf6f1697a94123be4b224cbd7554025a9
Parents: ec0cc75
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Sat Feb 27 11:41:35 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sat Feb 27 11:41:35 2016 -0800
----------------------------------------------------------------------
.../parquet/UnsafeRowParquetRecordReader.java | 3 ++
.../parquet/VectorizedRleValuesReader.java | 34 +++++++++++++++++++-
2 files changed, 36 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3814d0bc/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
index 9d50cfa..e7f0ec2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
@@ -765,6 +765,9 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
} else if (DecimalType.is64BitDecimalType(column.dataType())) {
defColumn.readIntsAsLongs(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+ } else if (column.dataType() == DataTypes.ShortType) {
+ defColumn.readShorts(
+ num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3814d0bc/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index b2048c0..8613fca 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -301,6 +301,38 @@ public final class VectorizedRleValuesReader extends ValuesReader
}
}
+ public void readShorts(int total, ColumnVector c,
+ int rowId, int level, VectorizedValuesReader data) {
+ int left = total;
+ while (left > 0) {
+ if (this.currentCount == 0) this.readNextGroup();
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == level) {
+ for (int i = 0; i < n; i++) {
+ c.putShort(rowId + i, (short)data.readInteger());
+ }
+ } else {
+ c.putNulls(rowId, n);
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if (currentBuffer[currentBufferIdx++] == level) {
+ c.putShort(rowId + i, (short)data.readInteger());
+ } else {
+ c.putNull(rowId + i);
+ }
+ }
+ break;
+ }
+ rowId += n;
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
public void readLongs(int total, ColumnVector c, int rowId, int level,
VectorizedValuesReader data) {
int left = total;
@@ -611,4 +643,4 @@ public final class VectorizedRleValuesReader extends ValuesReader
throw new ParquetDecodingException("not a valid mode " + this.mode);
}
}
-}
\ No newline at end of file
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org