Posted to commits@spark.apache.org by do...@apache.org on 2023/06/21 14:34:44 UTC

[spark] branch master updated: [SPARK-43742][SQL][FOLLOWUP] Do not use null literal as default value for non-nullable columns

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b1e124b0bd [SPARK-43742][SQL][FOLLOWUP] Do not use null literal as default value for non-nullable columns
9b1e124b0bd is described below

commit 9b1e124b0bd0082e7ee13de56c4380783f816834
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Wed Jun 21 07:34:32 2023 -0700

    [SPARK-43742][SQL][FOLLOWUP] Do not use null literal as default value for non-nullable columns
    
    ### What changes were proposed in this pull request?
    
    A follow-up of https://github.com/apache/spark/pull/41262 to fix a mistake: if a column has no default value and is not nullable, we should fail when users reference its default value via the explicit `DEFAULT` keyword, and we should not fill in missing columns in INSERT.
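    
    A minimal sketch of the new behavior (the table name and `USING` clause are
    hypothetical, assuming a catalog that supports column defaults):
    
        spark.sql("CREATE TABLE t (i INT, x STRING NOT NULL) USING parquet")
        // Referencing the default of a non-nullable column that has none now
        // fails at analysis time with NO_DEFAULT_COLUMN_VALUE_AVAILABLE:
        spark.sql("INSERT INTO t VALUES (1, DEFAULT)")
        // Missing columns are no longer silently filled in; this fails with
        // INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS:
        spark.sql("INSERT INTO t VALUES (1)")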
    
    ### Why are the changes needed?
    
    Fixes incorrect behavior: a null literal was previously used as the default value even for non-nullable columns.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. The DML command now fails at analysis time; without this change it would fail later at runtime.
    
    ### How was this patch tested?
    
    New tests in `PlanResolutionSuite`.
    
    Closes #41656 from cloud-fan/def-val.
    
    Lead-authored-by: Wenchen Fan <we...@databricks.com>
    Co-authored-by: Wenchen Fan <cl...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 core/src/main/resources/error/error-classes.json   |   6 +
 .../sql/catalyst/analysis/AssignmentUtils.scala    |   3 +-
 .../catalyst/analysis/TableOutputResolver.scala    |   4 +-
 .../catalyst/util/ResolveDefaultColumnsUtil.scala  |  47 ++-
 .../execution/command/PlanResolutionSuite.scala    | 370 ++++++++++++---------
 5 files changed, 262 insertions(+), 168 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index e35adcfbb5a..1d2f25b72f3 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -1777,6 +1777,12 @@
     },
     "sqlState" : "46110"
   },
+  "NO_DEFAULT_COLUMN_VALUE_AVAILABLE" : {
+    "message" : [
+      "Can't determine the default value for <colName> since it is not nullable and it has no default value."
+    ],
+    "sqlState" : "42608"
+  },
   "NO_HANDLER_FOR_UDAF" : {
     "message" : [
       "No handler for UDAF '<functionName>'. Use sparkSession.udf.register(...) instead."
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
index 069cef6b361..fa953c90532 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
@@ -104,7 +104,8 @@ object AssignmentUtils extends SQLConfHelper with CastSupport {
         case assignment if assignment.key.semanticEquals(attr) => assignment
       }
       val resolvedValue = if (matchingAssignments.isEmpty) {
-        val defaultExpr = getDefaultValueExprOrNullLit(attr, conf)
+        val defaultExpr = getDefaultValueExprOrNullLit(
+          attr, conf.useNullsForMissingDefaultColumnValues)
         if (defaultExpr.isEmpty) {
           errors += s"No assignment for '${attr.name}'"
         }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index 3b721cf5d0d..6718020685b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -67,7 +67,7 @@ object TableOutputResolver {
       val fillDefaultValue = supportColDefaultValue && actualExpectedCols.size > query.output.size
       val queryOutputCols = if (fillDefaultValue) {
         query.output ++ actualExpectedCols.drop(query.output.size).flatMap { expectedCol =>
-          getDefaultValueExprOrNullLit(expectedCol, conf)
+          getDefaultValueExprOrNullLit(expectedCol, conf.useNullsForMissingDefaultColumnValues)
         }
       } else {
         query.output
@@ -185,7 +185,7 @@ object TableOutputResolver {
       val newColPath = colPath :+ expectedCol.name
       if (matched.isEmpty) {
         val defaultExpr = if (fillDefaultValue) {
-          getDefaultValueExprOrNullLit(expectedCol, conf)
+          getDefaultValueExprOrNullLit(expectedCol, conf.useNullsForMissingDefaultColumnValues)
         } else {
           None
         }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index 2169137685d..26efa8c8df2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
 import org.apache.spark.sql.connector.catalog.{CatalogManager, FunctionCatalog, Identifier, TableCatalog, TableCatalogCapability}
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
-import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.connector.V1Function
 import org.apache.spark.sql.types._
@@ -41,7 +41,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 /**
  * This object contains fields to help process DEFAULT columns.
  */
-object ResolveDefaultColumns {
+object ResolveDefaultColumns extends QueryErrorsBase {
   // This column metadata indicates the default value associated with a particular table column that
   // is in effect at any given time. Its value begins at the time of the initial CREATE/REPLACE
   // TABLE statement with DEFAULT column definition(s), if any. It then changes whenever an ALTER
@@ -210,15 +210,23 @@ object ResolveDefaultColumns {
 
   /**
    * Generates the expression of the default value for the given field. If there is no
-   * user-specified default value for this field, returns null literal.
+   * user-specified default value for this field and the field is nullable, returns null
+   * literal, otherwise an exception is thrown.
    */
   def getDefaultValueExprOrNullLit(field: StructField): Expression = {
-    getDefaultValueExprOpt(field).getOrElse(Literal(null, field.dataType))
+    val defaultValue = getDefaultValueExprOrNullLit(field, useNullAsDefault = true)
+    if (defaultValue.isEmpty) {
+      throw new AnalysisException(
+        errorClass = "NO_DEFAULT_COLUMN_VALUE_AVAILABLE",
+        messageParameters = Map("colName" -> toSQLId(Seq(field.name))))
+    }
+    defaultValue.get
   }
 
   /**
-   * Generates the expression of the default value for the given column. If there is no
-   * user-specified default value for this field, returns null literal.
+   * Generates the expression of the default value for the given attribute. If there is no
+   * user-specified default value for this attribute and the attribute is nullable, returns null
+   * literal, otherwise an exception is thrown.
    */
   def getDefaultValueExprOrNullLit(attr: Attribute): Expression = {
     val field = StructField(attr.name, attr.dataType, attr.nullable, attr.metadata)
@@ -226,19 +234,30 @@ object ResolveDefaultColumns {
   }
 
   /**
-   * Generates the aliased expression of the default value for the given column. If there is no
-   * user-specified default value for this column, returns a null literal or None w.r.t. the config
-   * `USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES`.
+   * Generates the expression of the default value for the given field. If there is no
+   * user-specified default value for this field, returns null literal if `useNullAsDefault` is
+   * true and the field is nullable.
    */
-  def getDefaultValueExprOrNullLit(attr: Attribute, conf: SQLConf): Option[NamedExpression] = {
-    val field = StructField(attr.name, attr.dataType, attr.nullable, attr.metadata)
+  def getDefaultValueExprOrNullLit(
+      field: StructField, useNullAsDefault: Boolean): Option[NamedExpression] = {
     getDefaultValueExprOpt(field).orElse {
-      if (conf.useNullsForMissingDefaultColumnValues) {
-        Some(Literal(null, attr.dataType))
+      if (useNullAsDefault && field.nullable) {
+        Some(Literal(null, field.dataType))
       } else {
         None
       }
-    }.map(expr => Alias(expr, attr.name)())
+    }.map(expr => Alias(expr, field.name)())
+  }
+
+  /**
+   * Generates the expression of the default value for the given attribute. If there is no
+   * user-specified default value for this attribute, returns null literal if `useNullAsDefault` is
+   * true and the attribute is nullable.
+   */
+  def getDefaultValueExprOrNullLit(
+      attr: Attribute, useNullAsDefault: Boolean): Option[NamedExpression] = {
+    val field = StructField(attr.name, attr.dataType, attr.nullable, attr.metadata)
+    getDefaultValueExprOrNullLit(field, useNullAsDefault)
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index ee1c70a05d6..fc85ec40dbd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -71,8 +71,9 @@ class PlanResolutionSuite extends AnalysisTest {
   private val table2: Table = {
     val t = mock(classOf[Table])
     when(t.columns()).thenReturn(
-      Array(Column.create("i", IntegerType), Column.create("x", StringType)))
+      Array(Column.create("i", IntegerType), Column.create("x", StringType, false)))
     when(t.partitioning()).thenReturn(Array.empty[Transform])
+    when(t.name()).thenReturn("tab2")
     t
   }
 
@@ -1024,12 +1025,6 @@ class PlanResolutionSuite extends AnalysisTest {
       val sql5 = s"UPDATE $tblName SET name=DEFAULT, age=DEFAULT"
       // Note: 'i' and 's' are the names of the columns in 'tblName'.
       val sql6 = s"UPDATE $tblName SET i=DEFAULT, s=DEFAULT"
-      val sql7 = s"UPDATE testcat.defaultvalues SET i=DEFAULT, s=DEFAULT"
-      // UPDATE condition won't resolve column "DEFAULT"
-      val sql8 = s"UPDATE testcat.defaultvalues SET i=DEFAULT, s=DEFAULT WHERE i=DEFAULT"
-      val sql9 = s"UPDATE testcat.defaultvalues2 SET i=DEFAULT"
-      // Table with ACCEPT_ANY_SCHEMA can also resolve the column DEFAULT.
-      val sql10 = s"UPDATE testcat.v2TableWithAcceptAnySchemaCapability SET i=DEFAULT"
 
       val parsed1 = parseAndResolve(sql1)
       val parsed2 = parseAndResolve(sql2)
@@ -1037,9 +1032,6 @@ class PlanResolutionSuite extends AnalysisTest {
       val parsed4 = parseAndResolve(sql4)
       val parsed5 = parseAndResolve(sql5)
       val parsed6 = parseAndResolve(sql6)
-      val parsed7 = parseAndResolve(sql7)
-      val parsed9 = parseAndResolve(sql9)
-      val parsed10 = parseAndResolve(sql10)
 
       parsed1 match {
         case UpdateTable(
@@ -1125,50 +1117,6 @@ class PlanResolutionSuite extends AnalysisTest {
 
         case _ => fail("Expect UpdateTable, but got:\n" + parsed6.treeString)
       }
-
-      parsed7 match {
-        case UpdateTable(
-          _,
-          Seq(
-            Assignment(i: AttributeReference, Literal(true, BooleanType)),
-            Assignment(s: AttributeReference, Literal(42, IntegerType))),
-          None) =>
-          assert(i.name == "i")
-          assert(s.name == "s")
-
-        case _ => fail("Expect UpdateTable, but got:\n" + parsed7.treeString)
-      }
-
-      checkError(
-        exception = intercept[AnalysisException] {
-          parseAndResolve(sql8, checkAnalysis = true)
-        },
-        errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
-        parameters = Map("objectName" -> "`DEFAULT`", "proposal" -> "`i`, `s`"),
-        context = ExpectedContext(
-          fragment = "DEFAULT",
-          start = 62,
-          stop = 68))
-
-      parsed9 match {
-        case UpdateTable(
-          _,
-          Seq(Assignment(i: AttributeReference, Literal(null, StringType))),
-          None) =>
-          assert(i.name == "i")
-
-        case _ => fail("Expect UpdateTable, but got:\n" + parsed9.treeString)
-      }
-
-      parsed10 match {
-        case UpdateTable(
-          _,
-          Seq(Assignment(i: AttributeReference, Literal(null, IntegerType))),
-          None) =>
-          assert(i.name == "i")
-
-        case _ => fail("Expect UpdateTable, but got:\n" + parsed10.treeString)
-      }
     }
 
     val sql1 = "UPDATE non_existing SET id=1"
@@ -1201,6 +1149,71 @@ class PlanResolutionSuite extends AnalysisTest {
         }
       case _ => fail("Expect UpdateTable, but got:\n" + parsed2.treeString)
     }
+
+    val sql3 = "UPDATE testcat.defaultvalues SET i=DEFAULT, s=DEFAULT"
+    val sql4 = "UPDATE testcat.defaultvalues2 SET i=DEFAULT"
+    // Table with ACCEPT_ANY_SCHEMA can also resolve the column DEFAULT.
+    val sql5 = "UPDATE testcat.v2TableWithAcceptAnySchemaCapability SET i=DEFAULT"
+
+    val parsed3 = parseAndResolve(sql3)
+    val parsed4 = parseAndResolve(sql4)
+    val parsed5 = parseAndResolve(sql5)
+
+    parsed3 match {
+      case UpdateTable(
+      _,
+      Seq(
+      Assignment(i: AttributeReference, Literal(true, BooleanType)),
+      Assignment(s: AttributeReference, Literal(42, IntegerType))),
+      None) =>
+        assert(i.name == "i")
+        assert(s.name == "s")
+
+      case _ => fail("Expect UpdateTable, but got:\n" + parsed3.treeString)
+    }
+
+    parsed4 match {
+      case UpdateTable(
+      _,
+      Seq(Assignment(i: AttributeReference, Literal(null, StringType))),
+      None) =>
+        assert(i.name == "i")
+
+      case _ => fail("Expect UpdateTable, but got:\n" + parsed4.treeString)
+    }
+
+    parsed5 match {
+      case UpdateTable(
+      _,
+      Seq(Assignment(i: AttributeReference, Literal(null, IntegerType))),
+      None) =>
+        assert(i.name == "i")
+
+      case _ => fail("Expect UpdateTable, but got:\n" + parsed5.treeString)
+    }
+
+    // Negative cases.
+    // UPDATE condition won't resolve column "DEFAULT"
+    val sql6 = "UPDATE testcat.defaultvalues SET i=DEFAULT, s=DEFAULT WHERE i=DEFAULT"
+    checkError(
+      exception = intercept[AnalysisException] {
+        parseAndResolve(sql6, checkAnalysis = true)
+      },
+      errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+      parameters = Map("objectName" -> "`DEFAULT`", "proposal" -> "`i`, `s`"),
+      context = ExpectedContext(
+        fragment = "DEFAULT",
+        start = 62,
+        stop = 68))
+
+    val sql7 = "UPDATE testcat.tab2 SET x=DEFAULT"
+    checkError(
+      exception = intercept[AnalysisException] {
+        parseAndResolve(sql7, checkAnalysis = true)
+      },
+      errorClass = "NO_DEFAULT_COLUMN_VALUE_AVAILABLE",
+      parameters = Map("colName" -> "`x`")
+    )
   }
 
   test("SPARK-38869 INSERT INTO table with ACCEPT_ANY_SCHEMA capability") {
@@ -1228,6 +1241,41 @@ class PlanResolutionSuite extends AnalysisTest {
     }
   }
 
+  test("INSERT INTO table with default column value") {
+    val sql1 = "INSERT INTO testcat.defaultvalues VALUES (DEFAULT, DEFAULT)"
+    parseAndResolve(sql1) match {
+      // The top-most Project just adds aliases.
+      case AppendData(_: DataSourceV2Relation, Project(_, l: LocalRelation), _, _, _, _) =>
+        assert(l.data.length == 1)
+        val row = l.data.head
+        assert(row.numFields == 2)
+        assert(row.getBoolean(0) == true)
+        assert(row.getInt(1) == 42)
+      case other => fail("Expected AppendData, but got:\n" + other.treeString)
+    }
+
+    val sql2 = "INSERT INTO testcat.tab2 VALUES (1, DEFAULT)"
+    checkError(
+      exception = intercept[AnalysisException] {
+        parseAndResolve(sql2, checkAnalysis = true)
+      },
+      errorClass = "NO_DEFAULT_COLUMN_VALUE_AVAILABLE",
+      parameters = Map("colName" -> "`x`")
+    )
+
+    val sql3 = "INSERT INTO testcat.tab2 VALUES (1)"
+    checkError(
+      exception = intercept[AnalysisException] {
+        parseAndResolve(sql3, checkAnalysis = true)
+      },
+      errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+      parameters = Map(
+        "tableName" -> "`tab2`",
+        "tableColumns" -> "`i`, `x`",
+        "dataColumns" -> "`col1`")
+    )
+  }
+
   test("InsertIntoStatement byName") {
     val tblName = "testcat.tab1"
     val insertSql = s"INSERT INTO $tblName(i, s) VALUES (3, 'a')"
@@ -1789,90 +1837,6 @@ class PlanResolutionSuite extends AnalysisTest {
           case other =>
             fail("Expect MergeIntoTable, but got:\n" + other.treeString)
         }
-
-        // DEFAULT column reference in the merge condition:
-        // This MERGE INTO command includes an ON clause with a DEFAULT column reference. This
-        // DEFAULT column won't be resolved.
-        val mergeWithDefaultReferenceInMergeCondition =
-          s"""MERGE INTO testcat.tab AS target
-             |USING testcat.tab1 AS source
-             |ON target.i = DEFAULT
-             |WHEN MATCHED AND (target.s = 31) THEN DELETE
-             |WHEN MATCHED AND (target.s = 31)
-             |  THEN UPDATE SET target.s = DEFAULT
-             |WHEN NOT MATCHED AND (source.s='insert')
-             |  THEN INSERT (target.i, target.s) values (DEFAULT, DEFAULT)
-             |WHEN NOT MATCHED BY SOURCE AND (target.s = 31) THEN DELETE
-             |WHEN NOT MATCHED BY SOURCE AND (target.s = 31)
-             |  THEN UPDATE SET target.s = DEFAULT""".stripMargin
-        checkError(
-          exception = intercept[AnalysisException] {
-            parseAndResolve(mergeWithDefaultReferenceInMergeCondition, checkAnalysis = true)
-          },
-          errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
-          parameters = Map("objectName" -> "`DEFAULT`",
-            "proposal" -> "`target`.`i`, `source`.`i`, `target`.`s`, `source`.`s`"),
-          context = ExpectedContext(
-            fragment = "DEFAULT",
-            start = 76,
-            stop = 82))
-
-        // DEFAULT column reference within a complex expression:
-        // This MERGE INTO command includes a WHEN MATCHED clause with a DEFAULT column reference as
-        // of a complex expression (DEFAULT + 1). This is invalid and column won't be resolved.
-        val mergeWithDefaultReferenceAsPartOfComplexExpression =
-          s"""MERGE INTO testcat.tab AS target
-             |USING testcat.tab1 AS source
-             |ON target.i = source.i
-             |WHEN MATCHED AND (target.s = 31) THEN DELETE
-             |WHEN MATCHED AND (target.s = 31)
-             |  THEN UPDATE SET target.s = DEFAULT + 1
-             |WHEN NOT MATCHED AND (source.s='insert')
-             |  THEN INSERT (target.i, target.s) values (DEFAULT, DEFAULT)
-             |WHEN NOT MATCHED BY SOURCE AND (target.s = 31) THEN DELETE
-             |WHEN NOT MATCHED BY SOURCE AND (target.s = 31)
-             |  THEN UPDATE SET target.s = DEFAULT + 1""".stripMargin
-        checkError(
-          exception = intercept[AnalysisException] {
-            parseAndResolve(mergeWithDefaultReferenceAsPartOfComplexExpression)
-          },
-          errorClass = "_LEGACY_ERROR_TEMP_1343",
-          parameters = Map.empty)
-
-        // Ambiguous DEFAULT column reference when the table itself contains a column named
-        // "DEFAULT".
-        val mergeIntoTableWithColumnNamedDefault =
-        s"""
-           |MERGE INTO testcat.tablewithcolumnnameddefault AS target
-           |USING testcat.tab1 AS source
-           |ON default = source.i
-           |WHEN MATCHED AND (target.s = 32) THEN DELETE
-           |WHEN MATCHED AND (target.s = 32)
-           |  THEN UPDATE SET target.s = DEFAULT
-           |WHEN NOT MATCHED AND (source.s='insert')
-           |  THEN INSERT (target.s) values (DEFAULT)
-           |WHEN NOT MATCHED BY SOURCE AND (target.s = 32) THEN DELETE
-           |WHEN NOT MATCHED BY SOURCE AND (target.s = 32)
-           |  THEN UPDATE SET target.s = DEFAULT
-             """.stripMargin
-        parseAndResolve(mergeIntoTableWithColumnNamedDefault, withDefault = true) match {
-          case m: MergeIntoTable =>
-            val target = m.targetTable
-            val d = target.output.find(_.name == "default").get.asInstanceOf[AttributeReference]
-            m.mergeCondition match {
-              case EqualTo(Cast(l: AttributeReference, _, _, _), _) =>
-                assert(l.sameRef(d))
-              case Literal(_, BooleanType) => // this is acceptable as a merge condition
-              case other =>
-                fail("unexpected merge condition " + other)
-            }
-            assert(m.matchedActions.length == 2)
-            assert(m.notMatchedActions.length == 1)
-            assert(m.notMatchedBySourceActions.length == 2)
-
-          case other =>
-            fail("Expect MergeIntoTable, but got:\n" + other.treeString)
-        }
     }
 
     // DEFAULT columns (explicit):
@@ -1880,19 +1844,19 @@ class PlanResolutionSuite extends AnalysisTest {
     // DEFAULT column references in the below MERGE INTO command should resolve to the corresponding
     // values. This test case covers that behavior.
     val mergeDefaultWithExplicitDefaultColumns =
-      s"""
-         |MERGE INTO testcat.defaultvalues AS target
-         |USING testcat.tab1 AS source
-         |ON target.i = source.i
-         |WHEN MATCHED AND (target.s = 31) THEN DELETE
-         |WHEN MATCHED AND (target.s = 31)
-         |  THEN UPDATE SET target.s = DEFAULT
-         |WHEN NOT MATCHED AND (source.s='insert')
-         |  THEN INSERT (target.i, target.s) values (DEFAULT, DEFAULT)
-         |WHEN NOT MATCHED BY SOURCE AND (target.s = 31) THEN DELETE
-         |WHEN NOT MATCHED BY SOURCE AND (target.s = 31)
-         |  THEN UPDATE SET target.s = DEFAULT
-           """.stripMargin
+      """
+        |MERGE INTO testcat.defaultvalues AS target
+        |USING testcat.tab1 AS source
+        |ON target.i = source.i
+        |WHEN MATCHED AND (target.s = 31) THEN DELETE
+        |WHEN MATCHED AND (target.s = 31)
+        |  THEN UPDATE SET target.s = DEFAULT
+        |WHEN NOT MATCHED AND (source.s='insert')
+        |  THEN INSERT (target.i, target.s) values (DEFAULT, DEFAULT)
+        |WHEN NOT MATCHED BY SOURCE AND (target.s = 31) THEN DELETE
+        |WHEN NOT MATCHED BY SOURCE AND (target.s = 31)
+        |  THEN UPDATE SET target.s = DEFAULT
+        |""".stripMargin
     parseAndResolve(mergeDefaultWithExplicitDefaultColumns) match {
       case m: MergeIntoTable =>
         val cond = m.mergeCondition
@@ -1942,6 +1906,110 @@ class PlanResolutionSuite extends AnalysisTest {
         fail("Expect MergeIntoTable, but got:\n" + other.treeString)
     }
 
+    // DEFAULT column reference in the merge condition:
+    // This MERGE INTO command includes an ON clause with a DEFAULT column reference. This
+    // DEFAULT column won't be resolved.
+    val mergeWithDefaultReferenceInMergeCondition =
+      """
+        |MERGE INTO testcat.tab AS target
+        |USING testcat.tab1 AS source
+        |ON target.i = DEFAULT
+        |WHEN MATCHED AND (target.s = 31) THEN DELETE
+        |WHEN MATCHED AND (target.s = 31)
+        |  THEN UPDATE SET target.s = DEFAULT
+        |WHEN NOT MATCHED AND (source.s='insert')
+        |  THEN INSERT (target.i, target.s) values (DEFAULT, DEFAULT)
+        |WHEN NOT MATCHED BY SOURCE AND (target.s = 31) THEN DELETE
+        |WHEN NOT MATCHED BY SOURCE AND (target.s = 31)
+        |  THEN UPDATE SET target.s = DEFAULT
+        |""".stripMargin
+    checkError(
+      exception = intercept[AnalysisException] {
+        parseAndResolve(mergeWithDefaultReferenceInMergeCondition, checkAnalysis = true)
+      },
+      errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+      parameters = Map("objectName" -> "`DEFAULT`",
+        "proposal" -> "`target`.`i`, `source`.`i`, `target`.`s`, `source`.`s`"),
+      context = ExpectedContext(
+        fragment = "DEFAULT",
+        start = 77,
+        stop = 83))
+
+    // DEFAULT column reference within a complex expression:
+    // This MERGE INTO command includes a WHEN MATCHED clause with a DEFAULT column reference as
+    // of a complex expression (DEFAULT + 1). This is invalid and column won't be resolved.
+    val mergeWithDefaultReferenceAsPartOfComplexExpression =
+      """
+        |MERGE INTO testcat.tab AS target
+        |USING testcat.tab1 AS source
+        |ON target.i = source.i
+        |WHEN MATCHED AND (target.s = 31) THEN DELETE
+        |WHEN MATCHED AND (target.s = 31)
+        |  THEN UPDATE SET target.s = DEFAULT + 1
+        |WHEN NOT MATCHED AND (source.s='insert')
+        |  THEN INSERT (target.i, target.s) values (DEFAULT, DEFAULT)
+        |WHEN NOT MATCHED BY SOURCE AND (target.s = 31) THEN DELETE
+        |WHEN NOT MATCHED BY SOURCE AND (target.s = 31)
+        |  THEN UPDATE SET target.s = DEFAULT + 1
+        |""".stripMargin
+    checkError(
+      exception = intercept[AnalysisException] {
+        parseAndResolve(mergeWithDefaultReferenceAsPartOfComplexExpression)
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_1343",
+      parameters = Map.empty)
+
+    val mergeWithDefaultReferenceForNonNullableCol =
+      """
+        |MERGE INTO testcat.tab2 AS target
+        |USING testcat.tab1 AS source
+        |ON target.i = source.i
+        |WHEN NOT MATCHED AND (source.s = 'insert')
+        |  THEN INSERT (target.i, target.x) VALUES (1, DEFAULT)
+        |""".stripMargin
+    checkError(
+      exception = intercept[AnalysisException] {
+        parseAndResolve(mergeWithDefaultReferenceForNonNullableCol)
+      },
+      errorClass = "NO_DEFAULT_COLUMN_VALUE_AVAILABLE",
+      parameters = Map("colName" -> "`x`")
+    )
+
+    // Ambiguous DEFAULT column reference when the table itself contains a column named
+    // "DEFAULT".
+    val mergeIntoTableWithColumnNamedDefault =
+      """
+        |MERGE INTO testcat.tablewithcolumnnameddefault AS target
+        |USING testcat.tab1 AS source
+        |ON default = source.i
+        |WHEN MATCHED AND (target.s = 32) THEN DELETE
+        |WHEN MATCHED AND (target.s = 32)
+        |  THEN UPDATE SET target.s = DEFAULT
+        |WHEN NOT MATCHED AND (source.s='insert')
+        |  THEN INSERT (target.s) values (DEFAULT)
+        |WHEN NOT MATCHED BY SOURCE AND (target.s = 32) THEN DELETE
+        |WHEN NOT MATCHED BY SOURCE AND (target.s = 32)
+        |  THEN UPDATE SET target.s = DEFAULT
+        |""".stripMargin
+    parseAndResolve(mergeIntoTableWithColumnNamedDefault, withDefault = true) match {
+      case m: MergeIntoTable =>
+        val target = m.targetTable
+        val d = target.output.find(_.name == "default").get.asInstanceOf[AttributeReference]
+        m.mergeCondition match {
+          case EqualTo(Cast(l: AttributeReference, _, _, _), _) =>
+            assert(l.sameRef(d))
+          case Literal(_, BooleanType) => // this is acceptable as a merge condition
+          case other =>
+            fail("unexpected merge condition " + other)
+        }
+        assert(m.matchedActions.length == 2)
+        assert(m.notMatchedActions.length == 1)
+        assert(m.notMatchedBySourceActions.length == 2)
+
+      case other =>
+        fail("Expect MergeIntoTable, but got:\n" + other.treeString)
+    }
+
     // no aliases
     Seq(("v2Table", "v2Table1"), ("testcat.tab", "testcat.tab1")).foreach { pair =>
       def referenceNames(target: String, column: String): String = target match {
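
For reference, a minimal sketch of the changed helper semantics in
ResolveDefaultColumnsUtil.scala (a hypothetical spark-shell snippet; only the
names that appear in the patch above are assumed):

    import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
    import org.apache.spark.sql.types.{StringType, StructField}

    // Nullable field with no user-specified default: still falls back to a
    // null-literal alias when useNullAsDefault is true.
    ResolveDefaultColumns.getDefaultValueExprOrNullLit(
      StructField("s", StringType, nullable = true), useNullAsDefault = true)
    // => Some(Alias(Literal(null, StringType), "s"))

    // Non-nullable field with no default: no null fallback; returns None so
    // callers treat the column as unassigned.
    ResolveDefaultColumns.getDefaultValueExprOrNullLit(
      StructField("x", StringType, nullable = false), useNullAsDefault = true)
    // => None

    // The Expression-returning overload now throws instead of producing a
    // null literal for the non-nullable column.
    ResolveDefaultColumns.getDefaultValueExprOrNullLit(
      StructField("x", StringType, nullable = false))
    // => AnalysisException [NO_DEFAULT_COLUMN_VALUE_AVAILABLE]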


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org