You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/02/24 07:24:27 UTC

[spark] branch branch-3.0 updated: [SPARK-30924][SQL][3.0] Add additional checks to Merge Into

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d164ee3  [SPARK-30924][SQL][3.0] Add additional checks to Merge Into
d164ee3 is described below

commit d164ee3d44669ed717497b402776d567636d1f71
Author: Burak Yavuz <br...@gmail.com>
AuthorDate: Mon Feb 24 15:16:37 2020 +0800

    [SPARK-30924][SQL][3.0] Add additional checks to Merge Into
    
    ### What changes were proposed in this pull request?
    
    Merge Into is currently missing additional validation around:
    
     1. The lack of any WHEN statements
     2. The first WHEN MATCHED statement needs to have a condition if there are two WHEN MATCHED statements.
     3. Single use of UPDATE/DELETE
    
    This PR introduces these validations.
    (1) is required, because otherwise the MERGE statement is useless.
    (2) is required, because otherwise the second WHEN MATCHED condition becomes dead code
    (3) is up for debate, but the idea there is that a single expression should be sufficient to specify when you would like to update or delete your records. We restrict it for now to reduce surface area and ambiguity.
    
    ### Why are the changes needed?
    
    To ease DataSource developers when building implementations for MERGE
    
    ### Does this PR introduce any user-facing change?
    
    Adds additional validation checks
    
    ### How was this patch tested?
    
    Unit tests
    
    Closes #27677 from brkyvz/mergeChecks.
    
    Authored-by: Burak Yavuz <br...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 4ff2718d5448f212c0ede885c4072790c43bf826)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 12 ++++++
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 50 ++++++++++++++++++++++
 .../execution/command/PlanResolutionSuite.scala    | 10 ++---
 3 files changed, 67 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 36c1647..2806fd4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -460,6 +460,18 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
         }
       }
     }
