Posted to commits@spark.apache.org by td...@apache.org on 2020/01/23 03:21:15 UTC

[spark] branch master updated: [SPARK-30609] Allow default merge command resolution to be bypassed by DSv2 tables

This is an automated email from the ASF dual-hosted git repository.

tdas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d2bca8f  [SPARK-30609] Allow default merge command resolution to be bypassed by DSv2 tables
d2bca8f is described below

commit d2bca8ff70e6c82e915f633bb9f2f8a4582f7026
Author: Tathagata Das <ta...@gmail.com>
AuthorDate: Wed Jan 22 19:20:25 2020 -0800

    [SPARK-30609] Allow default merge command resolution to be bypassed by DSv2 tables
    
    ### What changes were proposed in this pull request?
    Skip resolving the merge expressions if the target is a DSv2 table with the ACCEPT_ANY_SCHEMA capability.
    
    ### Why are the changes needed?
    Some DSv2 sources may want to customize the merge resolution logic. For example, a table that can accept any schema (TableCapability.ACCEPT_ANY_SCHEMA) may want to allow certain merge queries that the default resolution logic would otherwise block (that is, fail with an AnalysisException). So there should be a way to completely bypass the merge resolution logic in the Analyzer.
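    
    As an illustration only (the class names and wiring below are hypothetical, not part of this commit), a source could combine the capability with its own resolution rule roughly as follows:
    
        import java.util
    
        import org.apache.spark.sql.SparkSessionExtensions
        import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable}
        import org.apache.spark.sql.catalyst.rules.Rule
        import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
        import org.apache.spark.sql.types.StructType
    
        // Sketch: a table that accepts any schema. With this change, the
        // Analyzer leaves MERGE expressions against it unresolved instead
        // of failing analysis.
        class AnySchemaTable extends Table {
          override def name(): String = "any_schema_table"
          override def schema(): StructType = new StructType().add("i", "int")
          override def capabilities(): util.Set[TableCapability] =
            util.EnumSet.of(TableCapability.ACCEPT_ANY_SCHEMA)
        }
    
        // Sketch: a custom rule that picks up the still-unresolved
        // MergeIntoTable and applies source-specific resolution.
        case class CustomMergeResolution() extends Rule[LogicalPlan] {
          override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
            case m: MergeIntoTable if !m.resolved =>
              // ... source-specific resolution of condition and actions ...
              m
          }
        }
    
        // Registered through the standard extensions hook, e.g. with
        // spark.sql.extensions=com.example.MyExtensions
        class MyExtensions extends (SparkSessionExtensions => Unit) {
          override def apply(ext: SparkSessionExtensions): Unit = {
            ext.injectResolutionRule(_ => CustomMergeResolution())
          }
        }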
    
    ### Does this PR introduce any user-facing change?
    No, since merge itself is an unreleased feature.
    
    ### How was this patch tested?
    Added a unit test that specifically verifies the skipping.
    
    Closes #27326 from tdas/SPARK-30609.
    
    Authored-by: Tathagata Das <ta...@gmail.com>
    Signed-off-by: Tathagata Das <ta...@gmail.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 62 +++++++++++++---------
 .../execution/command/PlanResolutionSuite.scala    | 52 +++++++++++++++++-
 2 files changed, 86 insertions(+), 28 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7e9f85b..503dab1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1326,33 +1326,43 @@ class Analyzer(
 
       case m @ MergeIntoTable(targetTable, sourceTable, _, _, _)
         if !m.resolved && targetTable.resolved && sourceTable.resolved =>
-        val newMatchedActions = m.matchedActions.map {
-          case DeleteAction(deleteCondition) =>
-            val resolvedDeleteCondition = deleteCondition.map(resolveExpressionTopDown(_, m))
-            DeleteAction(resolvedDeleteCondition)
-          case UpdateAction(updateCondition, assignments) =>
-            val resolvedUpdateCondition = updateCondition.map(resolveExpressionTopDown(_, m))
-            // The update value can access columns from both target and source tables.
-            UpdateAction(
-              resolvedUpdateCondition,
-              resolveAssignments(assignments, m, resolveValuesWithSourceOnly = false))
-          case o => o
-        }
-        val newNotMatchedActions = m.notMatchedActions.map {
-          case InsertAction(insertCondition, assignments) =>
-            // The insert action is used when not matched, so its condition and value can only
-            // access columns from the source table.
-            val resolvedInsertCondition =
-              insertCondition.map(resolveExpressionTopDown(_, Project(Nil, m.sourceTable)))
-            InsertAction(
-              resolvedInsertCondition,
-              resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true))
-          case o => o
+
+        EliminateSubqueryAliases(targetTable) match {
+          case r: NamedRelation if r.skipSchemaResolution =>
+            // Do not resolve the expression if the target table accepts any schema.
+            // This allows data sources to customize their own resolution logic using
+            // custom resolution rules.
+            m
+
+          case _ =>
+            val newMatchedActions = m.matchedActions.map {
+              case DeleteAction(deleteCondition) =>
+                val resolvedDeleteCondition = deleteCondition.map(resolveExpressionTopDown(_, m))
+                DeleteAction(resolvedDeleteCondition)
+              case UpdateAction(updateCondition, assignments) =>
+                val resolvedUpdateCondition = updateCondition.map(resolveExpressionTopDown(_, m))
+                // The update value can access columns from both target and source tables.
+                UpdateAction(
+                  resolvedUpdateCondition,
+                  resolveAssignments(assignments, m, resolveValuesWithSourceOnly = false))
+              case o => o
+            }
+            val newNotMatchedActions = m.notMatchedActions.map {
+              case InsertAction(insertCondition, assignments) =>
+                // The insert action is used when not matched, so its condition and value can only
+                // access columns from the source table.
+                val resolvedInsertCondition =
+                  insertCondition.map(resolveExpressionTopDown(_, Project(Nil, m.sourceTable)))
+                InsertAction(
+                  resolvedInsertCondition,
+                  resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true))
+              case o => o
+            }
+            val resolvedMergeCondition = resolveExpressionTopDown(m.mergeCondition, m)
+            m.copy(mergeCondition = resolvedMergeCondition,
+              matchedActions = newMatchedActions,
+              notMatchedActions = newNotMatchedActions)
         }
-        val resolvedMergeCondition = resolveExpressionTopDown(m.mergeCondition, m)
-        m.copy(mergeCondition = resolvedMergeCondition,
-          matchedActions = newMatchedActions,
-          notMatchedActions = newNotMatchedActions)
 
       case q: LogicalPlan =>
         logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 8c73b36..35b2003 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.command
 
 import java.net.URI
-import java.util.Locale
+import java.util.{Collections, Locale}
 
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{mock, when}
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, E
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, Assignment, CreateTableAsSelect, CreateV2Table, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SubqueryAlias, UpdateAction, UpdateTable}
 import org.apache.spark.sql.connector.InMemoryTableProvider
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, TableCatalog, TableChange, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, TableCapability, TableCatalog, TableChange, V1Table}
 import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.SQLConf
@@ -49,6 +49,13 @@ class PlanResolutionSuite extends AnalysisTest {
     t
   }
 
+  private val tableWithAcceptAnySchemaCapability: Table = {
+    val t = mock(classOf[Table])
+    when(t.schema()).thenReturn(new StructType().add("i", "int"))
+    when(t.capabilities()).thenReturn(Collections.singleton(TableCapability.ACCEPT_ANY_SCHEMA))
+    t
+  }
+
   private val v1Table: V1Table = {
     val t = mock(classOf[CatalogTable])
     when(t.schema).thenReturn(new StructType().add("i", "int").add("s", "string"))
@@ -77,6 +84,7 @@ class PlanResolutionSuite extends AnalysisTest {
         case "v1Table1" => v1Table
         case "v2Table" => table
         case "v2Table1" => table
+        case "v2TableWithAcceptAnySchemaCapability" => tableWithAcceptAnySchemaCapability
         case name => throw new NoSuchTableException(name)
       }
     })
@@ -1351,5 +1359,45 @@ class PlanResolutionSuite extends AnalysisTest {
     }
   }
 
+  test("MERGE INTO TABLE - skip resolution on v2 tables that accept any schema") {
+    val sql =
+      s"""
+         |MERGE INTO v2TableWithAcceptAnySchemaCapability AS target
+         |USING v2Table AS source
+         |ON target.i = source.i
+         |WHEN MATCHED AND (target.s='delete') THEN DELETE
+         |WHEN MATCHED AND (target.s='update') THEN UPDATE SET target.s = source.s
+         |WHEN NOT MATCHED AND (target.s='insert')
+         |  THEN INSERT (target.i, target.s) values (source.i, source.s)
+       """.stripMargin
+
+    parseAndResolve(sql) match {
+      case MergeIntoTable(
+          SubqueryAlias(AliasIdentifier("target", None), _: DataSourceV2Relation),
+          SubqueryAlias(AliasIdentifier("source", None), _: DataSourceV2Relation),
+          EqualTo(l: UnresolvedAttribute, r: UnresolvedAttribute),
+          Seq(
+            DeleteAction(Some(EqualTo(dl: UnresolvedAttribute, StringLiteral("delete")))),
+            UpdateAction(
+              Some(EqualTo(ul: UnresolvedAttribute, StringLiteral("update"))),
+              updateAssigns)),
+          Seq(
+            InsertAction(
+              Some(EqualTo(il: UnresolvedAttribute, StringLiteral("insert"))),
+              insertAssigns))) =>
+        assert(l.name == "target.i" && r.name == "source.i")
+        assert(dl.name == "target.s")
+        assert(ul.name == "target.s")
+        assert(il.name == "target.s")
+        assert(updateAssigns.size == 1)
+        assert(updateAssigns.head.key.asInstanceOf[UnresolvedAttribute].name == "target.s")
+        assert(updateAssigns.head.value.asInstanceOf[UnresolvedAttribute].name == "source.s")
+        assert(insertAssigns.size == 2)
+        assert(insertAssigns.head.key.asInstanceOf[UnresolvedAttribute].name == "target.i")
+        assert(insertAssigns.head.value.asInstanceOf[UnresolvedAttribute].name == "source.i")
+
+      case l => fail("Expected unresolved MergeIntoTable, but got:\n" + l.treeString)
+    }
+  }
   // TODO: add tests for more commands.
 }

