You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/11/11 13:55:23 UTC

[spark] branch branch-3.0 updated: [SPARK-33412][SQL][3.0] OverwriteByExpression should resolve its delete condition based on the table relation not the input query

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 1e2984b  [SPARK-33412][SQL][3.0] OverwriteByExpression should resolve its delete condition based on the table relation not the input query
1e2984b is described below

commit 1e2984bc9525d38ddd31ade48f5caf83213fc853
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Wed Nov 11 13:50:39 2020 +0000

    [SPARK-33412][SQL][3.0] OverwriteByExpression should resolve its delete condition based on the table relation not the input query
    
    Backport of https://github.com/apache/spark/pull/30318 to branch-3.0.
    
    Closes #30328 from cloud-fan/backport.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala   |  9 ++++-----
 .../spark/sql/catalyst/plans/logical/v2Commands.scala   |  3 ++-
 .../catalyst/analysis/DataSourceV2AnalysisSuite.scala   | 17 ++++++++++++-----
 3 files changed, 18 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2f30ba3..7795d70 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1363,11 +1363,10 @@ class Analyzer(
       case a @ Aggregate(groupingExprs, aggExprs, appendColumns: AppendColumns) =>
         a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
 
-      case o: OverwriteByExpression
-          if !(o.table.resolved && o.query.resolved && o.outputResolved) =>
-        // do not resolve expression attributes until the query attributes are resolved against the
-        // table by ResolveOutputRelation. that rule will alias the attributes to the table's names.
-        o
+      case o: OverwriteByExpression if o.table.resolved =>
+        // The delete condition of `OverwriteByExpression` will be passed to the table
+        // implementation and should be resolved based on the table schema.
+        o.copy(deleteExpr = resolveExpressionBottomUp(o.deleteExpr, o.table))
 
       case m @ MergeIntoTable(targetTable, sourceTable, _, _, _)
         if !m.resolved && targetTable.resolved && sourceTable.resolved =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 35874a9..9077f7a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.catalyst.analysis.{NamedRelation, UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, Unevaluable}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
 import org.apache.spark.sql.catalyst.plans.DescribeTableSchema
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange}
@@ -89,6 +89,7 @@ case class OverwriteByExpression(
   override lazy val resolved: Boolean = {
     table.resolved && query.resolved && outputResolved && deleteExpr.resolved
   }
+  override def inputSet: AttributeSet = AttributeSet(table.output)
 }
 
 object OverwriteByExpression {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
index 5aa2b98..3810434 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
@@ -588,9 +588,7 @@ abstract class DataSourceV2AnalysisBaseSuite extends AnalysisTest {
         Alias(Cast(a, DoubleType, Some(conf.sessionLocalTimeZone)), "x")(),
         Alias(Cast(b, DoubleType, Some(conf.sessionLocalTimeZone)), "y")()),
         query),
-      LessThanOrEqual(
-        AttributeReference("x", DoubleType, nullable = false)(x.exprId),
-        Literal(15.0d)))
+      LessThanOrEqual(x, Literal(15.0d)))
 
     assertNotResolved(parsedPlan)
     checkAnalysis(parsedPlan, expectedPlan)
@@ -598,7 +596,7 @@ abstract class DataSourceV2AnalysisBaseSuite extends AnalysisTest {
   }
 
   protected def testNotResolvedOverwriteByExpression(): Unit = {
-    val xRequiredTable = TestRelation(StructType(Seq(
+    val table = TestRelation(StructType(Seq(
       StructField("x", DoubleType, nullable = false),
       StructField("y", DoubleType))).toAttributes)
 
@@ -607,10 +605,19 @@ abstract class DataSourceV2AnalysisBaseSuite extends AnalysisTest {
       StructField("b", DoubleType))).toAttributes)
 
     // the write is resolved (checked above). this test plan is not because of the expression.
-    val parsedPlan = OverwriteByExpression.byPosition(xRequiredTable, query,
+    val parsedPlan = OverwriteByExpression.byPosition(table, query,
       LessThanOrEqual(UnresolvedAttribute(Seq("a")), Literal(15.0d)))
 
     assertNotResolved(parsedPlan)
     assertAnalysisError(parsedPlan, Seq("cannot resolve", "`a`", "given input columns", "x, y"))
+
+    val tableAcceptAnySchema = TestRelationAcceptAnySchema(StructType(Seq(
+      StructField("x", DoubleType, nullable = false),
+      StructField("y", DoubleType))).toAttributes)
+
+    val parsedPlan2 = OverwriteByExpression.byPosition(tableAcceptAnySchema, query,
+      LessThanOrEqual(UnresolvedAttribute(Seq("a")), Literal(15.0d)))
+    assertNotResolved(parsedPlan2)
+    assertAnalysisError(parsedPlan2, Seq("cannot resolve", "`a`", "given input columns", "x, y"))
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org