You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ge...@apache.org on 2022/06/03 20:48:41 UTC
[spark] branch master updated: [SPARK-39294][SQL] Support vectorized Orc scans with DEFAULT values
This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8e76c2142b3 [SPARK-39294][SQL] Support vectorized Orc scans with DEFAULT values
8e76c2142b3 is described below
commit 8e76c2142b382410f1c0091d873b2ee84e9cbd62
Author: Daniel Tenedorio <da...@databricks.com>
AuthorDate: Fri Jun 3 13:48:27 2022 -0700
[SPARK-39294][SQL] Support vectorized Orc scans with DEFAULT values
### What changes were proposed in this pull request?
Support vectorized Orc scans when the table schema has associated DEFAULT column values.
(Note, this PR depends on https://github.com/apache/spark/pull/36672 which adds the same for Parquet files.)
Example:
```
create table t(i int) using orc;
insert into t values(42);
alter table t add column s string default concat('abc', 'def');
select * from t;
> 42, 'abcdef'
```
### Why are the changes needed?
This change makes it easier to build, query, and maintain tables backed by Orc data.
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
This PR includes new test coverage.
Closes #36675 from dtenedor/default-orc-vectorized.
Authored-by: Daniel Tenedorio <da...@databricks.com>
Signed-off-by: Gengliang Wang <ge...@apache.org>
---
.../execution/datasources/orc/OrcColumnarBatchReader.java | 12 +++++++++++-
.../scala/org/apache/spark/sql/sources/InsertSuite.scala | 2 ++
2 files changed, 13 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
index 40ed0b2454c..175ad37aace 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
@@ -164,6 +164,7 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
// Just wrap the ORC column vector instead of copying it to Spark column vector.
orcVectorWrappers = new org.apache.spark.sql.vectorized.ColumnVector[resultSchema.length()];
+ StructType requiredSchema = new StructType(requiredFields);
for (int i = 0; i < requiredFields.length; i++) {
DataType dt = requiredFields[i].dataType();
if (requestedPartitionColIds[i] != -1) {
@@ -176,7 +177,16 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
// Initialize the missing columns once.
if (colId == -1) {
OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
- missingCol.putNulls(0, capacity);
+ // Check if the missing column has an associated default value in the schema metadata.
+ // If so, fill the corresponding column vector with the value.
+ Object defaultValue = requiredSchema.existenceDefaultValues()[i];
+ if (defaultValue == null) {
+ missingCol.putNulls(0, capacity);
+ } else if (!missingCol.appendObjects(capacity, defaultValue).isPresent()) {
+ throw new IllegalArgumentException("Cannot assign default column value to result " +
+ "column batch in vectorized Orc reader because the data type is not supported: " +
+ defaultValue);
+ }
missingCol.setIsConstant();
orcVectorWrappers[i] = missingCol;
} else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 35a6f8f8a0b..1b70998c642 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -1608,6 +1608,8 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
TestCase(
dataSource = "orc",
Seq(
+ Config(
+ None),
Config(
Some(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false"),
insertNullsToStorage = false))),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org