You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ge...@apache.org on 2022/06/03 18:45:07 UTC

[spark] branch master updated: [SPARK-39265][SQL] Support vectorized Parquet scans with DEFAULT values

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e6598e8d4f [SPARK-39265][SQL] Support vectorized Parquet scans with DEFAULT values
3e6598e8d4f is described below

commit 3e6598e8d4fbfd7db595d991f6ebad92eb2fa33f
Author: Daniel Tenedorio <da...@databricks.com>
AuthorDate: Fri Jun 3 11:44:55 2022 -0700

    [SPARK-39265][SQL] Support vectorized Parquet scans with DEFAULT values
    
    ### What changes were proposed in this pull request?
    
    Support vectorized Parquet scans when the table schema has associated DEFAULT column values.
    
    Example:
    
    ```
    create table t(i int) using parquet;
    insert into t values(42);
    alter table t add column s string default concat('abc', 'def');
    select * from t;
    > 42, 'abcdef'
    ```
    
    ### Why are the changes needed?
    
    This change makes it easier to build, query, and maintain tables backed by Parquet data.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    
    ### How was this patch tested?
    
    This PR includes new test coverage.
    
    Closes #36672 from dtenedor/default-parquet-vectorized.
    
    Authored-by: Daniel Tenedorio <da...@databricks.com>
    Signed-off-by: Gengliang Wang <ge...@apache.org>
---
 .../datasources/parquet/ParquetColumnVector.java   | 31 +++++++------
 .../parquet/SpecificParquetRecordReaderBase.java   |  5 ++-
 .../parquet/VectorizedParquetRecordReader.java     |  6 ++-
 .../execution/vectorized/WritableColumnVector.java | 52 ++++++++++++++++++++++
 .../execution/datasources/DataSourceStrategy.scala | 10 +++--
 .../sql/internal/BaseSessionStateBuilder.scala     |  2 +-
 .../sql/sources/DataSourceAnalysisSuite.scala      |  3 +-
 .../org/apache/spark/sql/sources/InsertSuite.scala | 17 ++++---
 .../spark/sql/hive/HiveSessionStateBuilder.scala   |  2 +-
 9 files changed, 97 insertions(+), 31 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
