You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ge...@apache.org on 2022/06/03 20:48:41 UTC

[spark] branch master updated: [SPARK-39294][SQL] Support vectorized Orc scans with DEFAULT values

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e76c2142b3 [SPARK-39294][SQL] Support vectorized Orc scans with DEFAULT values
8e76c2142b3 is described below

commit 8e76c2142b382410f1c0091d873b2ee84e9cbd62
Author: Daniel Tenedorio <da...@databricks.com>
AuthorDate: Fri Jun 3 13:48:27 2022 -0700

    [SPARK-39294][SQL] Support vectorized Orc scans with DEFAULT values
    
    ### What changes were proposed in this pull request?
    
    Support vectorized Orc scans when the table schema has associated DEFAULT column values.
    
    (Note, this PR depends on https://github.com/apache/spark/pull/36672 which adds the same for Parquet files.)
    
    Example:
    
    ```
    create table t(i int) using orc;
    insert into t values(42);
    alter table t add column s string default concat('abc', 'def');
    select * from t;
    > 42, 'abcdef'
    ```
    
    ### Why are the changes needed?
    
    This change makes it easier to build, query, and maintain tables backed by Orc data.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    
    ### How was this patch tested?
    
    This PR includes new test coverage.
    
    Closes #36675 from dtenedor/default-orc-vectorized.
    
    Authored-by: Daniel Tenedorio <da...@databricks.com>
    Signed-off-by: Gengliang Wang <ge...@apache.org>
---
 .../execution/datasources/orc/OrcColumnarBatchReader.java    | 12 +++++++++++-
 .../scala/org/apache/spark/sql/sources/InsertSuite.scala     |  2 ++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
index 40ed0b2454c..175ad37aace 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
@@ -164,6 +164,7 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
     // Just wrap the ORC column vector instead of copying it to Spark column vector.
     orcVectorWrappers = new org.apache.spark.sql.vectorized.ColumnVector[resultSchema.length()];
 
+    StructType requiredSchema = new StructType(requiredFields);
     for (int i = 0; i < requiredFields.length; i++) {
       DataType dt = requiredFields[i].dataType();
       if (requestedPartitionColIds[i] != -1) {
@@ -176,7 +177,16 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
         // Initialize the missing columns once.
         if (colId == -1) {
           OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
-          missingCol.putNulls(0, capacity);
+          // Check if the missing column has an associated default value in the schema metadata.
+          // If so, fill the corresponding column vector with the value.
+          Object defaultValue = requiredSchema.existenceDefaultValues()[i];
+          if (defaultValue == null) {
+            missingCol.putNulls(0, capacity);
+          } else if (!missingCol.appendObjects(capacity, defaultValue).isPresent()) {
+            throw new IllegalArgumentException("Cannot assign default column value to result " +
+              "column batch in vectorized Orc reader because the data type is not supported: " +
+              defaultValue);
+          }
           missingCol.setIsConstant();
           orcVectorWrappers[i] = missingCol;
         } else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 35a6f8f8a0b..1b70998c642 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -1608,6 +1608,8 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       TestCase(
         dataSource = "orc",
         Seq(
+          Config(
+            None),
           Config(
             Some(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false"),
             insertNullsToStorage = false))),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org