You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/27 20:41:37 UTC

spark git commit: [SPARK-13530][SQL] Add ShortType support to UnsafeRowParquetRecordReader

Repository: spark
Updated Branches:
  refs/heads/master ec0cc75e1 -> 3814d0bcf


[SPARK-13530][SQL] Add ShortType support to UnsafeRowParquetRecordReader

JIRA: https://issues.apache.org/jira/browse/SPARK-13530

## What changes were proposed in this pull request?

By enabling vectorized parquet scanner by default, the unit test `ParquetHadoopFsRelationSuite` based on `HadoopFsRelationTest` will be failed due to the lack of short type support in `UnsafeRowParquetRecordReader`. We should fix it.

The error exception:

    [info] ParquetHadoopFsRelationSuite:
    [info] - test all data types - StringType (499 milliseconds)
    [info] - test all data types - BinaryType (447 milliseconds)
    [info] - test all data types - BooleanType (520 milliseconds)
    [info] - test all data types - ByteType (418 milliseconds)
    00:22:58.920 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 124.0 (TID 1949)
    org.apache.commons.lang.NotImplementedException: Unimplemented type: ShortType
	at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader$ColumnReader.readIntBatch(UnsafeRowParquetRecordReader.java:769)
	at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader$ColumnReader.readBatch(UnsafeRowParquetRecordReader.java:640)
	at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader$ColumnReader.access$000(UnsafeRowParquetRecordReader.java:461)
	at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader.nextBatch(UnsafeRowParquetRecordReader.java:224)
## How was this patch tested?

The unit test `ParquetHadoopFsRelationSuite` based on `HadoopFsRelationTest` will be [failed](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/52110/consoleFull) due to the lack of short type support in UnsafeRowParquetRecordReader. By adding this support, the test can be passed.

Author: Liang-Chi Hsieh <vi...@gmail.com>

Closes #11412 from viirya/add-shorttype-support.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3814d0bc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3814d0bc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3814d0bc

Branch: refs/heads/master
Commit: 3814d0bcf6f1697a94123be4b224cbd7554025a9
Parents: ec0cc75
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Sat Feb 27 11:41:35 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sat Feb 27 11:41:35 2016 -0800

----------------------------------------------------------------------
 .../parquet/UnsafeRowParquetRecordReader.java   |  3 ++
 .../parquet/VectorizedRleValuesReader.java      | 34 +++++++++++++++++++-
 2 files changed, 36 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3814d0bc/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
index 9d50cfa..e7f0ec2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
@@ -765,6 +765,9 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
       } else if (DecimalType.is64BitDecimalType(column.dataType())) {
         defColumn.readIntsAsLongs(
             num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+      } else if (column.dataType() == DataTypes.ShortType) {
+        defColumn.readShorts(
+            num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
       } else {
         throw new NotImplementedException("Unimplemented type: " + column.dataType());
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/3814d0bc/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index b2048c0..8613fca 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -301,6 +301,38 @@ public final class VectorizedRleValuesReader extends ValuesReader
     }
   }
 
+  public void readShorts(int total, ColumnVector c,
+                        int rowId, int level, VectorizedValuesReader data) {
+    int left = total;
+    while (left > 0) {
+      if (this.currentCount == 0) this.readNextGroup();
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == level) {
+            for (int i = 0; i < n; i++) {
+              c.putShort(rowId + i, (short)data.readInteger());
+            }
+          } else {
+            c.putNulls(rowId, n);
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < n; ++i) {
+            if (currentBuffer[currentBufferIdx++] == level) {
+              c.putShort(rowId + i, (short)data.readInteger());
+            } else {
+              c.putNull(rowId + i);
+            }
+          }
+          break;
+      }
+      rowId += n;
+      left -= n;
+      currentCount -= n;
+    }
+  }
+
   public void readLongs(int total, ColumnVector c, int rowId, int level,
                         VectorizedValuesReader data) {
     int left = total;
@@ -611,4 +643,4 @@ public final class VectorizedRleValuesReader extends ValuesReader
         throw new ParquetDecodingException("not a valid mode " + this.mode);
     }
   }
-}
\ No newline at end of file
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org