Posted to commits@spark.apache.org by ge...@apache.org on 2022/06/03 18:45:07 UTC
[spark] branch master updated: [SPARK-39265][SQL] Support vectorized Parquet scans with DEFAULT values
This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3e6598e8d4f [SPARK-39265][SQL] Support vectorized Parquet scans with DEFAULT values
3e6598e8d4f is described below
commit 3e6598e8d4fbfd7db595d991f6ebad92eb2fa33f
Author: Daniel Tenedorio <da...@databricks.com>
AuthorDate: Fri Jun 3 11:44:55 2022 -0700
[SPARK-39265][SQL] Support vectorized Parquet scans with DEFAULT values
### What changes were proposed in this pull request?
Support vectorized Parquet scans when the table schema has associated DEFAULT column values.
Example:
```
create table t(i int) using parquet;
insert into t values(42);
alter table t add column s string default concat('abc', 'def');
select * from t;
> 42, 'abcdef'
```
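For reference, a minimal spark-shell sketch of the same scenario with the vectorized reader explicitly enabled; `spark.sql.parquet.enableVectorizedReader` is the existing config that gates this code path, and the table name is just illustrative:
```
// Minimal sketch; assumes a SparkSession named `spark` is in scope.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")

spark.sql("create table t(i int) using parquet")
spark.sql("insert into t values (42)")
// The new column is absent from the Parquet files already on disk, so the
// vectorized reader must return the existence default instead of NULL.
spark.sql("alter table t add column s string default concat('abc', 'def')")
spark.sql("select * from t").show()
// Expected: a single row (42, "abcdef")
```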
### Why are the changes needed?
This change makes it easier to build, query, and maintain tables backed by Parquet data.
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
This PR includes new test coverage.
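The exact cases live in `InsertSuite.scala` in the diff below; as an illustration only (table name and assertion style are not from this patch), a round-trip check of the new behavior could look like:
```
// Illustrative sketch: the default must come back for rows written before the
// column existed, with the vectorized Parquet reader both on and off.
import org.apache.spark.sql.Row

Seq("true", "false").foreach { vectorized =>
  spark.conf.set("spark.sql.parquet.enableVectorizedReader", vectorized)
  spark.sql("drop table if exists t_defaults")
  spark.sql("create table t_defaults(i int) using parquet")
  spark.sql("insert into t_defaults values (42)")
  spark.sql("alter table t_defaults add column s string default concat('abc', 'def')")
  val rows = spark.sql("select i, s from t_defaults").collect()
  assert(rows.toSeq == Seq(Row(42, "abcdef")))
}
```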
Closes #36672 from dtenedor/default-parquet-vectorized.
Authored-by: Daniel Tenedorio <da...@databricks.com>
Signed-off-by: Gengliang Wang <ge...@apache.org>
---
.../datasources/parquet/ParquetColumnVector.java | 31 +++++++------
.../parquet/SpecificParquetRecordReaderBase.java | 5 ++-
.../parquet/VectorizedParquetRecordReader.java | 6 ++-
.../execution/vectorized/WritableColumnVector.java | 52 ++++++++++++++++++++++
.../execution/datasources/DataSourceStrategy.scala | 10 +++--
.../sql/internal/BaseSessionStateBuilder.scala | 2 +-
.../sql/sources/DataSourceAnalysisSuite.scala | 3 +-
.../org/apache/spark/sql/sources/InsertSuite.scala | 17 ++++---
.../spark/sql/hive/HiveSessionStateBuilder.scala | 2 +-
9 files changed, 97 insertions(+), 31 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
index c8399d9137f..2ad8cdfcca6 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
@@ -55,22 +55,14 @@ final class ParquetColumnVector {
/** Reader for this column - only set if 'isPrimitive' is true */
private VectorizedColumnReader columnReader;
- ParquetColumnVector(
- ParquetColumn column,
- WritableColumnVector vector,
- int capacity,
- MemoryMode memoryMode,
- Set<ParquetColumn> missingColumns) {
- this(column, vector, capacity, memoryMode, missingColumns, true);
- }
-
ParquetColumnVector(
ParquetColumn column,
WritableColumnVector vector,
int capacity,
MemoryMode memoryMode,
Set<ParquetColumn> missingColumns,
- boolean isTopLevel) {
+ boolean isTopLevel,
+ Object defaultValue) {
DataType sparkType = column.sparkType();
if (!sparkType.sameType(vector.dataType())) {
throw new IllegalArgumentException("Spark type: " + sparkType +
@@ -83,8 +75,21 @@ final class ParquetColumnVector {
this.isPrimitive = column.isPrimitive();
if (missingColumns.contains(column)) {
- vector.setAllNull();
- return;
+ if (defaultValue == null) {
+ vector.setAllNull();
+ return;
+ }
+ // For Parquet tables whose columns have associated DEFAULT values, this reader must return
+ // those values instead of NULL when the corresponding columns are not present in storage.
+ // Here we write the 'defaultValue' to each element in the new WritableColumnVector using
+ // the appendObjects method. This delegates to some specific append* method depending on the
+ // type of 'defaultValue'; for example, if 'defaultValue' is a Float, then we call the
+ // appendFloats method.
+ if (!vector.appendObjects(capacity, defaultValue).isPresent()) {
+ throw new IllegalArgumentException("Cannot assign default column value to result " +
+ "column batch in vectorized Parquet reader because the data type is not supported: " +
+ defaultValue);
+ }
}
if (isPrimitive) {
@@ -101,7 +106,7 @@ final class ParquetColumnVector {
for (int i = 0; i < column.children().size(); i++) {
ParquetColumnVector childCv = new ParquetColumnVector(column.children().apply(i),
- vector.getChild(i), capacity, memoryMode, missingColumns, false);
+ vector.getChild(i), capacity, memoryMode, missingColumns, false, null);
children.add(childCv);
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 61aab6c5398..6ea1a0c37b1 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -71,6 +71,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
protected MessageType fileSchema;
protected MessageType requestedSchema;
protected StructType sparkSchema;
+ protected StructType sparkRequestedSchema;
// Keep track of the version of the parquet writer. An older version wrote
// corrupt delta byte arrays, and the version check is needed to detect that.
protected ParsedVersion writerVersion;
@@ -113,10 +114,10 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
fileReader.setRequestedSchema(requestedSchema);
String sparkRequestedSchemaString =
configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA());
- StructType sparkRequestedSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
+ this.sparkRequestedSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(configuration);
this.parquetColumn = converter.convertParquetColumn(requestedSchema,
- Option.apply(sparkRequestedSchema));
+ Option.apply(this.sparkRequestedSchema));
this.sparkSchema = (StructType) parquetColumn.sparkType();
this.totalRowCount = fileReader.getFilteredRecordCount();
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 80f6f88810a..6a30876a3bc 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -259,8 +259,12 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
columnVectors = new ParquetColumnVector[sparkSchema.fields().length];
for (int i = 0; i < columnVectors.length; i++) {
+ Object defaultValue = null;
+ if (sparkRequestedSchema != null) {
+ defaultValue = sparkRequestedSchema.existenceDefaultValues()[i];
+ }
columnVectors[i] = new ParquetColumnVector(parquetColumn.children().apply(i),
- vectors[i], capacity, memMode, missingColumns);
+ vectors[i], capacity, memMode, missingColumns, true, defaultValue);
}
if (partitionColumns != null) {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 88930d509cf..5debc1adacd 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.vectorized;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
+import java.util.Optional;
import com.google.common.annotations.VisibleForTesting;
@@ -690,6 +691,57 @@ public abstract class WritableColumnVector extends ColumnVector {
return elementsAppended;
}
+ /**
+ * Appends multiple copies of a Java Object to the vector using the corresponding append* method
+ * above.
+ * @param length: The number of instances to append
+ * @param value value to append to the vector
+ * @return the number of values appended if the value maps to one of the append* methods above,
+ * or Optional.empty() otherwise.
+ */
+ public Optional<Integer> appendObjects(int length, Object value) {
+ if (value instanceof Boolean) {
+ return Optional.of(appendBooleans(length, (Boolean) value));
+ }
+ if (value instanceof Byte) {
+ return Optional.of(appendBytes(length, (Byte) value));
+ }
+ if (value instanceof Decimal) {
+ Decimal decimal = (Decimal) value;
+ long unscaled = decimal.toUnscaledLong();
+ if (decimal.precision() < 10) {
+ return Optional.of(appendInts(length, (int) unscaled));
+ } else {
+ return Optional.of(appendLongs(length, unscaled));
+ }
+ }
+ if (value instanceof Double) {
+ return Optional.of(appendDoubles(length, (Double) value));
+ }
+ if (value instanceof Float) {
+ return Optional.of(appendFloats(length, (Float) value));
+ }
+ if (value instanceof Integer) {
+ return Optional.of(appendInts(length, (Integer) value));
+ }
+ if (value instanceof Long) {
+ return Optional.of(appendLongs(length, (Long) value));
+ }
+ if (value instanceof Short) {
+ return Optional.of(appendShorts(length, (Short) value));
+ }
+ if (value instanceof UTF8String) {
+ UTF8String utf8 = (UTF8String) value;
+ byte[] bytes = utf8.getBytes();
+ int result = 0;
+ for (int i = 0; i < length; ++i) {
+ result += appendByteArray(bytes, 0, bytes.length);
+ }
+ return Optional.of(result);
+ }
+ return Optional.empty();
+ }
+
// `WritableColumnVector` puts the data of array in the first child column vector, and puts the
// array offsets and lengths in the current column vector.
@Override
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index bf5882bd63f..429b7072cae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.planning.ScanOperation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
-import org.apache.spark.sql.catalyst.util.V2ExpressionBuilder
+import org.apache.spark.sql.catalyst.util.{ResolveDefaultColumns, V2ExpressionBuilder}
import org.apache.spark.sql.connector.catalog.SupportsRead
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, FieldReference, NullOrdering, SortDirection, SortOrder => V2SortOrder, SortValue}
@@ -61,7 +61,7 @@ import org.apache.spark.unsafe.types.UTF8String
* Note that, this rule must be run after `PreprocessTableCreation` and
* `PreprocessTableInsertion`.
*/
-object DataSourceAnalysis extends Rule[LogicalPlan] {
+case class DataSourceAnalysis(analyzer: Analyzer) extends Rule[LogicalPlan] {
def resolver: Resolver = conf.resolver
@@ -147,7 +147,11 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case CreateTable(tableDesc, mode, None) if DDLUtils.isDatasourceTable(tableDesc) =>
- CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
+ val newTableDesc: CatalogTable =
+ tableDesc.copy(schema =
+ ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
+ analyzer, tableDesc.schema, "CREATE TABLE"))
+ CreateDataSourceTableCommand(newTableDesc, ignoreIfExists = mode == SaveMode.Ignore)
case CreateTable(tableDesc, mode, Some(query))
if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 92725cda49c..2271990741d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -195,7 +195,7 @@ abstract class BaseSessionStateBuilder(
DetectAmbiguousSelfJoin +:
PreprocessTableCreation(session) +:
PreprocessTableInsertion +:
- DataSourceAnalysis +:
+ DataSourceAnalysis(this) +:
ReplaceCharWithVarchar +:
customPostHocResolutionRules
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala
index 2df79e3da80..152d096afdf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala
@@ -21,6 +21,7 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.SQLHelper
@@ -70,7 +71,7 @@ class DataSourceAnalysisSuite extends SparkFunSuite with BeforeAndAfterAll with
Cast(e, dt, Option(SQLConf.get.sessionLocalTimeZone))
}
}
- val rule = DataSourceAnalysis
+ val rule = DataSourceAnalysis(SimpleAnalyzer)
testRule(
"convertStaticPartitions only handle INSERT having at least static partitions",
caseSensitive) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index c5313a642f9..35a6f8f8a0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -1029,24 +1029,22 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
}
// The default value fails to analyze.
withTable("t") {
- sql("create table t(i boolean, s bigint default badvalue) using parquet")
assert(intercept[AnalysisException] {
- sql("insert into t values (default, default)")
+ sql("create table t(i boolean, s bigint default badvalue) using parquet")
}.getMessage.contains(Errors.COMMON_SUBSTRING))
}
// The default value analyzes to a table not in the catalog.
withTable("t") {
- sql("create table t(i boolean, s bigint default (select min(x) from badtable)) using parquet")
assert(intercept[AnalysisException] {
- sql("insert into t values (default, default)")
+ sql("create table t(i boolean, s bigint default (select min(x) from badtable)) " +
+ "using parquet")
}.getMessage.contains(Errors.COMMON_SUBSTRING))
}
// The default value parses but refers to a table from the catalog.
withTable("t", "other") {
sql("create table other(x string) using parquet")
- sql("create table t(i boolean, s bigint default (select min(x) from other)) using parquet")
assert(intercept[AnalysisException] {
- sql("insert into t values (default, default)")
+ sql("create table t(i boolean, s bigint default (select min(x) from other)) using parquet")
}.getMessage.contains(Errors.COMMON_SUBSTRING))
}
// The default value has an explicit alias. It fails to evaluate when inlined into the VALUES
@@ -1083,10 +1081,9 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
}
// The default value parses but the type is not coercible.
withTable("t") {
- sql("create table t(i boolean, s bigint default false) using parquet")
assert(intercept[AnalysisException] {
- sql("insert into t values (default, default)")
- }.getMessage.contains("provided a value of incompatible type"))
+ sql("create table t(i boolean, s bigint default false) using parquet")
+ }.getMessage.contains(Errors.COMMON_SUBSTRING))
}
// The number of columns in the INSERT INTO statement is greater than the number of columns in
// the table.
@@ -1617,6 +1614,8 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
TestCase(
dataSource = "parquet",
Seq(
+ Config(
+ None),
Config(
Some(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false"),
insertNullsToStorage = false)))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index 510d4c13117..b554958572a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -100,7 +100,7 @@ class HiveSessionStateBuilder(
RelationConversions(catalog) +:
PreprocessTableCreation(session) +:
PreprocessTableInsertion +:
- DataSourceAnalysis +:
+ DataSourceAnalysis(this) +:
HiveAnalysis +:
ReplaceCharWithVarchar +:
customPostHocResolutionRules
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org