+    if (matchedActions.isEmpty && notMatchedActions.isEmpty) {
+      throw new ParseException("There must be at least one WHEN clause in a MERGE statement", ctx)
+    }
+    // children being empty means that the condition is not set
+    if (matchedActions.length == 2 && matchedActions.head.children.isEmpty) {
+      throw new ParseException("When there are 2 MATCHED clauses in a MERGE statement, " +
+        "the first MATCHED clause must have a condition", ctx)
+    }
+    if (matchedActions.groupBy(_.getClass).mapValues(_.size).exists(_._2 > 1)) {
+      throw new ParseException(
+        "UPDATE and DELETE can appear at most once in MATCHED clauses in a MERGE statement", ctx)
+    }
 
     MergeIntoTable(
       aliasedTarget,
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index bc7b51f..4a636bd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -1144,6 +1144,56 @@ class DDLParserSuite extends AnalysisTest {
     assert(exc.getMessage.contains("There should be at most 1 'WHEN NOT MATCHED' clause."))
   }
 
+  test("merge into table: the first matched clause must have a condition if there's a second") {
+    val exc = intercept[ParseException] {
+      parsePlan(
+        """
+          |MERGE INTO testcat1.ns1.ns2.tbl AS target
+          |USING testcat2.ns1.ns2.tbl AS source
+          |ON target.col1 = source.col1
+          |WHEN MATCHED THEN DELETE
+          |WHEN MATCHED THEN UPDATE SET target.col2 = source.col2
+          |WHEN NOT MATCHED AND (target.col2='insert')
+          |THEN INSERT (target.col1, target.col2) values (source.col1, source.col2)
+        """.stripMargin)
+    }
+
+    assert(exc.getMessage.contains("the first MATCHED clause must have a condition"))
+  }
+
+  test("merge into table: there must be a when (not) matched condition") {
+    val exc = intercept[ParseException] {
+      parsePlan(
+        """
+          |MERGE INTO testcat1.ns1.ns2.tbl AS target
+          |USING testcat2.ns1.ns2.tbl AS source
+          |ON target.col1 = source.col1
+        """.stripMargin)
+    }
+
+    assert(exc.getMessage.contains("There must be at least one WHEN clause in a MERGE statement"))
+  }
+
+  test("merge into table: there can be only a single use DELETE or UPDATE") {
+    Seq("UPDATE SET *", "DELETE").foreach { op =>
+      val exc = intercept[ParseException] {
+        parsePlan(
+          s"""
+             |MERGE INTO testcat1.ns1.ns2.tbl AS target
+             |USING testcat2.ns1.ns2.tbl AS source
+             |ON target.col1 = source.col1
+             |WHEN MATCHED AND (target.col2='delete') THEN $op
+             |WHEN MATCHED THEN $op
+             |WHEN NOT MATCHED AND (target.col2='insert')
+             |THEN INSERT (target.col1, target.col2) values (source.col1, source.col2)
+           """.stripMargin)
+      }
+
+      assert(exc.getMessage.contains(
+        "UPDATE and DELETE can appear at most once in MATCHED clauses"))
+    }
+  }
+
   test("show tables") {
     comparePlans(
       parsePlan("SHOW TABLES"),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 21c698c..01d16ce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -1277,7 +1277,7 @@ class PlanResolutionSuite extends AnalysisTest {
              |MERGE INTO $target AS target
              |USING $source AS source
              |ON target.i = source.i
-             |WHEN MATCHED THEN DELETE
+             |WHEN MATCHED AND (target.s='delete') THEN DELETE
              |WHEN MATCHED THEN UPDATE SET target.s = source.s
              |WHEN NOT MATCHED THEN INSERT (target.i, target.s) values (source.i, source.s)
            """.stripMargin
@@ -1286,7 +1286,7 @@ class PlanResolutionSuite extends AnalysisTest {
               SubqueryAlias(AliasIdentifier("target", Seq()), AsDataSourceV2Relation(target)),
               SubqueryAlias(AliasIdentifier("source", Seq()), AsDataSourceV2Relation(source)),
               mergeCondition,
-              Seq(DeleteAction(None), UpdateAction(None, updateAssigns)),
+              Seq(DeleteAction(Some(_)), UpdateAction(None, updateAssigns)),
               Seq(InsertAction(None, insertAssigns))) =>
             checkResolution(target, source, mergeCondition, None, None, None,
               updateAssigns, insertAssigns)
@@ -1364,7 +1364,7 @@ class PlanResolutionSuite extends AnalysisTest {
            |MERGE INTO $target
            |USING $source
            |ON 1 = 1
-           |WHEN MATCHED THEN DELETE
+           |WHEN MATCHED AND (${target}.s='delete') THEN DELETE
            |WHEN MATCHED THEN UPDATE SET s = 1
            |WHEN NOT MATCHED AND (s = 'a') THEN INSERT (i) values (i)
          """.stripMargin
@@ -1374,7 +1374,7 @@ class PlanResolutionSuite extends AnalysisTest {
             AsDataSourceV2Relation(target),
             AsDataSourceV2Relation(source),
             _,
-            Seq(DeleteAction(None), UpdateAction(None, updateAssigns)),
+            Seq(DeleteAction(Some(_)), UpdateAction(None, updateAssigns)),
             Seq(InsertAction(
               Some(EqualTo(il: AttributeReference, StringLiteral("a"))),
               insertAssigns))) =>
@@ -1451,7 +1451,7 @@ class PlanResolutionSuite extends AnalysisTest {
          |MERGE INTO non_exist_target
          |USING non_exist_source
          |ON target.i = source.i
-         |WHEN MATCHED THEN DELETE
+         |WHEN MATCHED AND (non_exist_target.s='delete') THEN DELETE
          |WHEN MATCHED THEN UPDATE SET *
          |WHEN NOT MATCHED THEN INSERT *
        """.stripMargin


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org