index c8399d9137f..2ad8cdfcca6 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
@@ -55,22 +55,14 @@ final class ParquetColumnVector {
   /** Reader for this column - only set if 'isPrimitive' is true */
   private VectorizedColumnReader columnReader;
 
-  ParquetColumnVector(
-      ParquetColumn column,
-      WritableColumnVector vector,
-      int capacity,
-      MemoryMode memoryMode,
-      Set<ParquetColumn> missingColumns) {
-    this(column, vector, capacity, memoryMode, missingColumns, true);
-  }
-
   ParquetColumnVector(
       ParquetColumn column,
       WritableColumnVector vector,
       int capacity,
       MemoryMode memoryMode,
       Set<ParquetColumn> missingColumns,
-      boolean isTopLevel) {
+      boolean isTopLevel,
+      Object defaultValue) {
     DataType sparkType = column.sparkType();
     if (!sparkType.sameType(vector.dataType())) {
       throw new IllegalArgumentException("Spark type: " + sparkType +
@@ -83,8 +75,21 @@ final class ParquetColumnVector {
     this.isPrimitive = column.isPrimitive();
 
     if (missingColumns.contains(column)) {
-      vector.setAllNull();
-      return;
+      if (defaultValue == null) {
+        vector.setAllNull();
+        return;
+      }
+      // For Parquet tables whose columns have associated DEFAULT values, this reader must return
+      // those values instead of NULL when the corresponding columns are not present in storage.
+      // Here we write the 'defaultValue' to each element in the new WritableColumnVector using
+      // the appendObjects method. This delegates to some specific append* method depending on the
+      // type of 'defaultValue'; for example, if 'defaultValue' is a Float, then we call the
+      // appendFloats method.
+      if (!vector.appendObjects(capacity, defaultValue).isPresent()) {
+        throw new IllegalArgumentException("Cannot assign default column value to result " +
+          "column batch in vectorized Parquet reader because the data type is not supported: " +
+          defaultValue);
+      }
     }
 
     if (isPrimitive) {
@@ -101,7 +106,7 @@ final class ParquetColumnVector {
 
       for (int i = 0; i < column.children().size(); i++) {
         ParquetColumnVector childCv = new ParquetColumnVector(column.children().apply(i),
-          vector.getChild(i), capacity, memoryMode, missingColumns, false);
+          vector.getChild(i), capacity, memoryMode, missingColumns, false, null);
         children.add(childCv);
 
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 61aab6c5398..6ea1a0c37b1 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -71,6 +71,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
   protected MessageType fileSchema;
   protected MessageType requestedSchema;
   protected StructType sparkSchema;
+  protected StructType sparkRequestedSchema;
   // Keep track of the version of the parquet writer. An older version wrote
   // corrupt delta byte arrays, and the version check is needed to detect that.
   protected ParsedVersion writerVersion;
@@ -113,10 +114,10 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     fileReader.setRequestedSchema(requestedSchema);
     String sparkRequestedSchemaString =
         configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA());
-    StructType sparkRequestedSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
+    this.sparkRequestedSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
     ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(configuration);
     this.parquetColumn = converter.convertParquetColumn(requestedSchema,
-      Option.apply(sparkRequestedSchema));
+      Option.apply(this.sparkRequestedSchema));
     this.sparkSchema = (StructType) parquetColumn.sparkType();
     this.totalRowCount = fileReader.getFilteredRecordCount();
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 80f6f88810a..6a30876a3bc 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -259,8 +259,12 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
 
     columnVectors = new ParquetColumnVector[sparkSchema.fields().length];
     for (int i = 0; i < columnVectors.length; i++) {
+      Object defaultValue = null;
+      if (sparkRequestedSchema != null) {
+        defaultValue = sparkRequestedSchema.existenceDefaultValues()[i];
+      }
       columnVectors[i] = new ParquetColumnVector(parquetColumn.children().apply(i),
-        vectors[i], capacity, memMode, missingColumns);
+        vectors[i], capacity, memMode, missingColumns, true, defaultValue);
     }
 
     if (partitionColumns != null) {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 88930d509cf..5debc1adacd 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.vectorized;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.util.Optional;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -690,6 +691,57 @@ public abstract class WritableColumnVector extends ColumnVector {
     return elementsAppended;
   }
 
+  /**
+   * Appends multiple copies of a Java Object to the vector using the corresponding append* method
+   * above.
+   * @param length: The number of instances to append
+   * @param value value to append to the vector
+   * @return the number of values appended if the value maps to one of the append* methods above,
+   * or Optional.empty() otherwise.
+   */
+  public Optional<Integer> appendObjects(int length, Object value) {
+    if (value instanceof Boolean) {
+      return Optional.of(appendBooleans(length, (Boolean) value));
+    }
+    if (value instanceof Byte) {
+      return Optional.of(appendBytes(length, (Byte) value));
+    }
+    if (value instanceof Decimal) {
+      Decimal decimal = (Decimal) value;
+      long unscaled = decimal.toUnscaledLong();
+      if (decimal.precision() < 10) {
+        return Optional.of(appendInts(length, (int) unscaled));
+      } else {
+        return Optional.of(appendLongs(length, unscaled));
+      }
+    }
+    if (value instanceof Double) {
+      return Optional.of(appendDoubles(length, (Double) value));
+    }
+    if (value instanceof Float) {
+      return Optional.of(appendFloats(length, (Float) value));
+    }
+    if (value instanceof Integer) {
+      return Optional.of(appendInts(length, (Integer) value));
+    }
+    if (value instanceof Long) {
+      return Optional.of(appendLongs(length, (Long) value));
+    }
+    if (value instanceof Short) {
+      return Optional.of(appendShorts(length, (Short) value));
+    }
+    if (value instanceof UTF8String) {
+      UTF8String utf8 = (UTF8String) value;
+      byte[] bytes = utf8.getBytes();
+      int result = 0;
+      for (int i = 0; i < length; ++i) {
+        result += appendByteArray(bytes, 0, bytes.length);
+      }
+      return Optional.of(result);
+    }
+    return Optional.empty();
+  }
+
   // `WritableColumnVector` puts the data of array in the first child column vector, and puts the
   // array offsets and lengths in the current column vector.
   @Override
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index bf5882bd63f..429b7072cae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.planning.ScanOperation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
-import org.apache.spark.sql.catalyst.util.V2ExpressionBuilder
+import org.apache.spark.sql.catalyst.util.{ResolveDefaultColumns, V2ExpressionBuilder}
 import org.apache.spark.sql.connector.catalog.SupportsRead
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, FieldReference, NullOrdering, SortDirection, SortOrder => V2SortOrder, SortValue}
@@ -61,7 +61,7 @@ import org.apache.spark.unsafe.types.UTF8String
  * Note that, this rule must be run after `PreprocessTableCreation` and
  * `PreprocessTableInsertion`.
  */
-object DataSourceAnalysis extends Rule[LogicalPlan] {
+case class DataSourceAnalysis(analyzer: Analyzer) extends Rule[LogicalPlan] {
 
   def resolver: Resolver = conf.resolver
 
@@ -147,7 +147,11 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case CreateTable(tableDesc, mode, None) if DDLUtils.isDatasourceTable(tableDesc) =>
-      CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
+      val newTableDesc: CatalogTable =
+        tableDesc.copy(schema =
+          ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
+            analyzer, tableDesc.schema, "CREATE TABLE"))
+      CreateDataSourceTableCommand(newTableDesc, ignoreIfExists = mode == SaveMode.Ignore)
 
     case CreateTable(tableDesc, mode, Some(query))
         if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 92725cda49c..2271990741d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -195,7 +195,7 @@ abstract class BaseSessionStateBuilder(
       DetectAmbiguousSelfJoin +:
         PreprocessTableCreation(session) +:
         PreprocessTableInsertion +:
-        DataSourceAnalysis +:
+        DataSourceAnalysis(this) +:
         ReplaceCharWithVarchar +:
         customPostHocResolutionRules
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala
index 2df79e3da80..152d096afdf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala
@@ -21,6 +21,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.SQLHelper
@@ -70,7 +71,7 @@ class DataSourceAnalysisSuite extends SparkFunSuite with BeforeAndAfterAll with
           Cast(e, dt, Option(SQLConf.get.sessionLocalTimeZone))
       }
     }
-    val rule = DataSourceAnalysis
+    val rule = DataSourceAnalysis(SimpleAnalyzer)
     testRule(
       "convertStaticPartitions only handle INSERT having at least static partitions",
         caseSensitive) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index c5313a642f9..35a6f8f8a0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -1029,24 +1029,22 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
     // The default value fails to analyze.
     withTable("t") {
-      sql("create table t(i boolean, s bigint default badvalue) using parquet")
       assert(intercept[AnalysisException] {
-        sql("insert into t values (default, default)")
+        sql("create table t(i boolean, s bigint default badvalue) using parquet")
       }.getMessage.contains(Errors.COMMON_SUBSTRING))
     }
     // The default value analyzes to a table not in the catalog.
     withTable("t") {
-      sql("create table t(i boolean, s bigint default (select min(x) from badtable)) using parquet")
       assert(intercept[AnalysisException] {
-        sql("insert into t values (default, default)")
+        sql("create table t(i boolean, s bigint default (select min(x) from badtable)) " +
+          "using parquet")
       }.getMessage.contains(Errors.COMMON_SUBSTRING))
     }
     // The default value parses but refers to a table from the catalog.
     withTable("t", "other") {
       sql("create table other(x string) using parquet")
-      sql("create table t(i boolean, s bigint default (select min(x) from other)) using parquet")
       assert(intercept[AnalysisException] {
-        sql("insert into t values (default, default)")
+        sql("create table t(i boolean, s bigint default (select min(x) from other)) using parquet")
       }.getMessage.contains(Errors.COMMON_SUBSTRING))
     }
     // The default value has an explicit alias. It fails to evaluate when inlined into the VALUES
@@ -1083,10 +1081,9 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
     // The default value parses but the type is not coercible.
     withTable("t") {
-      sql("create table t(i boolean, s bigint default false) using parquet")
       assert(intercept[AnalysisException] {
-        sql("insert into t values (default, default)")
-      }.getMessage.contains("provided a value of incompatible type"))
+        sql("create table t(i boolean, s bigint default false) using parquet")
+      }.getMessage.contains(Errors.COMMON_SUBSTRING))
     }
     // The number of columns in the INSERT INTO statement is greater than the number of columns in
     // the table.
@@ -1617,6 +1614,8 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       TestCase(
         dataSource = "parquet",
         Seq(
+          Config(
+            None),
           Config(
             Some(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false"),
             insertNullsToStorage = false)))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index 510d4c13117..b554958572a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -100,7 +100,7 @@ class HiveSessionStateBuilder(
         RelationConversions(catalog) +:
         PreprocessTableCreation(session) +:
         PreprocessTableInsertion +:
-        DataSourceAnalysis +:
+        DataSourceAnalysis(this) +:
         HiveAnalysis +:
         ReplaceCharWithVarchar +:
         customPostHocResolutionRules


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org