Posted to commits@spark.apache.org by we...@apache.org on 2022/11/01 16:21:44 UTC

[spark] branch master updated: [SPARK-40921][SQL] Add WHEN NOT MATCHED BY SOURCE clause to MERGE INTO

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c7fa8471b34 [SPARK-40921][SQL] Add WHEN NOT MATCHED BY SOURCE clause to MERGE INTO
c7fa8471b34 is described below

commit c7fa8471b3453b674b28927490430bab8d2a7bac
Author: Johan Lasperas <jo...@databricks.com>
AuthorDate: Wed Nov 2 00:21:25 2022 +0800

    [SPARK-40921][SQL] Add WHEN NOT MATCHED BY SOURCE clause to MERGE INTO
    
    ### What changes were proposed in this pull request?
    This change adds a third type of WHEN clause to the MERGE INTO command that allows updating or deleting rows from the target table that have no match in the source table based on the merge condition.
    
    The following example updates all rows from the target table that have a match in the source table using the source value. For target rows that have no match in the source table, the 'status' column of rows created after '2022-10-26' is set to 'active', while all other unmatched rows are deleted from the target table.
    ```
    MERGE INTO target
    USING source
    ON target.key = source.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED BY SOURCE AND target.create_at > '2022-10-26' THEN UPDATE SET target.status = 'active'
    WHEN NOT MATCHED BY SOURCE THEN DELETE
    ```
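    
    To make the clause semantics concrete, here is a minimal in-memory sketch of the example above in plain Scala. It is not Spark code: `Row`, `mergeExample` and the field names are hypothetical, and it assumes keys are unique in the source.
    
    ```scala
    // Toy model of the MERGE example; all names here are illustrative only.
    object MergeSketch {
      final case class Row(key: Int, createdAt: String, status: String)

      def mergeExample(target: Seq[Row], source: Seq[Row], cutoff: String): Seq[Row] = {
        // Assumption: at most one source row per key.
        val sourceByKey = source.map(r => r.key -> r).toMap
        target.flatMap { t =>
          sourceByKey.get(t.key) match {
            // WHEN MATCHED THEN UPDATE SET *
            case Some(s) => Some(s)
            // WHEN NOT MATCHED BY SOURCE AND <created after cutoff> THEN UPDATE SET status
            case None if t.createdAt > cutoff => Some(t.copy(status = "active"))
            // WHEN NOT MATCHED BY SOURCE THEN DELETE
            case None => None
          }
        }
      }
    }
    ```
    
    Dates are compared as ISO-8601 strings, which order lexicographically. The first WHEN NOT MATCHED BY SOURCE clause whose condition holds wins, so the unconditional DELETE only applies to rows the UPDATE clause skipped.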
    
    In addition, the existing WHEN NOT MATCHED clause can now include an optional BY TARGET qualifier. The qualifier does not change the clause's semantics; it only allows the clauses to be used together more consistently:
    ```
    WHEN NOT MATCHED BY TARGET THEN INSERT *
    ```
    is equivalent to:
    ```
    WHEN NOT MATCHED THEN INSERT *
    ```
    The updated SQL syntax for the MERGE INTO command is described more precisely in the user-facing change section below.
    
    The changes proposed in this pull request are two-fold:
    1. Update SQL parsing to handle the new clause introduced. This results in a new field `notMatchedBySourceActions` being populated in the logical plan node of the MERGE INTO command `MergeIntoTable`.
    2. Handle the newly added merge clause during analysis. In particular, resolve all attribute references used in WHEN NOT MATCHED BY SOURCE conditions and actions. The attributes used in a NOT MATCHED BY SOURCE clause may only refer to the target table.
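    
    The parser also validates each of the three clause lists: a MERGE needs at least one WHEN clause, and within each list only the last clause may omit its condition. The following is a simplified sketch of that check, not the actual `AstBuilder` code. `Clause` and `validate` are hypothetical names; the returned strings reuse the error class names and message from this commit.
    
    ```scala
    object ClauseCheckSketch {
      // Hypothetical stand-in for a parsed WHEN clause: only its optional AND condition.
      final case class Clause(condition: Option[String])

      // True when every clause except the last one carries a condition.
      def onlyLastMayOmitCondition(clauses: Seq[Clause]): Boolean =
        clauses.length < 2 || clauses.init.forall(_.condition.nonEmpty)

      def validate(
          matched: Seq[Clause],
          notMatched: Seq[Clause],
          notMatchedBySource: Seq[Clause]): Either[String, Unit] = {
        if (matched.isEmpty && notMatched.isEmpty && notMatchedBySource.isEmpty) {
          Left("There must be at least one WHEN clause in a MERGE statement.")
        } else if (!onlyLastMayOmitCondition(matched)) {
          Left("NON_LAST_MATCHED_CLAUSE_OMIT_CONDITION")
        } else if (!onlyLastMayOmitCondition(notMatched)) {
          Left("NON_LAST_NOT_MATCHED_BY_TARGET_CLAUSE_OMIT_CONDITION")
        } else if (!onlyLastMayOmitCondition(notMatchedBySource)) {
          Left("NON_LAST_NOT_MATCHED_BY_SOURCE_CLAUSE_OMIT_CONDITION")
        } else {
          Right(())
        }
      }
    }
    ```
    
    The rule exists because an unconditional clause matches every remaining row, so any clause of the same type placed after it could never fire.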
    
    ### Why are the changes needed?
    The new clause opens up use cases that leverage the merge command to sync a target table from a source table by conditionally deleting or updating records that are not present in the source. As an example, the following command incrementally syncs the target table from the source table for the past 5 days:
    ```
    MERGE INTO target
    USING (SELECT `columns` FROM source WHERE created_at >= (current_date - INTERVAL '5' DAY)) AS tmp_name
    ON FALSE
    WHEN NOT MATCHED BY SOURCE AND target.created_at >= (current_date - INTERVAL '5' DAY) THEN DELETE
    WHEN NOT MATCHED BY TARGET THEN INSERT `columns`
    ```
    After running this command, all rows older than 5 days in the target table are left unmodified while rows newer than 5 days that are either not already in the target table or not in the source table anymore are inserted and deleted, respectively.
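    
    The effect described above can be modeled in a few lines of plain Scala. This is only a sketch under stated assumptions: `Row`, `syncRecent` and `cutoff` are hypothetical names, `cutoff` stands in for `current_date - INTERVAL '5' DAY`, and dates are ISO-8601 strings.
    
    ```scala
    object SyncSketch {
      final case class Row(id: Int, createdAt: String)

      // With ON FALSE no pair of rows ever matches, so every target row is
      // NOT MATCHED BY SOURCE and every source row is NOT MATCHED BY TARGET.
      def syncRecent(target: Seq[Row], source: Seq[Row], cutoff: String): Seq[Row] = {
        // The USING subquery: only recent source rows take part in the merge.
        val recentSource = source.filter(_.createdAt >= cutoff)
        // Conditional DELETE: recent target rows are removed, older ones are kept.
        val kept = target.filterNot(_.createdAt >= cutoff)
        // INSERT: every recent source row is added to the target.
        kept ++ recentSource
      }
    }
    ```
    
    In this model the recent window of the target is replaced wholesale by the recent window of the source, while older target rows pass through untouched.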
    
    ### Does this PR introduce _any_ user-facing change?
    Two user-facing changes are introduced in the MERGE INTO syntax:
    - WHEN NOT MATCHED BY SOURCE clause.
    - Optional BY TARGET qualifier for WHEN NOT MATCHED clauses.
    
    The updated Spark SQL syntax is:
    ```
    MERGE INTO target_table_name [target_alias]
       USING source_table_reference [source_alias]
       ON merge_condition
       { WHEN MATCHED [ AND matched_condition ] THEN matched_action |
         WHEN NOT MATCHED [BY TARGET] [ AND not_matched_condition ] THEN not_matched_action |
         WHEN NOT MATCHED BY SOURCE [ AND not_matched_by_source_condition ] THEN not_matched_by_source_action } [...]
    
    matched_action
     { DELETE |
       UPDATE SET * |
       UPDATE SET { column = [ expr | DEFAULT ] } [, ...] }
    
    not_matched_action
     { INSERT * |
       INSERT (column1 [, ...] ) VALUES ( { expr | DEFAULT } [, ...] ) }
    
    not_matched_by_source_action
     { DELETE |
       UPDATE SET { column = [ expr | DEFAULT ] } [, ...] }
    ```
    This syntax replicates the semantics used by other vendors, see:
    - [Microsoft T-SQL Merge](https://learn.microsoft.com/en-us/sql/t-sql/statements/merge-transact-sql)
    - [Google BigQuery Merge](https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#merge_statement)
    
    ### How was this patch tested?
    Tests are extended or added to cover the following aspects of the change:
    - Parsing (in DDLParserSuite.scala):
      - Existing tests are extended to cover parsing of WHEN NOT MATCHED BY SOURCE clauses in a range of cases. This covers parsing the clause with optional conditions and a variety of UPDATE and DELETE actions.
      - New tests are added to cover NOT MATCHED BY TARGET and invalid action UPDATE SET * for WHEN NOT MATCHED BY SOURCE.
    - Analysis (in PlanResolutionSuite.scala):
      - Existing tests are extended to also cover attribute reference resolution for WHEN NOT MATCHED BY SOURCE conditions and actions together with other types of clauses.
      - New tests are added to cover reference resolution specific to WHEN NOT MATCHED BY SOURCE clauses:
        - Unqualified reference to a column present both in the target and source table is not ambiguous in WHEN NOT MATCHED BY SOURCE conditions or actions since it can only refer to the target table.
        - References to columns in the source table are invalid in WHEN NOT MATCHED BY SOURCE.
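    
    The two resolution cases above follow from which tables a clause is allowed to see. Below is a toy resolver illustrating the idea. It is a hypothetical model, not the Analyzer code: `Policy`, `resolve` and the returned strings are made up for this sketch, loosely mirroring the BOTH, SOURCE and TARGET policies added in this commit.
    
    ```scala
    object ResolveSketch {
      sealed trait Policy
      case object Both extends Policy       // e.g. WHEN MATCHED ... UPDATE SET col = expr
      case object SourceOnly extends Policy // e.g. WHEN NOT MATCHED [BY TARGET] ... INSERT
      case object TargetOnly extends Policy // WHEN NOT MATCHED BY SOURCE

      // Resolves an unqualified column name against the tables visible under `policy`.
      def resolve(
          name: String,
          targetCols: Set[String],
          sourceCols: Set[String],
          policy: Policy): Either[String, String] = {
        val inTarget = policy != SourceOnly && targetCols.contains(name)
        val inSource = policy != TargetOnly && sourceCols.contains(name)
        (inTarget, inSource) match {
          case (true, true) => Left(s"Reference '$name' is ambiguous")
          case (true, false) => Right(s"target.$name")
          case (false, true) => Right(s"source.$name")
          case (false, false) => Left(s"Cannot resolve '$name'")
        }
      }
    }
    ```
    
    With `TargetOnly`, a column that exists in both tables resolves to the target without ambiguity, and a column that exists only in the source fails to resolve, which is what the new tests assert.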
    
    Closes #38400 from johanl-db/SPARK-40921-when-not-matched-by-source.
    
    Authored-by: Johan Lasperas <jo...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 core/src/main/resources/error/error-classes.json   |  38 ++-
 docs/sql-ref-ansi-compliance.md                    |   2 +
 .../spark/sql/catalyst/parser/SqlBaseLexer.g4      |   2 +
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |  18 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  50 ++--
 .../catalyst/analysis/DeduplicateRelations.scala   |   4 +-
 .../catalyst/analysis/ResolveDefaultColumns.scala  |  10 +-
 .../ReplaceNullWithFalseInPredicate.scala          |   9 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  34 ++-
 .../sql/catalyst/plans/logical/v2Commands.scala    |   3 +-
 .../spark/sql/errors/QueryParsingErrors.scala      |  22 +-
 .../sql/catalyst/analysis/AnalysisSuite.scala      |   1 +
 .../PullupCorrelatedPredicatesSuite.scala          |   5 +-
 .../ReplaceNullWithFalseInPredicateSuite.scala     |  24 +-
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 120 ++++++++-
 .../execution/command/PlanResolutionSuite.scala    | 277 +++++++++++++++++----
 16 files changed, 491 insertions(+), 128 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 5abd0bd9630..81a0630096d 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -599,6 +599,24 @@
       "More than one row returned by a subquery used as an expression."
     ]
   },
+  "NON_LAST_MATCHED_CLAUSE_OMIT_CONDITION" : {
+    "message" : [
+      "When there are more than one MATCHED clauses in a MERGE statement, only the last MATCHED clause can omit the condition."
+    ],
+    "sqlState" : "42000"
+  },
+  "NON_LAST_NOT_MATCHED_BY_SOURCE_CLAUSE_OMIT_CONDITION" : {
+    "message" : [
+      "When there are more than one NOT MATCHED BY SOURCE clauses in a MERGE statement, only the last NOT MATCHED BY SOURCE clause can omit the condition."
+    ],
+    "sqlState" : "42000"
+  },
+  "NON_LAST_NOT_MATCHED_BY_TARGET_CLAUSE_OMIT_CONDITION" : {
+    "message" : [
+      "When there are more than one NOT MATCHED [BY TARGET] clauses in a MERGE statement, only the last NOT MATCHED [BY TARGET] clause can omit the condition."
+    ],
+    "sqlState" : "42000"
+  },
   "NON_LITERAL_PIVOT_VALUES" : {
     "message" : [
       "Literal expressions required for pivot values, found <expression>."
@@ -1178,36 +1196,16 @@
       "Empty source for merge: you should specify a source table/subquery in merge."
     ]
   },
-  "_LEGACY_ERROR_TEMP_0005" : {
-    "message" : [
-      "Unrecognized matched action: <matchedAction>."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_0006" : {
     "message" : [
       "The number of inserted values cannot match the fields."
     ]
   },
-  "_LEGACY_ERROR_TEMP_0007" : {
-    "message" : [
-      "Unrecognized not matched action: <notMatchedAction>."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_0008" : {
     "message" : [
       "There must be at least one WHEN clause in a MERGE statement."
     ]
   },
-  "_LEGACY_ERROR_TEMP_0009" : {
-    "message" : [
-      "When there are more than one MATCHED clauses in a MERGE statement, only the last MATCHED clause can omit the condition."
-    ]
-  },
-  "_LEGACY_ERROR_TEMP_0010" : {
-    "message" : [
-      "When there are more than one NOT MATCHED clauses in a MERGE statement, only the last NOT MATCHED clause can omit the condition."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_0011" : {
     "message" : [
       "Combination of ORDER BY/SORT BY/DISTRIBUTE BY/CLUSTER BY is not supported."
diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index ce27e642a0c..a59d145d551 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -578,6 +578,7 @@ Below is a list of all the keywords in Spark SQL.
 |SOME|reserved|non-reserved|reserved|
 |SORT|non-reserved|non-reserved|non-reserved|
 |SORTED|non-reserved|non-reserved|non-reserved|
+|SOURCE|non-reserved|non-reserved|non-reserved|
 |START|non-reserved|non-reserved|reserved|
 |STATISTICS|non-reserved|non-reserved|non-reserved|
 |STORED|non-reserved|non-reserved|non-reserved|
@@ -591,6 +592,7 @@ Below is a list of all the keywords in Spark SQL.
 |TABLE|reserved|non-reserved|reserved|
 |TABLES|non-reserved|non-reserved|non-reserved|
 |TABLESAMPLE|non-reserved|non-reserved|reserved|
+|TARGET|non-reserved|non-reserved|non-reserved|
 |TBLPROPERTIES|non-reserved|non-reserved|non-reserved|
 |TEMP|non-reserved|non-reserved|not a keyword|
 |TEMPORARY|non-reserved|non-reserved|non-reserved|
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
index 2235ff2a51e..e11391fa5f4 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
@@ -315,6 +315,7 @@ SKEWED: 'SKEWED';
 SOME: 'SOME';
 SORT: 'SORT';
 SORTED: 'SORTED';
+SOURCE: 'SOURCE';
 START: 'START';
 STATISTICS: 'STATISTICS';
 STORED: 'STORED';
@@ -328,6 +329,7 @@ SYSTEM_VERSION: 'SYSTEM_VERSION';
 TABLE: 'TABLE';
 TABLES: 'TABLES';
 TABLESAMPLE: 'TABLESAMPLE';
+TARGET: 'TARGET';
 TBLPROPERTIES: 'TBLPROPERTIES';
 TEMPORARY: 'TEMPORARY' | 'TEMP';
 TERMINATED: 'TERMINATED';
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 7beda692cd3..9bd6e01c80a 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -440,7 +440,8 @@ dmlStatementNoWith
           LEFT_PAREN sourceQuery=query RIGHT_PAREN) sourceAlias=tableAlias
         ON mergeCondition=booleanExpression
         matchedClause*
-        notMatchedClause*                                                          #mergeIntoTable
+        notMatchedClause*
+        notMatchedBySourceClause*                                                  #mergeIntoTable
     ;
 
 queryOrganization
@@ -537,7 +538,11 @@ matchedClause
     : WHEN MATCHED (AND matchedCond=booleanExpression)? THEN matchedAction
     ;
 notMatchedClause
-    : WHEN NOT MATCHED (AND notMatchedCond=booleanExpression)? THEN notMatchedAction
+    : WHEN NOT MATCHED (BY TARGET)? (AND notMatchedCond=booleanExpression)? THEN notMatchedAction
+    ;
+
+notMatchedBySourceClause
+    : WHEN NOT MATCHED BY SOURCE (AND notMatchedBySourceCond=booleanExpression)? THEN notMatchedBySourceAction
     ;
 
 matchedAction
@@ -552,6 +557,11 @@ notMatchedAction
         VALUES LEFT_PAREN expression (COMMA expression)* RIGHT_PAREN
     ;
 
+notMatchedBySourceAction
+    : DELETE
+    | UPDATE SET assignmentList
+    ;
+
 assignmentList
     : assignment (COMMA assignment)*
     ;
@@ -1320,6 +1330,7 @@ ansiNonReserved
     | SKEWED
     | SORT
     | SORTED
+    | SOURCE
     | START
     | STATISTICS
     | STORED
@@ -1332,6 +1343,7 @@ ansiNonReserved
     | SYSTEM_VERSION
     | TABLES
     | TABLESAMPLE
+    | TARGET
     | TBLPROPERTIES
     | TEMPORARY
     | TERMINATED
@@ -1610,6 +1622,7 @@ nonReserved
     | SOME
     | SORT
     | SORTED
+    | SOURCE
     | START
     | STATISTICS
     | STORED
@@ -1623,6 +1636,7 @@ nonReserved
     | TABLE
     | TABLES
     | TABLESAMPLE
+    | TARGET
     | TBLPROPERTIES
     | TEMPORARY
     | TERMINATED
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index eca304c9d1b..d2a2dc5dc14 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1511,7 +1511,7 @@ class Analyzer(override val catalogManager: CatalogManager)
         // implementation and should be resolved based on the table schema.
         o.copy(deleteExpr = resolveExpressionByPlanOutput(o.deleteExpr, o.table))
 
-      case m @ MergeIntoTable(targetTable, sourceTable, _, _, _)
+      case m @ MergeIntoTable(targetTable, sourceTable, _, _, _, _)
         if !m.resolved && targetTable.resolved && sourceTable.resolved =>
 
         EliminateSubqueryAliases(targetTable) match {
@@ -1533,15 +1533,15 @@ class Analyzer(override val catalogManager: CatalogManager)
                 UpdateAction(
                   resolvedUpdateCondition,
                   // The update value can access columns from both target and source tables.
-                  resolveAssignments(assignments, m, resolveValuesWithSourceOnly = false))
+                  resolveAssignments(assignments, m, MergeResolvePolicy.BOTH))
               case UpdateStarAction(updateCondition) =>
                 val assignments = targetTable.output.map { attr =>
                   Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
                 }
                 UpdateAction(
                   updateCondition.map(resolveExpressionByPlanChildren(_, m)),
-                  // For UPDATE *, the value must from source table.
-                  resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true))
+                  // For UPDATE *, the value must be from source table.
+                  resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE))
               case o => o
             }
             val newNotMatchedActions = m.notMatchedActions.map {
@@ -1549,27 +1549,43 @@ class Analyzer(override val catalogManager: CatalogManager)
                 // The insert action is used when not matched, so its condition and value can only
                 // access columns from the source table.
                 val resolvedInsertCondition = insertCondition.map(
-                  resolveExpressionByPlanChildren(_, Project(Nil, m.sourceTable)))
+                  resolveExpressionByPlanOutput(_, m.sourceTable))
                 InsertAction(
                   resolvedInsertCondition,
-                  resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true))
+                  resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE))
               case InsertStarAction(insertCondition) =>
                 // The insert action is used when not matched, so its condition and value can only
                 // access columns from the source table.
                 val resolvedInsertCondition = insertCondition.map(
-                  resolveExpressionByPlanChildren(_, Project(Nil, m.sourceTable)))
+                  resolveExpressionByPlanOutput(_, m.sourceTable))
                 val assignments = targetTable.output.map { attr =>
                   Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
                 }
                 InsertAction(
                   resolvedInsertCondition,
-                  resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true))
+                  resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE))
               case o => o
             }
+            val newNotMatchedBySourceActions = m.notMatchedBySourceActions.map {
+              case DeleteAction(deleteCondition) =>
+                val resolvedDeleteCondition = deleteCondition.map(
+                  resolveExpressionByPlanOutput(_, targetTable))
+                DeleteAction(resolvedDeleteCondition)
+              case UpdateAction(updateCondition, assignments) =>
+                val resolvedUpdateCondition = updateCondition.map(
+                  resolveExpressionByPlanOutput(_, targetTable))
+                UpdateAction(
+                  resolvedUpdateCondition,
+                  // The update value can access columns from the target table only.
+                  resolveAssignments(assignments, m, MergeResolvePolicy.TARGET))
+              case o => o
+            }
+
             val resolvedMergeCondition = resolveExpressionByPlanChildren(m.mergeCondition, m)
             m.copy(mergeCondition = resolvedMergeCondition,
               matchedActions = newMatchedActions,
-              notMatchedActions = newNotMatchedActions)
+              notMatchedActions = newNotMatchedActions,
+              notMatchedBySourceActions = newNotMatchedBySourceActions)
         }
 
       // Skip the having clause here, this will be handled in ResolveAggregateFunctions.
@@ -1580,10 +1596,14 @@ class Analyzer(override val catalogManager: CatalogManager)
         q.mapExpressions(resolveExpressionByPlanChildren(_, q))
     }
 
+    private object MergeResolvePolicy extends Enumeration {
+      val BOTH, SOURCE, TARGET = Value
+    }
+
     def resolveAssignments(
         assignments: Seq[Assignment],
         mergeInto: MergeIntoTable,
-        resolveValuesWithSourceOnly: Boolean): Seq[Assignment] = {
+        resolvePolicy: MergeResolvePolicy.Value): Seq[Assignment] = {
       assignments.map { assign =>
         val resolvedKey = assign.key match {
           case c if !c.resolved =>
@@ -1591,13 +1611,13 @@ class Analyzer(override val catalogManager: CatalogManager)
           case o => o
         }
         val resolvedValue = assign.value match {
-          // The update values may contain target and/or source references.
           case c if !c.resolved =>
-            if (resolveValuesWithSourceOnly) {
-              resolveMergeExprOrFail(c, Project(Nil, mergeInto.sourceTable))
-            } else {
-              resolveMergeExprOrFail(c, mergeInto)
+            val resolvePlan = resolvePolicy match {
+              case MergeResolvePolicy.BOTH => mergeInto
+              case MergeResolvePolicy.SOURCE => Project(Nil, mergeInto.sourceTable)
+              case MergeResolvePolicy.TARGET => Project(Nil, mergeInto.targetTable)
             }
+            resolveMergeExprOrFail(c, resolvePlan)
           case o => o
         }
         Assignment(resolvedKey, resolvedValue)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
index 1ffe421e19b..909ec908020 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
@@ -80,8 +80,8 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
           }
         }
         u.copy(children = newChildren)
-      case m @ MergeIntoTable(targetTable, sourceTable, _, _, _) if !m.duplicateResolved =>
-        m.copy(sourceTable = dedupRight(targetTable, sourceTable))
+      case merge: MergeIntoTable if !merge.duplicateResolved =>
+        merge.copy(sourceTable = dedupRight(merge.targetTable, merge.sourceTable))
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
index b7c7f0d3772..a04844c6526 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
@@ -203,9 +203,17 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
         r
       }.getOrElse(action)
     }
+    val newNotMatchedBySourceActions: Seq[MergeAction] =
+      m.notMatchedBySourceActions.map { action: MergeAction =>
+      replaceExplicitDefaultValuesInMergeAction(action, columnNamesToExpressions).map { r =>
+        replaced = true
+        r
+      }.getOrElse(action)
+    }
     if (replaced) {
       m.copy(matchedActions = newMatchedActions,
-        notMatchedActions = newNotMatchedActions)
+        notMatchedActions = newNotMatchedActions,
+        notMatchedBySourceActions = newNotMatchedBySourceActions)
     } else {
       m
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
index d060a8be5da..3ffaca6f54c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
@@ -57,11 +57,12 @@ object ReplaceNullWithFalseInPredicate extends Rule[LogicalPlan] {
     case rd @ ReplaceData(_, cond, _, _, _) => rd.copy(condition = replaceNullWithFalse(cond))
     case d @ DeleteFromTable(_, cond) => d.copy(condition = replaceNullWithFalse(cond))
     case u @ UpdateTable(_, _, Some(cond)) => u.copy(condition = Some(replaceNullWithFalse(cond)))
-    case m @ MergeIntoTable(_, _, mergeCond, matchedActions, notMatchedActions) =>
+    case m: MergeIntoTable =>
       m.copy(
-        mergeCondition = replaceNullWithFalse(mergeCond),
-        matchedActions = replaceNullWithFalse(matchedActions),
-        notMatchedActions = replaceNullWithFalse(notMatchedActions))
+        mergeCondition = replaceNullWithFalse(m.mergeCondition),
+        matchedActions = replaceNullWithFalse(m.matchedActions),
+        notMatchedActions = replaceNullWithFalse(m.notMatchedActions),
+        notMatchedBySourceActions = replaceNullWithFalse(m.notMatchedBySourceActions))
     case p: LogicalPlan => p.transformExpressionsWithPruning(
       _.containsAnyPattern(NULL_LITERAL, TRUE_OR_FALSE_LITERAL), ruleId) {
       // For `EqualNullSafe` with a `TrueLiteral`, whether the other side is null or false has no
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 84911bf59c7..8edb1702028 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -29,6 +29,7 @@ import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode}
 import org.apache.commons.codec.DecoderException
 import org.apache.commons.codec.binary.Hex
 
+import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
@@ -425,8 +426,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
             UpdateAction(condition, withAssignments(clause.matchedAction().assignmentList()))
           }
         } else {
-          // It should not be here.
-          throw QueryParsingErrors.unrecognizedMatchedActionError(clause)
+          throw SparkException.internalError(
+            s"Unrecognized matched action: ${clause.matchedAction().getText}")
         }
       }
     }
@@ -446,12 +447,27 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
             InsertAction(condition, columns.zip(values).map(kv => Assignment(kv._1, kv._2)).toSeq)
           }
         } else {
-          // It should not be here.
-          throw QueryParsingErrors.unrecognizedNotMatchedActionError(clause)
+          throw SparkException.internalError(
+            s"Unrecognized matched action: ${clause.notMatchedAction().getText}")
         }
       }
     }
-    if (matchedActions.isEmpty && notMatchedActions.isEmpty) {
+    val notMatchedBySourceActions = ctx.notMatchedBySourceClause().asScala.map {
+      clause => {
+        val notMatchedBySourceAction = clause.notMatchedBySourceAction()
+        if (notMatchedBySourceAction.DELETE() != null) {
+          DeleteAction(Option(clause.notMatchedBySourceCond).map(expression))
+        } else if (notMatchedBySourceAction.UPDATE() != null) {
+          val condition = Option(clause.notMatchedBySourceCond).map(expression)
+          UpdateAction(condition,
+            withAssignments(clause.notMatchedBySourceAction().assignmentList()))
+        } else {
+          throw SparkException.internalError(
+            s"Unrecognized matched action: ${clause.notMatchedBySourceAction().getText}")
+        }
+      }
+    }
+    if (matchedActions.isEmpty && notMatchedActions.isEmpty && notMatchedBySourceActions.isEmpty) {
       throw QueryParsingErrors.mergeStatementWithoutWhenClauseError(ctx)
     }
     // children being empty means that the condition is not set
@@ -463,13 +479,19 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
     if (notMatchedActionSize >= 2 && !notMatchedActions.init.forall(_.condition.nonEmpty)) {
       throw QueryParsingErrors.nonLastNotMatchedClauseOmitConditionError(ctx)
     }
+    val notMatchedBySourceActionSize = notMatchedBySourceActions.length
+    if (notMatchedBySourceActionSize >= 2 &&
+     !notMatchedBySourceActions.init.forall(_.condition.nonEmpty)) {
+      throw QueryParsingErrors.nonLastNotMatchedBySourceClauseOmitConditionError(ctx)
+    }
 
     MergeIntoTable(
       aliasedTarget,
       aliasedSource,
       mergeCondition,
       matchedActions.toSeq,
-      notMatchedActions.toSeq)
+      notMatchedActions.toSeq,
+      notMatchedBySourceActions.toSeq)
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index e5bc96eab8a..cc800e67933 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -566,7 +566,8 @@ case class MergeIntoTable(
     sourceTable: LogicalPlan,
     mergeCondition: Expression,
     matchedActions: Seq[MergeAction],
-    notMatchedActions: Seq[MergeAction]) extends BinaryCommand with SupportsSubquery {
+    notMatchedActions: Seq[MergeAction],
+    notMatchedBySourceActions: Seq[MergeAction]) extends BinaryCommand with SupportsSubquery {
   def duplicateResolved: Boolean = targetTable.outputSet.intersect(sourceTable.outputSet).isEmpty
 
   def skipSchemaResolution: Boolean = targetTable match {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
index a17b4e592b2..204b28f3725 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
@@ -49,34 +49,24 @@ private[sql] object QueryParsingErrors extends QueryErrorsBase {
     new ParseException(errorClass = "_LEGACY_ERROR_TEMP_0004", ctx.source)
   }
 
-  def unrecognizedMatchedActionError(ctx: MatchedClauseContext): Throwable = {
-    new ParseException(
-      errorClass = "_LEGACY_ERROR_TEMP_0005",
-      messageParameters = Map("matchedAction" -> ctx.matchedAction().getText),
-      ctx.matchedAction())
-  }
-
   def insertedValueNumberNotMatchFieldNumberError(ctx: NotMatchedClauseContext): Throwable = {
     new ParseException(errorClass = "_LEGACY_ERROR_TEMP_0006", ctx.notMatchedAction())
   }
 
-  def unrecognizedNotMatchedActionError(ctx: NotMatchedClauseContext): Throwable = {
-    new ParseException(
-      errorClass = "_LEGACY_ERROR_TEMP_0007",
-      messageParameters = Map("matchedAction" -> ctx.notMatchedAction().getText),
-      ctx.notMatchedAction())
-  }
-
   def mergeStatementWithoutWhenClauseError(ctx: MergeIntoTableContext): Throwable = {
     new ParseException(errorClass = "_LEGACY_ERROR_TEMP_0008", ctx)
   }
 
   def nonLastMatchedClauseOmitConditionError(ctx: MergeIntoTableContext): Throwable = {
-    new ParseException(errorClass = "_LEGACY_ERROR_TEMP_0009", ctx)
+    new ParseException(errorClass = "NON_LAST_MATCHED_CLAUSE_OMIT_CONDITION", ctx)
   }
 
   def nonLastNotMatchedClauseOmitConditionError(ctx: MergeIntoTableContext): Throwable = {
-    new ParseException(errorClass = "_LEGACY_ERROR_TEMP_0010", ctx)
+    new ParseException(errorClass = "NON_LAST_NOT_MATCHED_BY_TARGET_CLAUSE_OMIT_CONDITION", ctx)
+  }
+
+  def nonLastNotMatchedBySourceClauseOmitConditionError(ctx: MergeIntoTableContext): Throwable = {
+    new ParseException(errorClass = "NON_LAST_NOT_MATCHED_BY_SOURCE_CLAUSE_OMIT_CONDITION", ctx)
   }
 
   def emptyPartitionKeyError(key: String, ctx: PartitionSpecContext): Throwable = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index c1106a26545..d60bd7a0981 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -681,6 +681,7 @@ class AnalysisSuite extends AnalysisTest with Matchers {
         testRelation,
         cond,
         UpdateAction(Some(cond), Assignment($"a", $"a") :: Nil) :: Nil,
+        Nil,
         Nil
       ),
       "Reference 'a' is ambiguous" :: Nil)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala
index 98ac8aa3eca..29bc46eaa3e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala
@@ -157,7 +157,8 @@ class PullupCorrelatedPredicatesSuite extends PlanTest {
       testRelation2,
       cond,
       Seq(DeleteAction(None)),
-      Seq(InsertAction(None, Seq(Assignment($"a", $"c"), Assignment($"b", $"d")))))
+      Seq(InsertAction(None, Seq(Assignment($"a", $"c"), Assignment($"b", $"d")))),
+      Seq(DeleteAction(None)))
     val analyzedMergePlan = mergePlan.analyze
     assert(analyzedMergePlan.resolved)
 
@@ -165,7 +166,7 @@ class PullupCorrelatedPredicatesSuite extends PlanTest {
     assert(optimized.resolved)
 
     optimized match {
-      case MergeIntoTable(_, _, s: InSubquery, _, _) =>
+      case MergeIntoTable(_, _, s: InSubquery, _, _, _) =>
         val outerRefs = SubExprUtils.getOuterReferences(s.query.plan)
         assert(outerRefs.isEmpty, "should be no outer refs")
       case other =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicateSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicateSuite.scala
index 2557a7ea7fe..4c1a8a53ac2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicateSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicateSuite.scala
@@ -480,10 +480,24 @@ class ReplaceNullWithFalseInPredicateSuite extends PlanTest {
       val notMatchedAssignments = Seq(
         Assignment($"i", $"d")
       )
+      val notMatchedBySourceAssignments = Seq(
+        Assignment($"i", $"i"),
+        Assignment($"b", $"b"),
+        Assignment($"a", $"a"),
+        Assignment($"m", $"m")
+      )
       val matchedActions = UpdateAction(Some(expr), matchedAssignments) ::
         DeleteAction(Some(expr)) :: Nil
       val notMatchedActions = InsertAction(None, notMatchedAssignments) :: Nil
-      MergeIntoTable(target, source, mergeCondition = expr, matchedActions, notMatchedActions)
+      val notMatchedBySourceActions = UpdateAction(Some(expr), matchedAssignments) ::
+        DeleteAction(Some(expr)) :: Nil
+      MergeIntoTable(
+        target,
+        source,
+        mergeCondition = expr,
+        matchedActions,
+        notMatchedActions,
+        notMatchedBySourceActions)
     }
     val originalPlan = func(testRelation, anotherTestRelation, originalCond).analyze
     val optimizedPlan = Optimize.execute(originalPlan)
@@ -499,7 +513,13 @@ class ReplaceNullWithFalseInPredicateSuite extends PlanTest {
       // However, the source must have all the columns present in target for star resolution.
       val source = LocalRelation($"i".int, $"b".boolean, $"a".array(IntegerType))
       val target = LocalRelation($"a".array(IntegerType))
-      MergeIntoTable(target, source, mergeCondition = expr, matchedActions, notMatchedActions)
+      MergeIntoTable(
+        target,
+        source,
+        mergeCondition = expr,
+        matchedActions,
+        notMatchedActions,
+        Seq.empty)
     }
     val originalPlanWithStar = mergePlanWithStar(originalCond).analyze
     val optimizedPlanWithStar = Optimize.execute(originalPlanWithStar)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 1683fee9e2c..d20556962fa 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -1734,6 +1734,9 @@ class DDLParserSuite extends AnalysisTest {
         |WHEN MATCHED AND (target.col2='update') THEN UPDATE SET target.col2 = source.col2
         |WHEN NOT MATCHED AND (target.col2='insert')
         |THEN INSERT (target.col1, target.col2) values (source.col1, source.col2)
+        |WHEN NOT MATCHED BY SOURCE AND (target.col3='delete') THEN DELETE
+        |WHEN NOT MATCHED BY SOURCE AND (target.col3='update')
+        |THEN UPDATE SET target.col3 = 'delete'
       """.stripMargin,
       MergeIntoTable(
         SubqueryAlias("target", UnresolvedRelation(Seq("testcat1", "ns1", "ns2", "tbl"))),
@@ -1745,7 +1748,10 @@ class DDLParserSuite extends AnalysisTest {
               UnresolvedAttribute("source.col2"))))),
         Seq(InsertAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("insert"))),
           Seq(Assignment(UnresolvedAttribute("target.col1"), UnresolvedAttribute("source.col1")),
-            Assignment(UnresolvedAttribute("target.col2"), UnresolvedAttribute("source.col2")))))))
+            Assignment(UnresolvedAttribute("target.col2"), UnresolvedAttribute("source.col2"))))),
+        Seq(DeleteAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("delete")))),
+          UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("update"))),
+            Seq(Assignment(UnresolvedAttribute("target.col3"), Literal("delete")))))))
   }
 
   test("merge into table: using subquery") {
@@ -1758,6 +1764,9 @@ class DDLParserSuite extends AnalysisTest {
         |WHEN MATCHED AND (target.col2='update') THEN UPDATE SET target.col2 = source.col2
         |WHEN NOT MATCHED AND (target.col2='insert')
         |THEN INSERT (target.col1, target.col2) values (source.col1, source.col2)
+        |WHEN NOT MATCHED BY SOURCE AND (target.col3='delete') THEN DELETE
+        |WHEN NOT MATCHED BY SOURCE AND (target.col3='update')
+        |THEN UPDATE SET target.col3 = 'delete'
       """.stripMargin,
       MergeIntoTable(
         SubqueryAlias("target", UnresolvedRelation(Seq("testcat1", "ns1", "ns2", "tbl"))),
@@ -1770,7 +1779,10 @@ class DDLParserSuite extends AnalysisTest {
               UnresolvedAttribute("source.col2"))))),
         Seq(InsertAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("insert"))),
           Seq(Assignment(UnresolvedAttribute("target.col1"), UnresolvedAttribute("source.col1")),
-            Assignment(UnresolvedAttribute("target.col2"), UnresolvedAttribute("source.col2")))))))
+            Assignment(UnresolvedAttribute("target.col2"), UnresolvedAttribute("source.col2"))))),
+        Seq(DeleteAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("delete")))),
+          UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("update"))),
+            Seq(Assignment(UnresolvedAttribute("target.col3"), Literal("delete")))))))
   }
 
   test("merge into table: cte") {
@@ -1783,6 +1795,9 @@ class DDLParserSuite extends AnalysisTest {
         |WHEN MATCHED AND (target.col2='update') THEN UPDATE SET target.col2 = source.col2
         |WHEN NOT MATCHED AND (target.col2='insert')
         |THEN INSERT (target.col1, target.col2) values (source.col1, source.col2)
+        |WHEN NOT MATCHED BY SOURCE AND (target.col3='delete') THEN DELETE
+        |WHEN NOT MATCHED BY SOURCE AND (target.col3='update')
+        |THEN UPDATE SET target.col3 = 'delete'
       """.stripMargin,
       MergeIntoTable(
         SubqueryAlias("target", UnresolvedRelation(Seq("testcat1", "ns1", "ns2", "tbl"))),
@@ -1797,7 +1812,10 @@ class DDLParserSuite extends AnalysisTest {
               UnresolvedAttribute("source.col2"))))),
         Seq(InsertAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("insert"))),
           Seq(Assignment(UnresolvedAttribute("target.col1"), UnresolvedAttribute("source.col1")),
-            Assignment(UnresolvedAttribute("target.col2"), UnresolvedAttribute("source.col2")))))))
+            Assignment(UnresolvedAttribute("target.col2"), UnresolvedAttribute("source.col2"))))),
+        Seq(DeleteAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("delete")))),
+          UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("update"))),
+            Seq(Assignment(UnresolvedAttribute("target.col3"), Literal("delete")))))))
   }
 
   test("merge into table: no additional condition") {
@@ -1809,6 +1827,7 @@ class DDLParserSuite extends AnalysisTest {
         |WHEN MATCHED THEN UPDATE SET target.col2 = source.col2
         |WHEN NOT MATCHED
         |THEN INSERT (target.col1, target.col2) values (source.col1, source.col2)
+        |WHEN NOT MATCHED BY SOURCE THEN DELETE
       """.stripMargin,
     MergeIntoTable(
       SubqueryAlias("target", UnresolvedRelation(Seq("testcat1", "ns1", "ns2", "tbl"))),
@@ -1818,7 +1837,8 @@ class DDLParserSuite extends AnalysisTest {
         Seq(Assignment(UnresolvedAttribute("target.col2"), UnresolvedAttribute("source.col2"))))),
       Seq(InsertAction(None,
         Seq(Assignment(UnresolvedAttribute("target.col1"), UnresolvedAttribute("source.col1")),
-          Assignment(UnresolvedAttribute("target.col2"), UnresolvedAttribute("source.col2")))))))
+          Assignment(UnresolvedAttribute("target.col2"), UnresolvedAttribute("source.col2"))))),
+      Seq(DeleteAction(None))))
   }
 
   test("merge into table: star") {
@@ -1838,7 +1858,49 @@ class DDLParserSuite extends AnalysisTest {
       EqualTo(UnresolvedAttribute("target.col1"), UnresolvedAttribute("source.col1")),
       Seq(DeleteAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("delete")))),
         UpdateStarAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("update"))))),
-      Seq(InsertStarAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("insert")))))))
+      Seq(InsertStarAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("insert"))))),
+      Seq.empty))
+  }
+
+  test("merge into table: invalid star in not matched by source") {
+    val sql = """
+        |MERGE INTO testcat1.ns1.ns2.tbl AS target
+        |USING testcat2.ns1.ns2.tbl AS source
+        |ON target.col1 = source.col1
+        |WHEN NOT MATCHED BY SOURCE THEN UPDATE *
+      """.stripMargin
+    checkError(
+      exception = parseException(sql),
+      errorClass = "PARSE_SYNTAX_ERROR",
+      parameters = Map("error" -> "'*'", "hint" -> ""))
+  }
+
+  test("merge into table: not matched by target") {
+    parseCompare(
+      """
+        |MERGE INTO testcat1.ns1.ns2.tbl AS target
+        |USING testcat2.ns1.ns2.tbl AS source
+        |ON target.col1 = source.col1
+        |WHEN NOT MATCHED BY TARGET AND (target.col3='insert1')
+        |THEN INSERT (target.col1, target.col2) VALUES (source.col1, 0)
+        |WHEN NOT MATCHED AND (target.col3='insert2')
+        |THEN INSERT (target.col1, target.col2) VALUES (1, source.col2)
+        |WHEN NOT MATCHED BY TARGET
+        |THEN INSERT *
+      """.stripMargin,
+      MergeIntoTable(
+        SubqueryAlias("target", UnresolvedRelation(Seq("testcat1", "ns1", "ns2", "tbl"))),
+        SubqueryAlias("source", UnresolvedRelation(Seq("testcat2", "ns1", "ns2", "tbl"))),
+        EqualTo(UnresolvedAttribute("target.col1"), UnresolvedAttribute("source.col1")),
+        Seq.empty,
+        Seq(InsertAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("insert1"))),
+            Seq(Assignment(UnresolvedAttribute("target.col1"), UnresolvedAttribute("source.col1")),
+              Assignment(UnresolvedAttribute("target.col2"), Literal(0)))),
+          InsertAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("insert2"))),
+            Seq(Assignment(UnresolvedAttribute("target.col1"), Literal(1)),
+              Assignment(UnresolvedAttribute("target.col2"), UnresolvedAttribute("source.col2")))),
+          InsertStarAction(None)),
+        Seq.empty))
   }
 
   test("merge into table: columns aliases are not allowed") {
@@ -1863,7 +1925,7 @@ class DDLParserSuite extends AnalysisTest {
     }
   }
 
-  test("merge into table: multi matched and not matched clauses") {
+  test("merge into table: multi matched, not matched and not matched by source clauses") {
     parseCompare(
       """
         |MERGE INTO testcat1.ns1.ns2.tbl AS target
@@ -1876,6 +1938,9 @@ class DDLParserSuite extends AnalysisTest {
         |THEN INSERT (target.col1, target.col2) values (source.col1, 1)
         |WHEN NOT MATCHED AND (target.col2='insert2')
         |THEN INSERT (target.col1, target.col2) values (source.col1, 2)
+        |WHEN NOT MATCHED BY SOURCE AND (target.col3='delete') THEN DELETE
+        |WHEN NOT MATCHED BY SOURCE AND (target.col3='update1') THEN UPDATE SET target.col3 = 1
+        |WHEN NOT MATCHED BY SOURCE AND (target.col3='update2') THEN UPDATE SET target.col3 = 2
       """.stripMargin,
       MergeIntoTable(
         SubqueryAlias("target", UnresolvedRelation(Seq("testcat1", "ns1", "ns2", "tbl"))),
@@ -1891,7 +1956,12 @@ class DDLParserSuite extends AnalysisTest {
             Assignment(UnresolvedAttribute("target.col2"), Literal(1)))),
           InsertAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("insert2"))),
             Seq(Assignment(UnresolvedAttribute("target.col1"), UnresolvedAttribute("source.col1")),
-              Assignment(UnresolvedAttribute("target.col2"), Literal(2)))))))
+              Assignment(UnresolvedAttribute("target.col2"), Literal(2))))),
+        Seq(DeleteAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("delete")))),
+          UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("update1"))),
+            Seq(Assignment(UnresolvedAttribute("target.col3"), Literal(1)))),
+          UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col3"), Literal("update2"))),
+            Seq(Assignment(UnresolvedAttribute("target.col3"), Literal(2)))))))
   }
 
   test("merge into table: only the last matched clause can omit the condition") {
@@ -1906,7 +1976,7 @@ class DDLParserSuite extends AnalysisTest {
         |THEN INSERT (target.col1, target.col2) values (source.col1, source.col2)""".stripMargin
     checkError(
       exception = parseException(sql),
-      errorClass = "_LEGACY_ERROR_TEMP_0009",
+      errorClass = "NON_LAST_MATCHED_CLAUSE_OMIT_CONDITION",
       parameters = Map.empty,
       context = ExpectedContext(
         fragment = sql,
@@ -1929,7 +1999,7 @@ class DDLParserSuite extends AnalysisTest {
         |THEN INSERT (target.col1, target.col2) values (source.col1, source.col2)""".stripMargin
     checkError(
       exception = parseException(sql),
-      errorClass = "_LEGACY_ERROR_TEMP_0010",
+      errorClass = "NON_LAST_NOT_MATCHED_BY_TARGET_CLAUSE_OMIT_CONDITION",
       parameters = Map.empty,
       context = ExpectedContext(
         fragment = sql,
@@ -1937,6 +2007,30 @@ class DDLParserSuite extends AnalysisTest {
         stop = 494))
   }
 
+  test("merge into table: only the last not matched by source clause can omit the " +
+       "condition") {
+    val sql =
+      """MERGE INTO testcat1.ns1.ns2.tbl AS target
+        |USING testcat2.ns1.ns2.tbl AS source
+        |ON target.col1 = source.col1
+        |WHEN MATCHED AND (target.col2 == 'update') THEN UPDATE SET target.col2 = source.col2
+        |WHEN MATCHED THEN DELETE
+        |WHEN NOT MATCHED AND (target.col2='insert')
+        |THEN INSERT (target.col1, target.col2) values (source.col1, source.col2)
+        |WHEN NOT MATCHED BY SOURCE AND (target.col3='update')
+        |THEN UPDATE SET target.col3 = 'delete'
+        |WHEN NOT MATCHED BY SOURCE THEN UPDATE SET target.col3 = 'update'
+        |WHEN NOT MATCHED BY SOURCE THEN DELETE""".stripMargin
+    checkError(
+      exception = parseException(sql),
+      errorClass = "NON_LAST_NOT_MATCHED_BY_SOURCE_CLAUSE_OMIT_CONDITION",
+      parameters = Map.empty,
+      context = ExpectedContext(
+        fragment = sql,
+        start = 0,
+        stop = 531))
+  }
+
   test("merge into table: there must be a when (not) matched condition") {
     val sql =
       """MERGE INTO testcat1.ns1.ns2.tbl AS target
@@ -2569,6 +2663,8 @@ class DDLParserSuite extends AnalysisTest {
         |WHEN MATCHED AND (target.col2='update') THEN UPDATE SET target.col2 = DEFAULT
         |WHEN NOT MATCHED AND (target.col2='insert')
         |THEN INSERT (target.col1, target.col2) VALUES (source.col1, DEFAULT)
+        |WHEN NOT MATCHED BY SOURCE AND (target.col2='delete') THEN DELETE
+        |WHEN NOT MATCHED BY SOURCE AND (target.col2='update') THEN UPDATE SET target.col2 = DEFAULT
       """.stripMargin,
       MergeIntoTable(
         SubqueryAlias("target", UnresolvedRelation(Seq("testcat1", "ns1", "ns2", "tbl"))),
@@ -2580,7 +2676,11 @@ class DDLParserSuite extends AnalysisTest {
               UnresolvedAttribute("DEFAULT"))))),
         Seq(InsertAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("insert"))),
           Seq(Assignment(UnresolvedAttribute("target.col1"), UnresolvedAttribute("source.col1")),
-            Assignment(UnresolvedAttribute("target.col2"), UnresolvedAttribute("DEFAULT")))))))
+            Assignment(UnresolvedAttribute("target.col2"), UnresolvedAttribute("DEFAULT"))))),
+        Seq(DeleteAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("delete")))),
+          UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("update"))),
+            Seq(Assignment(UnresolvedAttribute("target.col2"),
+              UnresolvedAttribute("DEFAULT")))))))
   }
 
   test("SPARK-40944: Relax ordering constraint for CREATE TABLE column options") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index a326811a797..a3c34b22a28 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -1441,31 +1441,35 @@ class PlanResolutionSuite extends AnalysisTest {
   }
 
   test("MERGE INTO TABLE - primary") {
-    def checkResolution(
+    def getAttributes(plan: LogicalPlan): (AttributeReference, AttributeReference) =
+      (plan.output.find(_.name == "i").get.asInstanceOf[AttributeReference],
+        plan.output.find(_.name == "s").get.asInstanceOf[AttributeReference])
+
+    def checkMergeConditionResolution(
         target: LogicalPlan,
         source: LogicalPlan,
-        mergeCondition: Expression,
-        deleteCondAttr: Option[AttributeReference],
-        updateCondAttr: Option[AttributeReference],
-        insertCondAttr: Option[AttributeReference],
-        updateAssigns: Seq[Assignment],
-        insertAssigns: Seq[Assignment],
-        starInUpdate: Boolean = false): Unit = {
-      val ti = target.output.find(_.name == "i").get.asInstanceOf[AttributeReference]
-      val ts = target.output.find(_.name == "s").get.asInstanceOf[AttributeReference]
-      val si = source.output.find(_.name == "i").get.asInstanceOf[AttributeReference]
-      val ss = source.output.find(_.name == "s").get.asInstanceOf[AttributeReference]
-
+        mergeCondition: Expression): Unit = {
+      val (si, _) = getAttributes(source)
+      val (ti, _) = getAttributes(target)
       mergeCondition match {
         case EqualTo(l: AttributeReference, r: AttributeReference) =>
           assert(l.sameRef(ti) && r.sameRef(si))
         case Literal(_, BooleanType) => // this is acceptable as a merge condition
         case other => fail("unexpected merge condition " + other)
       }
+    }
 
+    def checkMatchedClausesResolution(
+        target: LogicalPlan,
+        source: LogicalPlan,
+        deleteCondAttr: Option[AttributeReference],
+        updateCondAttr: Option[AttributeReference],
+        updateAssigns: Seq[Assignment],
+        starInUpdate: Boolean = false): Unit = {
+      val (si, ss) = getAttributes(source)
+      val (ti, ts) = getAttributes(target)
       deleteCondAttr.foreach(a => assert(a.sameRef(ts)))
       updateCondAttr.foreach(a => assert(a.sameRef(ts)))
-      insertCondAttr.foreach(a => assert(a.sameRef(ss)))
 
       if (starInUpdate) {
         assert(updateAssigns.size == 2)
@@ -1478,6 +1482,16 @@ class PlanResolutionSuite extends AnalysisTest {
         assert(updateAssigns.head.key.asInstanceOf[AttributeReference].sameRef(ts))
         assert(updateAssigns.head.value.asInstanceOf[AttributeReference].sameRef(ss))
       }
+    }
+
+    def checkNotMatchedClausesResolution(
+        target: LogicalPlan,
+        source: LogicalPlan,
+        insertCondAttr: Option[AttributeReference],
+        insertAssigns: Seq[Assignment]): Unit = {
+      val (si, ss) = getAttributes(source)
+      val (ti, ts) = getAttributes(target)
+      insertCondAttr.foreach(a => assert(a.sameRef(ss)))
       assert(insertAssigns.size == 2)
       assert(insertAssigns(0).key.asInstanceOf[AttributeReference].sameRef(ti))
       assert(insertAssigns(0).value.asInstanceOf[AttributeReference].sameRef(si))
@@ -1485,6 +1499,18 @@ class PlanResolutionSuite extends AnalysisTest {
       assert(insertAssigns(1).value.asInstanceOf[AttributeReference].sameRef(ss))
     }
 
+    def checkNotMatchedBySourceClausesResolution(
+        target: LogicalPlan,
+        deleteCondAttr: Option[AttributeReference],
+        updateCondAttr: Option[AttributeReference],
+        updateAssigns: Seq[Assignment]): Unit = {
+      val (_, ts) = getAttributes(target)
+      deleteCondAttr.foreach(a => assert(a.sameRef(ts)))
+      updateCondAttr.foreach(a => assert(a.sameRef(ts)))
+      assert(updateAssigns.size == 1)
+      assert(updateAssigns.head.key.asInstanceOf[AttributeReference].sameRef(ts))
+    }
+
     Seq(("v2Table", "v2Table1"), ("testcat.tab", "testcat.tab1")).foreach {
       case(target, source) =>
         // basic
@@ -1497,6 +1523,8 @@ class PlanResolutionSuite extends AnalysisTest {
              |WHEN MATCHED AND (target.s='update') THEN UPDATE SET target.s = source.s
              |WHEN NOT MATCHED AND (source.s='insert')
              |  THEN INSERT (target.i, target.s) values (source.i, source.s)
+             |WHEN NOT MATCHED BY SOURCE AND (target.s='delete') THEN DELETE
+             |WHEN NOT MATCHED BY SOURCE AND (target.s='update') THEN UPDATE SET target.s = 'delete'
            """.stripMargin
         parseAndResolve(sql1) match {
           case MergeIntoTable(
@@ -1507,9 +1535,15 @@ class PlanResolutionSuite extends AnalysisTest {
                 UpdateAction(Some(EqualTo(ul: AttributeReference, StringLiteral("update"))),
                   updateAssigns)),
               Seq(InsertAction(Some(EqualTo(il: AttributeReference, StringLiteral("insert"))),
-                insertAssigns))) =>
-            checkResolution(target, source, mergeCondition, Some(dl), Some(ul), Some(il),
-              updateAssigns, insertAssigns)
+                  insertAssigns)),
+              Seq(DeleteAction(Some(EqualTo(ndl: AttributeReference, StringLiteral("delete")))),
+                UpdateAction(Some(EqualTo(nul: AttributeReference, StringLiteral("update"))),
+                  notMatchedBySourceUpdateAssigns))) =>
+            checkMergeConditionResolution(target, source, mergeCondition)
+            checkMatchedClausesResolution(target, source, Some(dl), Some(ul), updateAssigns)
+            checkNotMatchedClausesResolution(target, source, Some(il), insertAssigns)
+            checkNotMatchedBySourceClausesResolution(target, Some(ndl), Some(nul),
+              notMatchedBySourceUpdateAssigns)
 
           case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString)
         }
@@ -1533,16 +1567,20 @@ class PlanResolutionSuite extends AnalysisTest {
                 UpdateAction(Some(EqualTo(ul: AttributeReference,
                   StringLiteral("update"))), updateAssigns)),
               Seq(InsertAction(Some(EqualTo(il: AttributeReference, StringLiteral("insert"))),
-                insertAssigns))) =>
-            checkResolution(target, source, mergeCondition, Some(dl), Some(ul), Some(il),
-              updateAssigns, insertAssigns, starInUpdate = true)
+                  insertAssigns)),
+              Seq()) =>
+            checkMergeConditionResolution(target, source, mergeCondition)
+            checkMatchedClausesResolution(target, source, Some(dl), Some(ul), updateAssigns,
+              starInUpdate = true)
+            checkNotMatchedClausesResolution(target, source, Some(il), insertAssigns)
 
           case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString)
         }
 
         // merge with star should get resolved into specific actions even if there
         // is no other unresolved expression in the merge
-        parseAndResolve(s"""
+        parseAndResolve(
+          s"""
              |MERGE INTO $target AS target
              |USING $source AS source
              |ON true
@@ -1554,10 +1592,12 @@ class PlanResolutionSuite extends AnalysisTest {
               SubqueryAlias(AliasIdentifier("source", Seq()), AsDataSourceV2Relation(source)),
               mergeCondition,
               Seq(UpdateAction(None, updateAssigns)),
-              Seq(InsertAction(None, insertAssigns))) =>
-
-            checkResolution(target, source, mergeCondition, None, None, None,
-              updateAssigns, insertAssigns, starInUpdate = true)
+              Seq(InsertAction(None, insertAssigns)),
+              Seq()) =>
+            checkMergeConditionResolution(target, source, mergeCondition)
+            checkMatchedClausesResolution(target, source, None, None, updateAssigns,
+              starInUpdate = true)
+            checkNotMatchedClausesResolution(target, source, None, insertAssigns)
 
           case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString)
         }
@@ -1571,6 +1611,8 @@ class PlanResolutionSuite extends AnalysisTest {
              |WHEN MATCHED AND (target.s='delete') THEN DELETE
              |WHEN MATCHED THEN UPDATE SET target.s = source.s
              |WHEN NOT MATCHED THEN INSERT (target.i, target.s) values (source.i, source.s)
+             |WHEN NOT MATCHED BY SOURCE AND (target.s='delete') THEN DELETE
+             |WHEN NOT MATCHED BY SOURCE THEN UPDATE SET target.s = 'delete'
            """.stripMargin
         parseAndResolve(sql3) match {
           case MergeIntoTable(
@@ -1578,9 +1620,14 @@ class PlanResolutionSuite extends AnalysisTest {
               SubqueryAlias(AliasIdentifier("source", Seq()), AsDataSourceV2Relation(source)),
               mergeCondition,
               Seq(DeleteAction(Some(_)), UpdateAction(None, updateAssigns)),
-              Seq(InsertAction(None, insertAssigns))) =>
-            checkResolution(target, source, mergeCondition, None, None, None,
-              updateAssigns, insertAssigns)
+              Seq(InsertAction(None, insertAssigns)),
+              Seq(DeleteAction(Some(EqualTo(_: AttributeReference, StringLiteral("delete")))),
+                UpdateAction(None, notMatchedBySourceUpdateAssigns))) =>
+            checkMergeConditionResolution(target, source, mergeCondition)
+            checkMatchedClausesResolution(target, source, None, None, updateAssigns)
+            checkNotMatchedClausesResolution(target, source, None, insertAssigns)
+            checkNotMatchedBySourceClausesResolution(target, None, None,
+              notMatchedBySourceUpdateAssigns)
 
           case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString)
         }
@@ -1595,6 +1642,8 @@ class PlanResolutionSuite extends AnalysisTest {
              |WHEN MATCHED AND (target.s='update') THEN UPDATE SET target.s = source.s
              |WHEN NOT MATCHED AND (source.s='insert')
              |  THEN INSERT (target.i, target.s) values (source.i, source.s)
+             |WHEN NOT MATCHED BY SOURCE AND (target.s='delete') THEN DELETE
+             |WHEN NOT MATCHED BY SOURCE AND (target.s='update') THEN UPDATE SET target.s = 'delete'
            """.stripMargin
         parseAndResolve(sql4) match {
           case MergeIntoTable(
@@ -1605,9 +1654,15 @@ class PlanResolutionSuite extends AnalysisTest {
                 UpdateAction(Some(EqualTo(ul: AttributeReference, StringLiteral("update"))),
                   updateAssigns)),
               Seq(InsertAction(Some(EqualTo(il: AttributeReference, StringLiteral("insert"))),
-                insertAssigns))) =>
-            checkResolution(target, source, mergeCondition, Some(dl), Some(ul), Some(il),
-              updateAssigns, insertAssigns)
+                  insertAssigns)),
+              Seq(DeleteAction(Some(EqualTo(ndl: AttributeReference, StringLiteral("delete")))),
+                UpdateAction(Some(EqualTo(nul: AttributeReference, StringLiteral("update"))),
+                  notMatchedBySourceUpdateAssigns))) =>
+            checkMergeConditionResolution(target, source, mergeCondition)
+            checkMatchedClausesResolution(target, source, Some(dl), Some(ul), updateAssigns)
+            checkNotMatchedClausesResolution(target, source, Some(il), insertAssigns)
+            checkNotMatchedBySourceClausesResolution(target, Some(ndl), Some(nul),
+              notMatchedBySourceUpdateAssigns)
 
           case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString)
         }
@@ -1624,6 +1679,8 @@ class PlanResolutionSuite extends AnalysisTest {
              |WHEN MATCHED AND (target.s='update') THEN UPDATE SET target.s = source.s
              |WHEN NOT MATCHED AND (source.s='insert')
              |THEN INSERT (target.i, target.s) values (source.i, source.s)
+             |WHEN NOT MATCHED BY SOURCE AND (target.s='delete') THEN DELETE
+             |WHEN NOT MATCHED BY SOURCE AND (target.s='update') THEN UPDATE SET target.s = 'delete'
            """.stripMargin
         parseAndResolve(sql5) match {
           case MergeIntoTable(
@@ -1634,11 +1691,15 @@ class PlanResolutionSuite extends AnalysisTest {
                 UpdateAction(Some(EqualTo(ul: AttributeReference, StringLiteral("update"))),
                   updateAssigns)),
               Seq(InsertAction(Some(EqualTo(il: AttributeReference, StringLiteral("insert"))),
-                insertAssigns))) =>
-            assert(source.output.map(_.name) == Seq("s", "i"))
-            checkResolution(target, source, mergeCondition, Some(dl), Some(ul), Some(il),
-              updateAssigns, insertAssigns)
-
+                  insertAssigns)),
+              Seq(DeleteAction(Some(EqualTo(ndl: AttributeReference, StringLiteral("delete")))),
+                UpdateAction(Some(EqualTo(nul: AttributeReference, StringLiteral("update"))),
+                  notMatchedBySourceUpdateAssigns))) =>
+            checkMergeConditionResolution(target, source, mergeCondition)
+            checkMatchedClausesResolution(target, source, Some(dl), Some(ul), updateAssigns)
+            checkNotMatchedClausesResolution(target, source, Some(il), insertAssigns)
+            checkNotMatchedBySourceClausesResolution(target, Some(ndl), Some(nul),
+              notMatchedBySourceUpdateAssigns)
           case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString)
         }
 
@@ -1656,6 +1717,8 @@ class PlanResolutionSuite extends AnalysisTest {
              |THEN UPDATE SET target.s = DEFAULT, target.i = target.i
              |WHEN NOT MATCHED AND (source.s='insert')
              |  THEN INSERT (target.i, target.s) values (DEFAULT, DEFAULT)
+             |WHEN NOT MATCHED BY SOURCE AND (target.s='delete') THEN DELETE
+             |WHEN NOT MATCHED BY SOURCE AND (target.s='update') THEN UPDATE SET target.s = DEFAULT
            """.stripMargin
         parseAndResolve(sql6) match {
           case m: MergeIntoTable =>
@@ -1699,6 +1762,19 @@ class PlanResolutionSuite extends AnalysisTest {
                 assert(s.name == "s")
               case other => fail("unexpected not matched action " + other)
             }
+            assert(m.notMatchedBySourceActions.length == 2)
+            m.notMatchedBySourceActions(0) match {
+              case DeleteAction(Some(EqualTo(_: AttributeReference, StringLiteral("delete")))) =>
+              case other => fail("unexpected first not matched by source action " + other)
+            }
+            m.notMatchedBySourceActions(1) match {
+              case UpdateAction(Some(EqualTo(_: AttributeReference, StringLiteral("update"))),
+                Seq(Assignment(_: AttributeReference,
+                  cast @ Cast(Literal(null, _), StringType, _, EvalMode.ANSI))))
+                if cast.getTagValue(Cast.BY_TABLE_INSERTION).isDefined =>
+              case other =>
+                fail("unexpected second not matched by source action " + other)
+            }
 
           case other =>
             fail("Expect MergeIntoTable, but got:\n" + other.treeString)
@@ -1715,7 +1791,10 @@ class PlanResolutionSuite extends AnalysisTest {
              |WHEN MATCHED AND (target.s = 31)
              |  THEN UPDATE SET target.s = DEFAULT
              |WHEN NOT MATCHED AND (source.s='insert')
-             |  THEN INSERT (target.i, target.s) values (DEFAULT, DEFAULT)""".stripMargin
+             |  THEN INSERT (target.i, target.s) values (DEFAULT, DEFAULT)
+             |WHEN NOT MATCHED BY SOURCE AND (target.s = 31) THEN DELETE
+             |WHEN NOT MATCHED BY SOURCE AND (target.s = 31)
+             |  THEN UPDATE SET target.s = DEFAULT""".stripMargin
         checkError(
           exception = intercept[AnalysisException] {
             parseAndResolve(mergeWithDefaultReferenceInMergeCondition)
@@ -1734,7 +1813,10 @@ class PlanResolutionSuite extends AnalysisTest {
              |WHEN MATCHED AND (target.s = 31)
              |  THEN UPDATE SET target.s = DEFAULT + 1
              |WHEN NOT MATCHED AND (source.s='insert')
-             |  THEN INSERT (target.i, target.s) values (DEFAULT, DEFAULT)""".stripMargin
+             |  THEN INSERT (target.i, target.s) values (DEFAULT, DEFAULT)
+             |WHEN NOT MATCHED BY SOURCE AND (target.s = 31) THEN DELETE
+             |WHEN NOT MATCHED BY SOURCE AND (target.s = 31)
+             |  THEN UPDATE SET target.s = DEFAULT + 1""".stripMargin
         checkError(
           exception = intercept[AnalysisException] {
             parseAndResolve(mergeWithDefaultReferenceAsPartOfComplexExpression)
@@ -1754,6 +1836,9 @@ class PlanResolutionSuite extends AnalysisTest {
            |  THEN UPDATE SET target.s = DEFAULT
            |WHEN NOT MATCHED AND (source.s='insert')
            |  THEN INSERT (target.s) values (DEFAULT)
+           |WHEN NOT MATCHED BY SOURCE AND (target.s = 32) THEN DELETE
+           |WHEN NOT MATCHED BY SOURCE AND (target.s = 32)
+           |  THEN UPDATE SET target.s = DEFAULT
              """.stripMargin
         parseAndResolve(mergeIntoTableWithColumnNamedDefault, withDefault = true) match {
           case m: MergeIntoTable =>
@@ -1768,6 +1853,7 @@ class PlanResolutionSuite extends AnalysisTest {
             }
             assert(m.matchedActions.length == 2)
             assert(m.notMatchedActions.length == 1)
+            assert(m.notMatchedBySourceActions.length == 2)
 
           case other =>
             fail("Expect MergeIntoTable, but got:\n" + other.treeString)
@@ -1788,6 +1874,9 @@ class PlanResolutionSuite extends AnalysisTest {
          |  THEN UPDATE SET target.s = DEFAULT
          |WHEN NOT MATCHED AND (source.s='insert')
          |  THEN INSERT (target.i, target.s) values (DEFAULT, DEFAULT)
+         |WHEN NOT MATCHED BY SOURCE AND (target.s = 31) THEN DELETE
+         |WHEN NOT MATCHED BY SOURCE AND (target.s = 31)
+         |  THEN UPDATE SET target.s = DEFAULT
            """.stripMargin
     parseAndResolve(mergeDefaultWithExplicitDefaultColumns, true) match {
       case m: MergeIntoTable =>
@@ -1823,6 +1912,16 @@ class PlanResolutionSuite extends AnalysisTest {
           Assignment(_: AttributeReference, Literal(42, IntegerType)))) =>
           case other => fail("unexpected not matched action " + other)
         }
+        assert(m.notMatchedBySourceActions.length == 2)
+        m.notMatchedBySourceActions(0) match {
+          case DeleteAction(Some(EqualTo(_: AttributeReference, Literal(31, IntegerType)))) =>
+          case other => fail("unexpected first not matched by source action " + other)
+        }
+        m.notMatchedBySourceActions(1) match {
+          case UpdateAction(Some(EqualTo(_: AttributeReference, Literal(31, IntegerType))),
+          Seq(Assignment(_: AttributeReference, Literal(42, IntegerType)))) =>
+          case other => fail("unexpected second not matched by source action " + other)
+        }
 
       case other =>
         fail("Expect MergeIntoTable, but got:\n" + other.treeString)
@@ -1842,6 +1941,8 @@ class PlanResolutionSuite extends AnalysisTest {
            |WHEN MATCHED AND (${target}.s='delete') THEN DELETE
            |WHEN MATCHED THEN UPDATE SET s = 1
            |WHEN NOT MATCHED AND (s = 'a') THEN INSERT (i) values (i)
+           |WHEN NOT MATCHED BY SOURCE AND (${target}.s='delete') THEN DELETE
+           |WHEN NOT MATCHED BY SOURCE THEN UPDATE SET s = 1
          """.stripMargin
 
       parseAndResolve(sql1) match {
@@ -1849,10 +1950,11 @@ class PlanResolutionSuite extends AnalysisTest {
             AsDataSourceV2Relation(target),
             AsDataSourceV2Relation(source),
             _,
-            Seq(DeleteAction(Some(_)), UpdateAction(None, updateAssigns)),
+            Seq(DeleteAction(Some(_)), UpdateAction(None, firstUpdateAssigns)),
             Seq(InsertAction(
               Some(EqualTo(il: AttributeReference, StringLiteral("a"))),
-              insertAssigns))) =>
+            insertAssigns)),
+            Seq(DeleteAction(Some(_)), UpdateAction(None, secondUpdateAssigns))) =>
           val ti = target.output.find(_.name == "i").get
           val ts = target.output.find(_.name == "s").get
           val si = source.output.find(_.name == "i").get
@@ -1860,14 +1962,17 @@ class PlanResolutionSuite extends AnalysisTest {
 
           // INSERT condition is resolved with source table only, so column `s` is not ambiguous.
           assert(il.sameRef(ss))
-          assert(updateAssigns.size == 1)
+          assert(firstUpdateAssigns.size == 1)
           // UPDATE key is resolved with target table only, so column `s` is not ambiguous.
-          assert(updateAssigns.head.key.asInstanceOf[AttributeReference].sameRef(ts))
+          assert(firstUpdateAssigns.head.key.asInstanceOf[AttributeReference].sameRef(ts))
           assert(insertAssigns.size == 1)
           // INSERT key is resolved with target table only, so column `i` is not ambiguous.
           assert(insertAssigns.head.key.asInstanceOf[AttributeReference].sameRef(ti))
           // INSERT value is resolved with source table only, so column `i` is not ambiguous.
           assert(insertAssigns.head.value.asInstanceOf[AttributeReference].sameRef(si))
+          assert(secondUpdateAssigns.size == 1)
+          // UPDATE key is resolved with target table only, so column `s` is not ambiguous.
+          assert(secondUpdateAssigns.head.key.asInstanceOf[AttributeReference].sameRef(ts))
 
         case p => fail("Expect MergeIntoTable, but got:\n" + p.treeString)
       }
@@ -1935,6 +2040,58 @@ class PlanResolutionSuite extends AnalysisTest {
           fragment = "s",
           start = 61 + target.length + source.length,
           stop = 61 + target.length + source.length))
+
+      val sql6 =
+        s"""
+           |MERGE INTO $target
+           |USING $source
+           |ON target.i = source.i
+           |WHEN NOT MATCHED BY SOURCE AND (s = 'b') THEN DELETE
+           |WHEN NOT MATCHED BY SOURCE AND (s = 'a') THEN UPDATE SET i = 1
+         """.stripMargin
+      // not matched by source clauses are resolved using the target table only, resolving columns
+      // is not ambiguous.
+      val parsed = parseAndResolve(sql6)
+      parsed match {
+        case MergeIntoTable(
+            AsDataSourceV2Relation(target),
+            _,
+            _,
+            Seq(),
+            Seq(),
+            notMatchedBySourceActions) =>
+          assert(notMatchedBySourceActions.length == 2)
+          notMatchedBySourceActions(0) match {
+            case DeleteAction(Some(EqualTo(dl: AttributeReference, StringLiteral("b")))) =>
+              // DELETE condition is resolved with target table only, so column `s` is not
+              // ambiguous.
+              val ts = target.output.find(_.name == "s").get
+              assert(dl.sameRef(ts))
+            case other => fail("unexpected first not matched by source action " + other)
+          }
+          notMatchedBySourceActions(1) match {
+            case UpdateAction(Some(EqualTo(ul: AttributeReference, StringLiteral("a"))),
+                Seq(Assignment(us: AttributeReference, IntegerLiteral(1)))) =>
+              // UPDATE condition and assignment are resolved with target table only, so column `s`
+              // and `i` are not ambiguous.
+              val ts = target.output.find(_.name == "s").get
+              val ti = target.output.find(_.name == "i").get
+              assert(ul.sameRef(ts))
+              assert(us.sameRef(ti))
+            case other => fail("unexpected second not matched by source action " + other)
+          }
+      }
+
+      val sql7 =
+        s"""
+           |MERGE INTO $target
+           |USING $source
+           |ON 1 = 1
+           |WHEN NOT MATCHED BY SOURCE THEN UPDATE SET $target.s = $source.s
+         """.stripMargin
+      // update value in not matched by source clause can only reference the target table.
+      val e7 = intercept[AnalysisException](parseAndResolve(sql7))
+      assert(e7.message.contains(s"cannot resolve $source.s in MERGE command"))
     }
 
     val sql1 =
@@ -1985,6 +2142,7 @@ class PlanResolutionSuite extends AnalysisTest {
         |ON 1 = 1
         |WHEN MATCHED THEN UPDATE SET c1='a', c2=1
         |WHEN NOT MATCHED THEN INSERT (c1, c2) VALUES ('b', 2)
+        |WHEN NOT MATCHED BY SOURCE THEN UPDATE SET c1='a', c2=1
         |""".stripMargin
     val parsed4 = parseAndResolve(sql4)
     parsed4 match {
@@ -2015,6 +2173,19 @@ class PlanResolutionSuite extends AnalysisTest {
             assert(s2.functionName == "varcharTypeWriteSideCheck")
           case other => fail("Expect UpdateAction, but got: " + other)
         }
+        assert(m.notMatchedBySourceActions.length == 1)
+        m.notMatchedBySourceActions.head match {
+          case UpdateAction(_, Seq(
+          Assignment(_, s1: StaticInvoke), Assignment(_, s2: StaticInvoke))) =>
+            assert(s1.arguments.length == 2)
+            assert(s1.functionName == "charTypeWriteSideCheck")
+            assert(s2.arguments.length == 2)
+            assert(s2.arguments.head.isInstanceOf[Cast])
+            val cast = s2.arguments.head.asInstanceOf[Cast]
+            assert(cast.getTagValue(Cast.BY_TABLE_INSERTION).isDefined)
+            assert(s2.functionName == "varcharTypeWriteSideCheck")
+          case other => fail("Expect UpdateAction, but got: " + other)
+        }
       case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString)
     }
   }
@@ -2029,6 +2200,8 @@ class PlanResolutionSuite extends AnalysisTest {
          |WHEN MATCHED AND (target.s='update') THEN UPDATE SET target.s = source.s
          |WHEN NOT MATCHED AND (target.s=DEFAULT)
          |  THEN INSERT (target.i, target.s) values (source.i, source.s)
+         |WHEN NOT MATCHED BY SOURCE AND (target.s='delete') THEN DELETE
+         |WHEN NOT MATCHED BY SOURCE AND (target.s='update') THEN UPDATE SET target.s = target.i
        """.stripMargin
 
     parseAndResolve(sql) match {
@@ -2040,21 +2213,31 @@ class PlanResolutionSuite extends AnalysisTest {
             DeleteAction(Some(EqualTo(dl: UnresolvedAttribute, StringLiteral("delete")))),
             UpdateAction(
               Some(EqualTo(ul: UnresolvedAttribute, StringLiteral("update"))),
-              updateAssigns)),
+              firstUpdateAssigns)),
           Seq(
             InsertAction(
               Some(EqualTo(il: UnresolvedAttribute, UnresolvedAttribute(Seq("DEFAULT")))),
-              insertAssigns))) =>
+              insertAssigns)),
+          Seq(
+            DeleteAction(Some(EqualTo(ndl: UnresolvedAttribute, StringLiteral("delete")))),
+            UpdateAction(
+              Some(EqualTo(nul: UnresolvedAttribute, StringLiteral("update"))),
+              secondUpdateAssigns))) =>
         assert(l.name == "target.i" && r.name == "source.i")
         assert(dl.name == "target.s")
         assert(ul.name == "target.s")
         assert(il.name == "target.s")
-        assert(updateAssigns.size == 1)
-        assert(updateAssigns.head.key.asInstanceOf[UnresolvedAttribute].name == "target.s")
-        assert(updateAssigns.head.value.asInstanceOf[UnresolvedAttribute].name == "source.s")
+        assert(ndl.name == "target.s")
+        assert(nul.name == "target.s")
+        assert(firstUpdateAssigns.size == 1)
+        assert(firstUpdateAssigns.head.key.asInstanceOf[UnresolvedAttribute].name == "target.s")
+        assert(firstUpdateAssigns.head.value.asInstanceOf[UnresolvedAttribute].name == "source.s")
         assert(insertAssigns.size == 2)
         assert(insertAssigns.head.key.asInstanceOf[UnresolvedAttribute].name == "target.i")
         assert(insertAssigns.head.value.asInstanceOf[UnresolvedAttribute].name == "source.i")
+        assert(secondUpdateAssigns.size == 1)
+        assert(secondUpdateAssigns.head.key.asInstanceOf[UnresolvedAttribute].name == "target.s")
+        assert(secondUpdateAssigns.head.value.asInstanceOf[UnresolvedAttribute].name == "target.i")
 
       case l => fail("Expected unresolved MergeIntoTable, but got:\n" + l.treeString)
     }
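
Note on the resolution rules exercised by the tests above: the test comments state that INSERT conditions and values are resolved with the source table only, that UPDATE and INSERT keys are resolved with the target table only, and that NOT MATCHED BY SOURCE clauses are resolved using the target table only. The following is a minimal toy model of that scoping, not Spark's analyzer. The names `Scope`, `Table` and `MergeScopes.resolve` are hypothetical and exist only for illustration, and the combined `TargetAndSource` scope is an assumption about contexts where both tables are visible.

```scala
// Toy model of column-resolution scopes in MERGE. This is NOT Spark code;
// every name here is hypothetical and chosen for illustration only.
sealed trait Scope
case object TargetOnly extends Scope      // e.g. NOT MATCHED BY SOURCE clauses
case object SourceOnly extends Scope      // e.g. INSERT conditions and values
case object TargetAndSource extends Scope // assumed: contexts that see both tables

final case class Table(name: String, columns: Set[String])

object MergeScopes {
  // Resolve a possibly qualified column name ("s" or "target.s") against the
  // tables visible in `scope`. Returns Right(tableName) on a unique match and
  // Left(reason) when the name is unknown or ambiguous.
  def resolve(
      name: String,
      scope: Scope,
      target: Table,
      source: Table): Either[String, String] = {
    val (qualifier, col) = name.split('.') match {
      case Array(q, c) => (Some(q), c)
      case _ => (None, name)
    }
    val visible = scope match {
      case TargetOnly => Seq(target)
      case SourceOnly => Seq(source)
      case TargetAndSource => Seq(target, source)
    }
    // Keep only visible tables that match the qualifier (if any) and own the column.
    val candidates =
      visible.filter(t => qualifier.forall(_ == t.name) && t.columns.contains(col))
    candidates match {
      case Seq(t) => Right(t.name)
      case Seq() => Left(s"cannot resolve $name")
      case _ => Left(s"$name is ambiguous")
    }
  }
}
```

With `target` and `source` both exposing columns `i` and `s`, an unqualified `s` resolves to the target under `TargetOnly`, a reference to `source.s` fails under `TargetOnly` (mirroring the sql7 case above), and an unqualified `s` is ambiguous under `TargetAndSource`.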

