You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/11/11 13:55:23 UTC
[spark] branch branch-3.0 updated: [SPARK-33412][SQL][3.0] OverwriteByExpression should resolve its delete condition based on the table relation not the input query
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 1e2984b [SPARK-33412][SQL][3.0] OverwriteByExpression should resolve its delete condition based on the table relation not the input query
1e2984b is described below
commit 1e2984bc9525d38ddd31ade48f5caf83213fc853
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Wed Nov 11 13:50:39 2020 +0000
[SPARK-33412][SQL][3.0] OverwriteByExpression should resolve its delete condition based on the table relation not the input query
backport https://github.com/apache/spark/pull/30318 to 3.0
Closes #30328 from cloud-fan/backport.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../apache/spark/sql/catalyst/analysis/Analyzer.scala | 9 ++++-----
.../spark/sql/catalyst/plans/logical/v2Commands.scala | 3 ++-
.../catalyst/analysis/DataSourceV2AnalysisSuite.scala | 17 ++++++++++++-----
3 files changed, 18 insertions(+), 11 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2f30ba3..7795d70 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1363,11 +1363,10 @@ class Analyzer(
case a @ Aggregate(groupingExprs, aggExprs, appendColumns: AppendColumns) =>
a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
- case o: OverwriteByExpression
- if !(o.table.resolved && o.query.resolved && o.outputResolved) =>
- // do not resolve expression attributes until the query attributes are resolved against the
- // table by ResolveOutputRelation. that rule will alias the attributes to the table's names.
- o
+ case o: OverwriteByExpression if o.table.resolved =>
+ // The delete condition of `OverwriteByExpression` will be passed to the table
+ // implementation and should be resolved based on the table schema.
+ o.copy(deleteExpr = resolveExpressionBottomUp(o.deleteExpr, o.table))
case m @ MergeIntoTable(targetTable, sourceTable, _, _, _)
if !m.resolved && targetTable.resolved && sourceTable.resolved =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 35874a9..9077f7a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.analysis.{NamedRelation, UnresolvedException}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, Unevaluable}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
import org.apache.spark.sql.catalyst.plans.DescribeTableSchema
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange}
@@ -89,6 +89,7 @@ case class OverwriteByExpression(
override lazy val resolved: Boolean = {
table.resolved && query.resolved && outputResolved && deleteExpr.resolved
}
+ override def inputSet: AttributeSet = AttributeSet(table.output)
}
object OverwriteByExpression {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
index 5aa2b98..3810434 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
@@ -588,9 +588,7 @@ abstract class DataSourceV2AnalysisBaseSuite extends AnalysisTest {
Alias(Cast(a, DoubleType, Some(conf.sessionLocalTimeZone)), "x")(),
Alias(Cast(b, DoubleType, Some(conf.sessionLocalTimeZone)), "y")()),
query),
- LessThanOrEqual(
- AttributeReference("x", DoubleType, nullable = false)(x.exprId),
- Literal(15.0d)))
+ LessThanOrEqual(x, Literal(15.0d)))
assertNotResolved(parsedPlan)
checkAnalysis(parsedPlan, expectedPlan)
@@ -598,7 +596,7 @@ abstract class DataSourceV2AnalysisBaseSuite extends AnalysisTest {
}
protected def testNotResolvedOverwriteByExpression(): Unit = {
- val xRequiredTable = TestRelation(StructType(Seq(
+ val table = TestRelation(StructType(Seq(
StructField("x", DoubleType, nullable = false),
StructField("y", DoubleType))).toAttributes)
@@ -607,10 +605,19 @@ abstract class DataSourceV2AnalysisBaseSuite extends AnalysisTest {
StructField("b", DoubleType))).toAttributes)
// the write is resolved (checked above). this test plan is not because of the expression.
- val parsedPlan = OverwriteByExpression.byPosition(xRequiredTable, query,
+ val parsedPlan = OverwriteByExpression.byPosition(table, query,
LessThanOrEqual(UnresolvedAttribute(Seq("a")), Literal(15.0d)))
assertNotResolved(parsedPlan)
assertAnalysisError(parsedPlan, Seq("cannot resolve", "`a`", "given input columns", "x, y"))
+
+ val tableAcceptAnySchema = TestRelationAcceptAnySchema(StructType(Seq(
+ StructField("x", DoubleType, nullable = false),
+ StructField("y", DoubleType))).toAttributes)
+
+ val parsedPlan2 = OverwriteByExpression.byPosition(tableAcceptAnySchema, query,
+ LessThanOrEqual(UnresolvedAttribute(Seq("a")), Literal(15.0d)))
+ assertNotResolved(parsedPlan2)
+ assertAnalysisError(parsedPlan2, Seq("cannot resolve", "`a`", "given input columns", "x, y"))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org