Posted to commits@spark.apache.org by td...@apache.org on 2020/01/23 03:21:15 UTC
[spark] branch master updated: [SPARK-30609] Allow default merge command resolution to be bypassed by DSv2 tables
This is an automated email from the ASF dual-hosted git repository.
tdas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d2bca8f [SPARK-30609] Allow default merge command resolution to be bypassed by DSv2 tables
d2bca8f is described below
commit d2bca8ff70e6c82e915f633bb9f2f8a4582f7026
Author: Tathagata Das <ta...@gmail.com>
AuthorDate: Wed Jan 22 19:20:25 2020 -0800
[SPARK-30609] Allow default merge command resolution to be bypassed by DSv2 tables
### What changes were proposed in this pull request?
Skip resolving the merge expressions if the target is a DSv2 table with ACCEPT_ANY_SCHEMA capability.
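In condensed form, the new check looks like the following (the names are exactly those in the Analyzer diff further down; only the helper wrapper is added here for illustration):

```scala
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NamedRelation}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Condensed sketch of the new check: merge resolution is skipped when the
// (subquery-alias-stripped) target relation reports skipSchemaResolution,
// which is true for DSv2 tables with the ACCEPT_ANY_SCHEMA capability.
def shouldSkipMergeResolution(targetTable: LogicalPlan): Boolean =
  EliminateSubqueryAliases(targetTable) match {
    case r: NamedRelation => r.skipSchemaResolution
    case _ => false
  }
```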
### Why are the changes needed?
Some DSv2 sources may want to customize the merge resolution logic. For example, a table that can accept any schema (TableCapability.ACCEPT_ANY_SCHEMA) may want to allow certain merge queries that the default resolution logic would block (that is, reject with an AnalysisException). So there should be a way to completely bypass the merge resolution logic in the Analyzer.
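For reference, a connector opts into this behavior by reporting the capability from its Table implementation; a minimal sketch (the class name and schema below are illustrative, not part of this patch):

```scala
import java.util.{EnumSet, Set => JSet}
import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
import org.apache.spark.sql.types.StructType

// Illustrative DSv2 table that reports ACCEPT_ANY_SCHEMA. With this patch,
// the Analyzer leaves MERGE plans targeting such a table unresolved and
// defers to the source's own resolution logic.
class AnySchemaTable extends Table {
  override def name(): String = "any_schema_table"
  override def schema(): StructType = new StructType().add("i", "int")
  override def capabilities(): JSet[TableCapability] =
    EnumSet.of(TableCapability.ACCEPT_ANY_SCHEMA)
}
```

The source would then supply its own handling of the unresolved MergeIntoTable plan, for example through a resolution rule registered via SparkSessionExtensions.injectResolutionRule.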
### Does this PR introduce any user-facing change?
No, since the MERGE command itself is an unreleased feature.
### How was this patch tested?
Added a unit test that specifically exercises the skipping.
Closes #27326 from tdas/SPARK-30609.
Authored-by: Tathagata Das <ta...@gmail.com>
Signed-off-by: Tathagata Das <ta...@gmail.com>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 62 +++++++++++++---------
.../execution/command/PlanResolutionSuite.scala | 52 +++++++++++++++++-
2 files changed, 86 insertions(+), 28 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7e9f85b..503dab1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1326,33 +1326,43 @@ class Analyzer(
case m @ MergeIntoTable(targetTable, sourceTable, _, _, _)
if !m.resolved && targetTable.resolved && sourceTable.resolved =>
- val newMatchedActions = m.matchedActions.map {
- case DeleteAction(deleteCondition) =>
- val resolvedDeleteCondition = deleteCondition.map(resolveExpressionTopDown(_, m))
- DeleteAction(resolvedDeleteCondition)
- case UpdateAction(updateCondition, assignments) =>
- val resolvedUpdateCondition = updateCondition.map(resolveExpressionTopDown(_, m))
- // The update value can access columns from both target and source tables.
- UpdateAction(
- resolvedUpdateCondition,
- resolveAssignments(assignments, m, resolveValuesWithSourceOnly = false))
- case o => o
- }
- val newNotMatchedActions = m.notMatchedActions.map {
- case InsertAction(insertCondition, assignments) =>
- // The insert action is used when not matched, so its condition and value can only
- // access columns from the source table.
- val resolvedInsertCondition =
- insertCondition.map(resolveExpressionTopDown(_, Project(Nil, m.sourceTable)))
- InsertAction(
- resolvedInsertCondition,
- resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true))
- case o => o
+
+ EliminateSubqueryAliases(targetTable) match {
+ case r: NamedRelation if r.skipSchemaResolution =>
+ // Do not resolve the expression if the target table accepts any schema.
+ // This allows data sources to customize their own resolution logic using
+ // custom resolution rules.
+ m
+
+ case _ =>
+ val newMatchedActions = m.matchedActions.map {
+ case DeleteAction(deleteCondition) =>
+ val resolvedDeleteCondition = deleteCondition.map(resolveExpressionTopDown(_, m))
+ DeleteAction(resolvedDeleteCondition)
+ case UpdateAction(updateCondition, assignments) =>
+ val resolvedUpdateCondition = updateCondition.map(resolveExpressionTopDown(_, m))
+ // The update value can access columns from both target and source tables.
+ UpdateAction(
+ resolvedUpdateCondition,
+ resolveAssignments(assignments, m, resolveValuesWithSourceOnly = false))
+ case o => o
+ }
+ val newNotMatchedActions = m.notMatchedActions.map {
+ case InsertAction(insertCondition, assignments) =>
+ // The insert action is used when not matched, so its condition and value can only
+ // access columns from the source table.
+ val resolvedInsertCondition =
+ insertCondition.map(resolveExpressionTopDown(_, Project(Nil, m.sourceTable)))
+ InsertAction(
+ resolvedInsertCondition,
+ resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true))
+ case o => o
+ }
+ val resolvedMergeCondition = resolveExpressionTopDown(m.mergeCondition, m)
+ m.copy(mergeCondition = resolvedMergeCondition,
+ matchedActions = newMatchedActions,
+ notMatchedActions = newNotMatchedActions)
}
- val resolvedMergeCondition = resolveExpressionTopDown(m.mergeCondition, m)
- m.copy(mergeCondition = resolvedMergeCondition,
- matchedActions = newMatchedActions,
- notMatchedActions = newNotMatchedActions)
case q: LogicalPlan =>
logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 8c73b36..35b2003 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.command
import java.net.URI
-import java.util.Locale
+import java.util.{Collections, Locale}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mock, when}
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, E
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, Assignment, CreateTableAsSelect, CreateV2Table, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SubqueryAlias, UpdateAction, UpdateTable}
import org.apache.spark.sql.connector.InMemoryTableProvider
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, TableCatalog, TableChange, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, TableCapability, TableCatalog, TableChange, V1Table}
import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf
@@ -49,6 +49,13 @@ class PlanResolutionSuite extends AnalysisTest {
t
}
+ private val tableWithAcceptAnySchemaCapability: Table = {
+ val t = mock(classOf[Table])
+ when(t.schema()).thenReturn(new StructType().add("i", "int"))
+ when(t.capabilities()).thenReturn(Collections.singleton(TableCapability.ACCEPT_ANY_SCHEMA))
+ t
+ }
+
private val v1Table: V1Table = {
val t = mock(classOf[CatalogTable])
when(t.schema).thenReturn(new StructType().add("i", "int").add("s", "string"))
@@ -77,6 +84,7 @@ class PlanResolutionSuite extends AnalysisTest {
case "v1Table1" => v1Table
case "v2Table" => table
case "v2Table1" => table
+ case "v2TableWithAcceptAnySchemaCapability" => tableWithAcceptAnySchemaCapability
case name => throw new NoSuchTableException(name)
}
})
@@ -1351,5 +1359,45 @@ class PlanResolutionSuite extends AnalysisTest {
}
}
+ test("MERGE INTO TABLE - skip resolution on v2 tables that accept any schema") {
+ val sql =
+ s"""
+ |MERGE INTO v2TableWithAcceptAnySchemaCapability AS target
+ |USING v2Table AS source
+ |ON target.i = source.i
+ |WHEN MATCHED AND (target.s='delete') THEN DELETE
+ |WHEN MATCHED AND (target.s='update') THEN UPDATE SET target.s = source.s
+ |WHEN NOT MATCHED AND (target.s='insert')
+ | THEN INSERT (target.i, target.s) values (source.i, source.s)
+ """.stripMargin
+
+ parseAndResolve(sql) match {
+ case MergeIntoTable(
+ SubqueryAlias(AliasIdentifier("target", None), _: DataSourceV2Relation),
+ SubqueryAlias(AliasIdentifier("source", None), _: DataSourceV2Relation),
+ EqualTo(l: UnresolvedAttribute, r: UnresolvedAttribute),
+ Seq(
+ DeleteAction(Some(EqualTo(dl: UnresolvedAttribute, StringLiteral("delete")))),
+ UpdateAction(
+ Some(EqualTo(ul: UnresolvedAttribute, StringLiteral("update"))),
+ updateAssigns)),
+ Seq(
+ InsertAction(
+ Some(EqualTo(il: UnresolvedAttribute, StringLiteral("insert"))),
+ insertAssigns))) =>
+ assert(l.name == "target.i" && r.name == "source.i")
+ assert(dl.name == "target.s")
+ assert(ul.name == "target.s")
+ assert(il.name == "target.s")
+ assert(updateAssigns.size == 1)
+ assert(updateAssigns.head.key.asInstanceOf[UnresolvedAttribute].name == "target.s")
+ assert(updateAssigns.head.value.asInstanceOf[UnresolvedAttribute].name == "source.s")
+ assert(insertAssigns.size == 2)
+ assert(insertAssigns.head.key.asInstanceOf[UnresolvedAttribute].name == "target.i")
+ assert(insertAssigns.head.value.asInstanceOf[UnresolvedAttribute].name == "source.i")
+
+ case l => fail("Expected unresolved MergeIntoTable, but got:\n" + l.treeString)
+ }
+ }
// TODO: add tests for more commands.
}