You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2023/06/21 14:34:44 UTC
[spark] branch master updated: [SPARK-43742][SQL][FOLLOWUP] Do not use null literal as default value for non-nullable columns
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9b1e124b0bd [SPARK-43742][SQL][FOLLOWUP] Do not use null literal as default value for non-nullable columns
9b1e124b0bd is described below
commit 9b1e124b0bd0082e7ee13de56c4380783f816834
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Wed Jun 21 07:34:32 2023 -0700
[SPARK-43742][SQL][FOLLOWUP] Do not use null literal as default value for non-nullable columns
### What changes were proposed in this pull request?
A followup of https://github.com/apache/spark/pull/41262 to fix a mistake. If a column has no default value and is not nullable, we should fail when its default value is requested via the explicit `DEFAULT` keyword, and we should not fill in that column when it is missing from an INSERT.
### Why are the changes needed?
Fixes incorrect behavior: previously a null literal could be used as the default value for a non-nullable column.
### Does this PR introduce _any_ user-facing change?
Yes. The affected DML commands now fail at analysis time with a clear error (`NO_DEFAULT_COLUMN_VALUE_AVAILABLE`) instead of failing later at runtime.
### How was this patch tested?
new tests
Closes #41656 from cloud-fan/def-val.
Lead-authored-by: Wenchen Fan <we...@databricks.com>
Co-authored-by: Wenchen Fan <cl...@gmail.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
core/src/main/resources/error/error-classes.json | 6 +
.../sql/catalyst/analysis/AssignmentUtils.scala | 3 +-
.../catalyst/analysis/TableOutputResolver.scala | 4 +-
.../catalyst/util/ResolveDefaultColumnsUtil.scala | 47 ++-
.../execution/command/PlanResolutionSuite.scala | 370 ++++++++++++---------
5 files changed, 262 insertions(+), 168 deletions(-)
diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index e35adcfbb5a..1d2f25b72f3 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -1777,6 +1777,12 @@
},
"sqlState" : "46110"
},
+ "NO_DEFAULT_COLUMN_VALUE_AVAILABLE" : {
+ "message" : [
+ "Can't determine the default value for <colName> since it is not nullable and it has no default value."
+ ],
+ "sqlState" : "42608"
+ },
"NO_HANDLER_FOR_UDAF" : {
"message" : [
"No handler for UDAF '<functionName>'. Use sparkSession.udf.register(...) instead."
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
index 069cef6b361..fa953c90532 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
@@ -104,7 +104,8 @@ object AssignmentUtils extends SQLConfHelper with CastSupport {
case assignment if assignment.key.semanticEquals(attr) => assignment
}
val resolvedValue = if (matchingAssignments.isEmpty) {
- val defaultExpr = getDefaultValueExprOrNullLit(attr, conf)
+ val defaultExpr = getDefaultValueExprOrNullLit(
+ attr, conf.useNullsForMissingDefaultColumnValues)
if (defaultExpr.isEmpty) {
errors += s"No assignment for '${attr.name}'"
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index 3b721cf5d0d..6718020685b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -67,7 +67,7 @@ object TableOutputResolver {
val fillDefaultValue = supportColDefaultValue && actualExpectedCols.size > query.output.size
val queryOutputCols = if (fillDefaultValue) {
query.output ++ actualExpectedCols.drop(query.output.size).flatMap { expectedCol =>
- getDefaultValueExprOrNullLit(expectedCol, conf)
+ getDefaultValueExprOrNullLit(expectedCol, conf.useNullsForMissingDefaultColumnValues)
}
} else {
query.output
@@ -185,7 +185,7 @@ object TableOutputResolver {
val newColPath = colPath :+ expectedCol.name
if (matched.isEmpty) {
val defaultExpr = if (fillDefaultValue) {
- getDefaultValueExprOrNullLit(expectedCol, conf)
+ getDefaultValueExprOrNullLit(expectedCol, conf.useNullsForMissingDefaultColumnValues)
} else {
None
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index 2169137685d..26efa8c8df2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
import org.apache.spark.sql.connector.catalog.{CatalogManager, FunctionCatalog, Identifier, TableCatalog, TableCatalogCapability}
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
-import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.connector.V1Function
import org.apache.spark.sql.types._
@@ -41,7 +41,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
* This object contains fields to help process DEFAULT columns.
*/
-object ResolveDefaultColumns {
+object ResolveDefaultColumns extends QueryErrorsBase {
// This column metadata indicates the default value associated with a particular table column that
// is in effect at any given time. Its value begins at the time of the initial CREATE/REPLACE
// TABLE statement with DEFAULT column definition(s), if any. It then changes whenever an ALTER
@@ -210,15 +210,23 @@ object ResolveDefaultColumns {
/**
* Generates the expression of the default value for the given field. If there is no
- * user-specified default value for this field, returns null literal.
+ * user-specified default value for this field and the field is nullable, returns null
+ * literal, otherwise an exception is thrown.
*/
def getDefaultValueExprOrNullLit(field: StructField): Expression = {
- getDefaultValueExprOpt(field).getOrElse(Literal(null, field.dataType))
+ val defaultValue = getDefaultValueExprOrNullLit(field, useNullAsDefault = true)
+ if (defaultValue.isEmpty) {
+ throw new AnalysisException(
+ errorClass = "NO_DEFAULT_COLUMN_VALUE_AVAILABLE",
+ messageParameters = Map("colName" -> toSQLId(Seq(field.name))))
+ }
+ defaultValue.get
}
/**
- * Generates the expression of the default value for the given column. If there is no
- * user-specified default value for this field, returns null literal.
+ * Generates the expression of the default value for the given attribute. If there is no
+ * user-specified default value for this attribute and the attribute is nullable, returns null
+ * literal, otherwise an exception is thrown.
*/
def getDefaultValueExprOrNullLit(attr: Attribute): Expression = {
val field = StructField(attr.name, attr.dataType, attr.nullable, attr.metadata)
@@ -226,19 +234,30 @@ object ResolveDefaultColumns {
}
/**
- * Generates the aliased expression of the default value for the given column. If there is no
- * user-specified default value for this column, returns a null literal or None w.r.t. the config
- * `USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES`.
+ * Generates the expression of the default value for the given field. If there is no
+ * user-specified default value for this field, returns null literal if `useNullAsDefault` is
+ * true and the field is nullable.
*/
- def getDefaultValueExprOrNullLit(attr: Attribute, conf: SQLConf): Option[NamedExpression] = {
- val field = StructField(attr.name, attr.dataType, attr.nullable, attr.metadata)
+ def getDefaultValueExprOrNullLit(
+ field: StructField, useNullAsDefault: Boolean): Option[NamedExpression] = {
getDefaultValueExprOpt(field).orElse {
- if (conf.useNullsForMissingDefaultColumnValues) {
- Some(Literal(null, attr.dataType))
+ if (useNullAsDefault && field.nullable) {
+ Some(Literal(null, field.dataType))
} else {
None
}
- }.map(expr => Alias(expr, attr.name)())
+ }.map(expr => Alias(expr, field.name)())
+ }
+
+ /**
+ * Generates the expression of the default value for the given attribute. If there is no
+ * user-specified default value for this attribute, returns null literal if `useNullAsDefault` is
+ * true and the attribute is nullable.
+ */
+ def getDefaultValueExprOrNullLit(
+ attr: Attribute, useNullAsDefault: Boolean): Option[NamedExpression] = {
+ val field = StructField(attr.name, attr.dataType, attr.nullable, attr.metadata)
+ getDefaultValueExprOrNullLit(field, useNullAsDefault)
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index ee1c70a05d6..fc85ec40dbd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -71,8 +71,9 @@ class PlanResolutionSuite extends AnalysisTest {
private val table2: Table = {
val t = mock(classOf[Table])
when(t.columns()).thenReturn(
- Array(Column.create("i", IntegerType), Column.create("x", StringType)))
+ Array(Column.create("i", IntegerType), Column.create("x", StringType, false)))
when(t.partitioning()).thenReturn(Array.empty[Transform])
+ when(t.name()).thenReturn("tab2")
t
}
@@ -1024,12 +1025,6 @@ class PlanResolutionSuite extends AnalysisTest {
val sql5 = s"UPDATE $tblName SET name=DEFAULT, age=DEFAULT"
// Note: 'i' and 's' are the names of the columns in 'tblName'.
val sql6 = s"UPDATE $tblName SET i=DEFAULT, s=DEFAULT"
- val sql7 = s"UPDATE testcat.defaultvalues SET i=DEFAULT, s=DEFAULT"
- // UPDATE condition won't resolve column "DEFAULT"
- val sql8 = s"UPDATE testcat.defaultvalues SET i=DEFAULT, s=DEFAULT WHERE i=DEFAULT"
- val sql9 = s"UPDATE testcat.defaultvalues2 SET i=DEFAULT"
- // Table with ACCEPT_ANY_SCHEMA can also resolve the column DEFAULT.
- val sql10 = s"UPDATE testcat.v2TableWithAcceptAnySchemaCapability SET i=DEFAULT"
val parsed1 = parseAndResolve(sql1)
val parsed2 = parseAndResolve(sql2)
@@ -1037,9 +1032,6 @@ class PlanResolutionSuite extends AnalysisTest {
val parsed4 = parseAndResolve(sql4)
val parsed5 = parseAndResolve(sql5)
val parsed6 = parseAndResolve(sql6)
- val parsed7 = parseAndResolve(sql7)
- val parsed9 = parseAndResolve(sql9)
- val parsed10 = parseAndResolve(sql10)
parsed1 match {
case UpdateTable(
@@ -1125,50 +1117,6 @@ class PlanResolutionSuite extends AnalysisTest {
case _ => fail("Expect UpdateTable, but got:\n" + parsed6.treeString)
}
-
- parsed7 match {
- case UpdateTable(
- _,
- Seq(
- Assignment(i: AttributeReference, Literal(true, BooleanType)),
- Assignment(s: AttributeReference, Literal(42, IntegerType))),
- None) =>
- assert(i.name == "i")
- assert(s.name == "s")
-
- case _ => fail("Expect UpdateTable, but got:\n" + parsed7.treeString)
- }
-
- checkError(
- exception = intercept[AnalysisException] {
- parseAndResolve(sql8, checkAnalysis = true)
- },
- errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
- parameters = Map("objectName" -> "`DEFAULT`", "proposal" -> "`i`, `s`"),
- context = ExpectedContext(
- fragment = "DEFAULT",
- start = 62,
- stop = 68))
-
- parsed9 match {
- case UpdateTable(
- _,
- Seq(Assignment(i: AttributeReference, Literal(null, StringType))),
- None) =>
- assert(i.name == "i")
-
- case _ => fail("Expect UpdateTable, but got:\n" + parsed9.treeString)
- }
-
- parsed10 match {
- case UpdateTable(
- _,
- Seq(Assignment(i: AttributeReference, Literal(null, IntegerType))),
- None) =>
- assert(i.name == "i")
-
- case _ => fail("Expect UpdateTable, but got:\n" + parsed10.treeString)
- }
}
val sql1 = "UPDATE non_existing SET id=1"
@@ -1201,6 +1149,71 @@ class PlanResolutionSuite extends AnalysisTest {
}
case _ => fail("Expect UpdateTable, but got:\n" + parsed2.treeString)
}
+
+ val sql3 = "UPDATE testcat.defaultvalues SET i=DEFAULT, s=DEFAULT"
+ val sql4 = "UPDATE testcat.defaultvalues2 SET i=DEFAULT"
+ // Table with ACCEPT_ANY_SCHEMA can also resolve the column DEFAULT.
+ val sql5 = "UPDATE testcat.v2TableWithAcceptAnySchemaCapability SET i=DEFAULT"
+
+ val parsed3 = parseAndResolve(sql3)
+ val parsed4 = parseAndResolve(sql4)
+ val parsed5 = parseAndResolve(sql5)
+
+ parsed3 match {
+ case UpdateTable(
+ _,
+ Seq(
+ Assignment(i: AttributeReference, Literal(true, BooleanType)),
+ Assignment(s: AttributeReference, Literal(42, IntegerType))),
+ None) =>
+ assert(i.name == "i")
+ assert(s.name == "s")
+
+ case _ => fail("Expect UpdateTable, but got:\n" + parsed3.treeString)
+ }
+
+ parsed4 match {
+ case UpdateTable(
+ _,
+ Seq(Assignment(i: AttributeReference, Literal(null, StringType))),
+ None) =>
+ assert(i.name == "i")
+
+ case _ => fail("Expect UpdateTable, but got:\n" + parsed4.treeString)
+ }
+
+ parsed5 match {
+ case UpdateTable(
+ _,
+ Seq(Assignment(i: AttributeReference, Literal(null, IntegerType))),
+ None) =>
+ assert(i.name == "i")
+
+ case _ => fail("Expect UpdateTable, but got:\n" + parsed5.treeString)
+ }
+
+ // Negative cases.
+ // UPDATE condition won't resolve column "DEFAULT"
+ val sql6 = "UPDATE testcat.defaultvalues SET i=DEFAULT, s=DEFAULT WHERE i=DEFAULT"
+ checkError(
+ exception = intercept[AnalysisException] {
+ parseAndResolve(sql6, checkAnalysis = true)
+ },
+ errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+ parameters = Map("objectName" -> "`DEFAULT`", "proposal" -> "`i`, `s`"),
+ context = ExpectedContext(
+ fragment = "DEFAULT",
+ start = 62,
+ stop = 68))
+
+ val sql7 = "UPDATE testcat.tab2 SET x=DEFAULT"
+ checkError(
+ exception = intercept[AnalysisException] {
+ parseAndResolve(sql7, checkAnalysis = true)
+ },
+ errorClass = "NO_DEFAULT_COLUMN_VALUE_AVAILABLE",
+ parameters = Map("colName" -> "`x`")
+ )
}
test("SPARK-38869 INSERT INTO table with ACCEPT_ANY_SCHEMA capability") {
@@ -1228,6 +1241,41 @@ class PlanResolutionSuite extends AnalysisTest {
}
}
+ test("INSERT INTO table with default column value") {
+ val sql1 = "INSERT INTO testcat.defaultvalues VALUES (DEFAULT, DEFAULT)"
+ parseAndResolve(sql1) match {
+ // The top-most Project just adds aliases.
+ case AppendData(_: DataSourceV2Relation, Project(_, l: LocalRelation), _, _, _, _) =>
+ assert(l.data.length == 1)
+ val row = l.data.head
+ assert(row.numFields == 2)
+ assert(row.getBoolean(0) == true)
+ assert(row.getInt(1) == 42)
+ case other => fail("Expected AppendData, but got:\n" + other.treeString)
+ }
+
+ val sql2 = "INSERT INTO testcat.tab2 VALUES (1, DEFAULT)"
+ checkError(
+ exception = intercept[AnalysisException] {
+ parseAndResolve(sql2, checkAnalysis = true)
+ },
+ errorClass = "NO_DEFAULT_COLUMN_VALUE_AVAILABLE",
+ parameters = Map("colName" -> "`x`")
+ )
+
+ val sql3 = "INSERT INTO testcat.tab2 VALUES (1)"
+ checkError(
+ exception = intercept[AnalysisException] {
+ parseAndResolve(sql3, checkAnalysis = true)
+ },
+ errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+ parameters = Map(
+ "tableName" -> "`tab2`",
+ "tableColumns" -> "`i`, `x`",
+ "dataColumns" -> "`col1`")
+ )
+ }
+
test("InsertIntoStatement byName") {
val tblName = "testcat.tab1"
val insertSql = s"INSERT INTO $tblName(i, s) VALUES (3, 'a')"
@@ -1789,90 +1837,6 @@ class PlanResolutionSuite extends AnalysisTest {
case other =>
fail("Expect MergeIntoTable, but got:\n" + other.treeString)
}
-
- // DEFAULT column reference in the merge condition:
- // This MERGE INTO command includes an ON clause with a DEFAULT column reference. This
- // DEFAULT column won't be resolved.
- val mergeWithDefaultReferenceInMergeCondition =
- s"""MERGE INTO testcat.tab AS target
- |USING testcat.tab1 AS source
- |ON target.i = DEFAULT
- |WHEN MATCHED AND (target.s = 31) THEN DELETE
- |WHEN MATCHED AND (target.s = 31)
- | THEN UPDATE SET target.s = DEFAULT
- |WHEN NOT MATCHED AND (source.s='insert')
- | THEN INSERT (target.i, target.s) values (DEFAULT, DEFAULT)
- |WHEN NOT MATCHED BY SOURCE AND (target.s = 31) THEN DELETE
- |WHEN NOT MATCHED BY SOURCE AND (target.s = 31)
- | THEN UPDATE SET target.s = DEFAULT""".stripMargin
- checkError(
- exception = intercept[AnalysisException] {
- parseAndResolve(mergeWithDefaultReferenceInMergeCondition, checkAnalysis = true)
- },
- errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
- parameters = Map("objectName" -> "`DEFAULT`",
- "proposal" -> "`target`.`i`, `source`.`i`, `target`.`s`, `source`.`s`"),
- context = ExpectedContext(
- fragment = "DEFAULT",
- start = 76,
- stop = 82))
-
- // DEFAULT column reference within a complex expression:
- // This MERGE INTO command includes a WHEN MATCHED clause with a DEFAULT column reference as
- // of a complex expression (DEFAULT + 1). This is invalid and column won't be resolved.
- val mergeWithDefaultReferenceAsPartOfComplexExpression =
- s"""MERGE INTO testcat.tab AS target
- |USING testcat.tab1 AS source
- |ON target.i = source.i
- |WHEN MATCHED AND (target.s = 31) THEN DELETE
- |WHEN MATCHED AND (target.s = 31)
- | THEN UPDATE SET target.s = DEFAULT + 1
- |WHEN NOT MATCHED AND (source.s='insert')
- | THEN INSERT (target.i, target.s) values (DEFAULT, DEFAULT)
- |WHEN NOT MATCHED BY SOURCE AND (target.s = 31) THEN DELETE
- |WHEN NOT MATCHED BY SOURCE AND (target.s = 31)
- | THEN UPDATE SET target.s = DEFAULT + 1""".stripMargin
- checkError(
- exception = intercept[AnalysisException] {
- parseAndResolve(mergeWithDefaultReferenceAsPartOfComplexExpression)
- },
- errorClass = "_LEGACY_ERROR_TEMP_1343",
- parameters = Map.empty)
-
- // Ambiguous DEFAULT column reference when the table itself contains a column named
- // "DEFAULT".
- val mergeIntoTableWithColumnNamedDefault =
- s"""
- |MERGE INTO testcat.tablewithcolumnnameddefault AS target
- |USING testcat.tab1 AS source
- |ON default = source.i
- |WHEN MATCHED AND (target.s = 32) THEN DELETE
- |WHEN MATCHED AND (target.s = 32)
- | THEN UPDATE SET target.s = DEFAULT
- |WHEN NOT MATCHED AND (source.s='insert')
- | THEN INSERT (target.s) values (DEFAULT)
- |WHEN NOT MATCHED BY SOURCE AND (target.s = 32) THEN DELETE
- |WHEN NOT MATCHED BY SOURCE AND (target.s = 32)
- | THEN UPDATE SET target.s = DEFAULT
- """.stripMargin
- parseAndResolve(mergeIntoTableWithColumnNamedDefault, withDefault = true) match {
- case m: MergeIntoTable =>
- val target = m.targetTable
- val d = target.output.find(_.name == "default").get.asInstanceOf[AttributeReference]
- m.mergeCondition match {
- case EqualTo(Cast(l: AttributeReference, _, _, _), _) =>
- assert(l.sameRef(d))
- case Literal(_, BooleanType) => // this is acceptable as a merge condition
- case other =>
- fail("unexpected merge condition " + other)
- }
- assert(m.matchedActions.length == 2)
- assert(m.notMatchedActions.length == 1)
- assert(m.notMatchedBySourceActions.length == 2)
-
- case other =>
- fail("Expect MergeIntoTable, but got:\n" + other.treeString)
- }
}
// DEFAULT columns (explicit):
@@ -1880,19 +1844,19 @@ class PlanResolutionSuite extends AnalysisTest {
// DEFAULT column references in the below MERGE INTO command should resolve to the corresponding
// values. This test case covers that behavior.
val mergeDefaultWithExplicitDefaultColumns =
- s"""
- |MERGE INTO testcat.defaultvalues AS target
- |USING testcat.tab1 AS source
- |ON target.i = source.i
- |WHEN MATCHED AND (target.s = 31) THEN DELETE
- |WHEN MATCHED AND (target.s = 31)
- | THEN UPDATE SET target.s = DEFAULT
- |WHEN NOT MATCHED AND (source.s='insert')
- | THEN INSERT (target.i, target.s) values (DEFAULT, DEFAULT)
- |WHEN NOT MATCHED BY SOURCE AND (target.s = 31) THEN DELETE
- |WHEN NOT MATCHED BY SOURCE AND (target.s = 31)
- | THEN UPDATE SET target.s = DEFAULT
- """.stripMargin
+ """
+ |MERGE INTO testcat.defaultvalues AS target
+ |USING testcat.tab1 AS source
+ |ON target.i = source.i
+ |WHEN MATCHED AND (target.s = 31) THEN DELETE
+ |WHEN MATCHED AND (target.s = 31)
+ | THEN UPDATE SET target.s = DEFAULT
+ |WHEN NOT MATCHED AND (source.s='insert')
+ | THEN INSERT (target.i, target.s) values (DEFAULT, DEFAULT)
+ |WHEN NOT MATCHED BY SOURCE AND (target.s = 31) THEN DELETE
+ |WHEN NOT MATCHED BY SOURCE AND (target.s = 31)
+ | THEN UPDATE SET target.s = DEFAULT
+ |""".stripMargin
parseAndResolve(mergeDefaultWithExplicitDefaultColumns) match {
case m: MergeIntoTable =>
val cond = m.mergeCondition
@@ -1942,6 +1906,110 @@ class PlanResolutionSuite extends AnalysisTest {
fail("Expect MergeIntoTable, but got:\n" + other.treeString)
}
+ // DEFAULT column reference in the merge condition:
+ // This MERGE INTO command includes an ON clause with a DEFAULT column reference. This
+ // DEFAULT column won't be resolved.
+ val mergeWithDefaultReferenceInMergeCondition =
+ """
+ |MERGE INTO testcat.tab AS target
+ |USING testcat.tab1 AS source
+ |ON target.i = DEFAULT
+ |WHEN MATCHED AND (target.s = 31) THEN DELETE
+ |WHEN MATCHED AND (target.s = 31)
+ | THEN UPDATE SET target.s = DEFAULT
+ |WHEN NOT MATCHED AND (source.s='insert')
+ | THEN INSERT (target.i, target.s) values (DEFAULT, DEFAULT)
+ |WHEN NOT MATCHED BY SOURCE AND (target.s = 31) THEN DELETE
+ |WHEN NOT MATCHED BY SOURCE AND (target.s = 31)
+ | THEN UPDATE SET target.s = DEFAULT
+ |""".stripMargin
+ checkError(
+ exception = intercept[AnalysisException] {
+ parseAndResolve(mergeWithDefaultReferenceInMergeCondition, checkAnalysis = true)
+ },
+ errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+ parameters = Map("objectName" -> "`DEFAULT`",
+ "proposal" -> "`target`.`i`, `source`.`i`, `target`.`s`, `source`.`s`"),
+ context = ExpectedContext(
+ fragment = "DEFAULT",
+ start = 77,
+ stop = 83))
+
+ // DEFAULT column reference within a complex expression:
+ // This MERGE INTO command includes a WHEN MATCHED clause with a DEFAULT column reference as
+ // of a complex expression (DEFAULT + 1). This is invalid and column won't be resolved.
+ val mergeWithDefaultReferenceAsPartOfComplexExpression =
+ """
+ |MERGE INTO testcat.tab AS target
+ |USING testcat.tab1 AS source
+ |ON target.i = source.i
+ |WHEN MATCHED AND (target.s = 31) THEN DELETE
+ |WHEN MATCHED AND (target.s = 31)
+ | THEN UPDATE SET target.s = DEFAULT + 1
+ |WHEN NOT MATCHED AND (source.s='insert')
+ | THEN INSERT (target.i, target.s) values (DEFAULT, DEFAULT)
+ |WHEN NOT MATCHED BY SOURCE AND (target.s = 31) THEN DELETE
+ |WHEN NOT MATCHED BY SOURCE AND (target.s = 31)
+ | THEN UPDATE SET target.s = DEFAULT + 1
+ |""".stripMargin
+ checkError(
+ exception = intercept[AnalysisException] {
+ parseAndResolve(mergeWithDefaultReferenceAsPartOfComplexExpression)
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_1343",
+ parameters = Map.empty)
+
+ val mergeWithDefaultReferenceForNonNullableCol =
+ """
+ |MERGE INTO testcat.tab2 AS target
+ |USING testcat.tab1 AS source
+ |ON target.i = source.i
+ |WHEN NOT MATCHED AND (source.s = 'insert')
+ | THEN INSERT (target.i, target.x) VALUES (1, DEFAULT)
+ |""".stripMargin
+ checkError(
+ exception = intercept[AnalysisException] {
+ parseAndResolve(mergeWithDefaultReferenceForNonNullableCol)
+ },
+ errorClass = "NO_DEFAULT_COLUMN_VALUE_AVAILABLE",
+ parameters = Map("colName" -> "`x`")
+ )
+
+ // Ambiguous DEFAULT column reference when the table itself contains a column named
+ // "DEFAULT".
+ val mergeIntoTableWithColumnNamedDefault =
+ """
+ |MERGE INTO testcat.tablewithcolumnnameddefault AS target
+ |USING testcat.tab1 AS source
+ |ON default = source.i
+ |WHEN MATCHED AND (target.s = 32) THEN DELETE
+ |WHEN MATCHED AND (target.s = 32)
+ | THEN UPDATE SET target.s = DEFAULT
+ |WHEN NOT MATCHED AND (source.s='insert')
+ | THEN INSERT (target.s) values (DEFAULT)
+ |WHEN NOT MATCHED BY SOURCE AND (target.s = 32) THEN DELETE
+ |WHEN NOT MATCHED BY SOURCE AND (target.s = 32)
+ | THEN UPDATE SET target.s = DEFAULT
+ |""".stripMargin
+ parseAndResolve(mergeIntoTableWithColumnNamedDefault, withDefault = true) match {
+ case m: MergeIntoTable =>
+ val target = m.targetTable
+ val d = target.output.find(_.name == "default").get.asInstanceOf[AttributeReference]
+ m.mergeCondition match {
+ case EqualTo(Cast(l: AttributeReference, _, _, _), _) =>
+ assert(l.sameRef(d))
+ case Literal(_, BooleanType) => // this is acceptable as a merge condition
+ case other =>
+ fail("unexpected merge condition " + other)
+ }
+ assert(m.matchedActions.length == 2)
+ assert(m.notMatchedActions.length == 1)
+ assert(m.notMatchedBySourceActions.length == 2)
+
+ case other =>
+ fail("Expect MergeIntoTable, but got:\n" + other.treeString)
+ }
+
// no aliases
Seq(("v2Table", "v2Table1"), ("testcat.tab", "testcat.tab1")).foreach { pair =>
def referenceNames(target: String, column: String): String = target match {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org