Posted to commits@spark.apache.org by ge...@apache.org on 2022/07/12 19:17:07 UTC

[spark] branch master updated: [SPARK-39557][SQL] Support ARRAY, STRUCT, MAP types as DEFAULT values

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f2c50ea0b5 [SPARK-39557][SQL] Support ARRAY, STRUCT, MAP types as DEFAULT values
1f2c50ea0b5 is described below

commit 1f2c50ea0b56b5e99129091952661c8545f2fd96
Author: Daniel Tenedorio <da...@databricks.com>
AuthorDate: Tue Jul 12 12:16:36 2022 -0700

    [SPARK-39557][SQL] Support ARRAY, STRUCT, MAP types as DEFAULT values
    
    ### What changes were proposed in this pull request?
    
    Support ARRAY, STRUCT, MAP types as DEFAULT values.
    
    Previously these types were not supported: DDL commands that attempted to use them as DEFAULT values failed with errors. Now they work wherever DEFAULT values are supported.
    
    Examples:
    
    ```
    CREATE TABLE t(i BOOLEAN) USING PARQUET;
    INSERT INTO t SELECT FALSE;
    ALTER TABLE t ADD COLUMN s ARRAY<INT> DEFAULT ARRAY(1, 2);
    SELECT * FROM t;
    > false, [1, 2]
    
    CREATE TABLE t(i BOOLEAN) USING PARQUET;
    INSERT INTO t SELECT FALSE;
    ALTER TABLE t ADD COLUMN s STRUCT<x BOOLEAN, y STRING> DEFAULT STRUCT(TRUE, 'ABC')");
    SELECT * FROM t;
    > false, {true, abc}
    
    CREATE TABLE t(i BOOLEAN) USING PARQUET;
    INSERT INTO t SELECT FALSE;
    ALTER TABLE t ADD COLUMN s MAP<BOOLEAN, STRING> DEFAULT MAP(TRUE, 'ABC');
    SELECT * FROM t;
    > false, {true -> abc}
    ```
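    
    Under the hood, Literal.sql now renders array, struct, and map literal values back into SQL text (see the literals.scala hunk below), so that resolved DEFAULT values of these types can be stored as SQL strings in the column metadata and parsed again later. As a rough sketch only, taken from the new cases rather than verified output, the generated forms look like:
    
    ```
    ARRAY(<element>, ...)
    NAMED_STRUCT(<field name>, <field value>, ...)
    MAP(<key>, <value>, ...)
    ```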
    
    ### Why are the changes needed?
    
    This new functionality expands the usefulness of DEFAULT column values and Spark SQL in general.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    
    ### How was this patch tested?
    
    This PR adds both positive and negative unit test coverage.
    
    Closes #36960 from dtenedor/array-struct-map-defaults.
    
    Authored-by: Daniel Tenedorio <da...@databricks.com>
    Signed-off-by: Gengliang Wang <ge...@apache.org>
---
 .../spark/sql/catalyst/expressions/literals.scala  |  25 ++++
 .../catalyst/util/ResolveDefaultColumnsUtil.scala  |   4 +-
 .../aggregate/AggregateExpressionSuite.scala       |   6 +-
 .../apache/spark/sql/types/StructTypeSuite.scala   |  13 +-
 .../execution/vectorized/WritableColumnVector.java |  51 +++++++
 .../org/apache/spark/sql/sources/InsertSuite.scala | 154 +++++++++++++++++++++
 6 files changed, 242 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index af10a18e4d1..03678773ccc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -485,6 +485,31 @@ case class Literal (value: Any, dataType: DataType) extends LeafExpression {
       toDayTimeIntervalString(i, ANSI_STYLE, startField, endField)
     case (i: Int, YearMonthIntervalType(startField, endField)) =>
       toYearMonthIntervalString(i, ANSI_STYLE, startField, endField)
+    case (data: GenericArrayData, arrayType: ArrayType) =>
+      val arrayValues: Array[String] =
+        data.array.map {
+          Literal(_, arrayType.elementType).sql
+        }
+      s"ARRAY(${arrayValues.mkString(", ")})"
+    case (row: GenericInternalRow, structType: StructType) =>
+      val structNames: Array[String] = structType.fields.map(_.name)
+      val structValues: Array[String] =
+        row.values.zip(structType.fields.map(_.dataType)).map {
+          case (value: Any, fieldType: DataType) =>
+            Literal(value, fieldType).sql
+        }
+      val structFields: Array[String] =
+        structNames.zip(structValues).map { kv => s"${kv._1}, ${kv._2}" }
+      s"NAMED_STRUCT(${structFields.mkString(", ")})"
+    case (data: ArrayBasedMapData, mapType: MapType) =>
+      val keyData = data.keyArray.asInstanceOf[GenericArrayData]
+      val valueData = data.valueArray.asInstanceOf[GenericArrayData]
+      val keysAndValues: Array[String] =
+        keyData.array.zip(valueData.array).map {
+          case (key: Any, value: Any) =>
+            s"${Literal(key, mapType.keyType).sql}, ${Literal(value, mapType.valueType).sql}"
+        }
+      s"MAP(${keysAndValues.mkString(", ")})"
     case _ => value.toString
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index 2c3b1f35fb4..4865fe378cc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -202,12 +202,12 @@ object ResolveDefaultColumns {
       val defaultValue: Option[String] = field.getExistenceDefaultValue()
       defaultValue.map { text: String =>
         val expr = try {
-          val expr = CatalystSqlParser.parseExpression(text)
+          val expr = analyze(field, "")
           expr match {
             case _: ExprLiteral | _: Cast => expr
           }
         } catch {
-          case _: ParseException | _: MatchError =>
+          case _: AnalysisException | _: MatchError =>
             throw QueryCompilationErrors.failedToParseExistenceDefaultAsLiteral(field.name, text)
         }
         // The expression should be a literal value by this point, possibly wrapped in a cast
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/AggregateExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/AggregateExpressionSuite.scala
index 9be586a31fc..410b11eda50 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/AggregateExpressionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/AggregateExpressionSuite.scala
@@ -43,7 +43,7 @@ class AggregateExpressionSuite extends SparkFunSuite {
     val checkResult3 = RegrR2(Literal(3.0D), Literal(Array(0))).checkInputDataTypes()
     assert(checkResult3.isInstanceOf[TypeCheckResult.TypeCheckFailure])
     assert(checkResult3.asInstanceOf[TypeCheckResult.TypeCheckFailure].message
-      .contains("argument 2 requires double type, however, '[0]' is of array<int> type"))
+      .contains("argument 2 requires double type, however, 'ARRAY(0)' is of array<int> type"))
     assert(RegrR2(Literal(3.0D), Literal(1d)).checkInputDataTypes() ===
       TypeCheckResult.TypeCheckSuccess)
   }
@@ -60,7 +60,7 @@ class AggregateExpressionSuite extends SparkFunSuite {
     val checkResult3 = RegrSlope(Literal(3.0D), Literal(Array(0))).checkInputDataTypes()
     assert(checkResult3.isInstanceOf[TypeCheckResult.TypeCheckFailure])
     assert(checkResult3.asInstanceOf[TypeCheckResult.TypeCheckFailure].message
-      .contains("argument 2 requires double type, however, '[0]' is of array<int> type"))
+      .contains("argument 2 requires double type, however, 'ARRAY(0)' is of array<int> type"))
     assert(RegrSlope(Literal(3.0D), Literal(1D)).checkInputDataTypes() ===
       TypeCheckResult.TypeCheckSuccess)
   }
@@ -77,7 +77,7 @@ class AggregateExpressionSuite extends SparkFunSuite {
     val checkResult3 = RegrIntercept(Literal(3.0D), Literal(Array(0))).checkInputDataTypes()
     assert(checkResult3.isInstanceOf[TypeCheckResult.TypeCheckFailure])
     assert(checkResult3.asInstanceOf[TypeCheckResult.TypeCheckFailure].message
-      .contains("argument 2 requires double type, however, '[0]' is of array<int> type"))
+      .contains("argument 2 requires double type, however, 'ARRAY(0)' is of array<int> type"))
     assert(RegrIntercept(Literal(3.0D), Literal(1D)).checkInputDataTypes() ===
       TypeCheckResult.TypeCheckSuccess)
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
index 940a8e5e2ec..5ae890ef5fc 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
@@ -449,7 +449,8 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
       StructField("c1", LongType, true,
         new MetadataBuilder()
           .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "CAST(42 AS BIGINT)")
-          .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "CAST(42 AS BIGINT")
+          .putString(
+            ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "CAST(42 AS BIGINT)")
           .build()),
       StructField("c2", StringType, true,
         new MetadataBuilder()
@@ -462,8 +463,9 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
     assert(source1.existenceDefaultValues(1) == UTF8String.fromString("abc"))
     assert(source1.existenceDefaultValues(2) == null)
 
-    // Negative test: StructType.defaultValues fails because the existence default value parses and
-    // resolves successfully, but evaluates to a non-literal expression.
+    // Positive test: StructType.defaultValues works because the existence default value parses and
+    // resolves successfully, then evaluates to a non-literal expression: this is constant-folded at
+    // reference time.
     val source2 = StructType(
       Array(StructField("c1", IntegerType, true,
         new MetadataBuilder()
@@ -471,9 +473,8 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
           .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "1 + 1")
           .build())))
     val error = "fails to parse as a valid literal value"
-  assert(intercept[AnalysisException] {
-      source2.existenceDefaultValues
-    }.getMessage.contains(error))
+    assert(source2.existenceDefaultValues.size == 1)
+    assert(source2.existenceDefaultValues(0) == 2)
 
     // Negative test: StructType.defaultValues fails because the existence default value fails to
     // parse.
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 5debc1adacd..a8e4aad60c2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -23,6 +23,9 @@ import java.util.Optional;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.sql.vectorized.ColumnVector;
@@ -739,6 +742,54 @@ public abstract class WritableColumnVector extends ColumnVector {
       }
       return Optional.of(result);
     }
+    if (value instanceof GenericArrayData) {
+      GenericArrayData arrayData = (GenericArrayData) value;
+      int result = 0;
+      for (int i = 0; i < length; ++i) {
+        appendArray(arrayData.numElements());
+        for (Object element : arrayData.array()) {
+          if (!arrayData().appendObjects(1, element).isPresent()) {
+            return Optional.empty();
+          }
+        }
+        result += arrayData.numElements();
+      }
+      return Optional.of(result);
+    }
+    if (value instanceof GenericInternalRow) {
+      GenericInternalRow row = (GenericInternalRow) value;
+      int result = 0;
+      for (int i = 0; i < length; ++i) {
+        appendStruct(false);
+        for (int j = 0; j < row.values().length; ++j) {
+          Object element = row.values()[j];
+          if (!childColumns[j].appendObjects(1, element).isPresent()) {
+            return Optional.empty();
+          }
+        }
+        result += row.values().length;
+      }
+      return Optional.of(result);
+    }
+    if (value instanceof ArrayBasedMapData) {
+      ArrayBasedMapData data = (ArrayBasedMapData) value;
+      appendArray(length);
+      int result = 0;
+      for (int i = 0; i < length; ++i) {
+        for (Object key : data.keyArray().array()) {
+          if (!childColumns[0].appendObjects(1, key).isPresent()) {
+            return Optional.empty();
+          }
+        }
+        for (Object val: data.valueArray().array()) {
+          if (!childColumns[1].appendObjects(1, val).isPresent()) {
+            return Optional.empty();
+          }
+        }
+        result += data.keyArray().numElements();
+      }
+      return Optional.of(result);
+    }
     return Optional.empty();
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 576611cade5..725141eeeeb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -1671,6 +1671,160 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
   }
 
+  test("SPARK-39557 INSERT INTO statements with tables with array defaults") {
+    // Positive tests: array types are supported as default values.
+    case class TestCase(
+        dataSource: String,
+        insertNullsToStorage: Boolean = true)
+    Seq(
+      TestCase(
+        "parquet"),
+      TestCase(
+        "orc",
+        false)).foreach { testCase =>
+      val dataSource = testCase.dataSource
+      withTable("t") {
+        sql(s"create table t(i boolean) using $dataSource")
+        sql("insert into t select false")
+        sql("alter table t add column s array<int> default array(1, 2)")
+        checkAnswer(spark.table("t"), Row(false, Seq(1, 2)))
+      }
+    }
+    // Negative tests: provided array element types must match their corresponding DEFAULT
+    // declarations, if applicable.
+    val incompatibleDefault =
+    "Failed to execute ALTER TABLE ADD COLUMNS command because the destination table column s " +
+      "has a DEFAULT value with type"
+    withTable("t") {
+      sql("create table t(i boolean) using parquet")
+      sql("insert into t select false")
+      assert(intercept[AnalysisException] {
+        sql("alter table t add column s array<int> default array('abc', 'def')")
+      }.getMessage.contains(incompatibleDefault))
+    }
+  }
+
+  test("SPARK-39557 INSERT INTO statements with tables with struct defaults") {
+    // Positive tests: struct types are supported as default values.
+    case class TestCase(
+        dataSource: String,
+        insertNullsToStorage: Boolean = true)
+    Seq(
+      TestCase(
+        "parquet"),
+      TestCase(
+        "orc",
+        false)).foreach { testCase =>
+      val dataSource = testCase.dataSource
+      withTable("t") {
+        sql(s"create table t(i boolean) using $dataSource")
+        sql("insert into t select false")
+        sql("alter table t add column s struct<x boolean, y string> default struct(true, 'abc')")
+        checkAnswer(spark.table("t"), Row(false, Row(true, "abc")))
+      }
+    }
+    // Negative tests: provided map element types must match their corresponding DEFAULT
+    // declarations, if applicable.
+    val incompatibleDefault =
+    "Failed to execute ALTER TABLE ADD COLUMNS command because the destination table column s " +
+      "has a DEFAULT value with type"
+    withTable("t") {
+      sql("create table t(i boolean) using parquet")
+      sql("insert into t select false")
+      assert(intercept[AnalysisException] {
+        sql("alter table t add column s struct<x boolean, y string> default struct(42, 56)")
+      }.getMessage.contains(incompatibleDefault))
+    }
+  }
+
+  test("SPARK-39557 INSERT INTO statements with tables with map defaults") {
+    // Positive tests: map types are supported as default values.
+    case class TestCase(
+        dataSource: String,
+        insertNullsToStorage: Boolean = true)
+    Seq(
+      TestCase(
+        "parquet"),
+      TestCase(
+        "orc",
+        false)).foreach { testCase =>
+      val dataSource = testCase.dataSource
+      withTable("t") {
+        sql(s"create table t(i boolean) using $dataSource")
+        sql("insert into t select false")
+        sql("alter table t add column s map<boolean, string> default map(true, 'abc')")
+        checkAnswer(spark.table("t"), Row(false, Map(true -> "abc")))
+      }
+      withTable("t") {
+        sql(
+          s"""
+            create table t(
+              i int,
+              s struct<
+                x array<
+                  struct<a int, b int>>,
+              y array<
+                map<boolean, string>>>
+              default struct(
+                array(
+                  struct(1, 2)),
+                array(
+                  map(false, 'def', true, 'jkl'))))
+              using $dataSource""")
+        sql("insert into t select 1, default")
+        sql("alter table t alter column s drop default")
+        sql("insert into t select 2, default")
+        sql(
+          """
+            alter table t alter column s
+            set default struct(
+              array(
+                struct(3, 4)),
+              array(
+                map(false, 'mno', true, 'pqr')))""")
+        sql("insert into t select 3, default")
+        sql(
+          """
+            alter table t
+            add column t array<
+              map<boolean, string>>
+            default array(
+              map(true, 'xyz'))""")
+        sql("insert into t select 4, default")
+        checkAnswer(spark.table("t"),
+          Seq(
+            Row(1,
+              Row(Seq(Row(1, 2)), Seq(Map(false -> "def", true -> "jkl"))),
+              Seq(Map(true -> "xyz"))),
+            Row(2,
+              if (testCase.insertNullsToStorage) {
+                null
+              } else {
+                Row(Seq(Row(3, 4)), Seq(Map(false -> "mno", true -> "pqr")))
+              },
+              Seq(Map(true -> "xyz"))),
+            Row(3,
+              Row(Seq(Row(3, 4)), Seq(Map(false -> "mno", true -> "pqr"))),
+              Seq(Map(true -> "xyz"))),
+            Row(4,
+              Row(Seq(Row(3, 4)), Seq(Map(false -> "mno", true -> "pqr"))),
+              Seq(Map(true -> "xyz")))))
+      }
+    }
+    // Negative tests: provided map element types must match their corresponding DEFAULT
+    // declarations, if applicable.
+    val incompatibleDefault =
+    "Failed to execute ALTER TABLE ADD COLUMNS command because the destination table column s " +
+      "has a DEFAULT value with type"
+    withTable("t") {
+      sql("create table t(i boolean) using parquet")
+      sql("insert into t select false")
+      assert(intercept[AnalysisException] {
+        sql("alter table t add column s map<boolean, string> default map(42, 56)")
+      }.getMessage.contains(incompatibleDefault))
+    }
+  }
+
   test("SPARK-39643 Prohibit subquery expressions in DEFAULT values") {
     Seq(
       "create table t(a string default (select 'abc')) using parquet",

