You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/07/14 05:22:51 UTC

[spark] branch master updated: [SPARK-39672][SQL][3.1] Fix removing project before filter with correlated subquery

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c8c280cb631 [SPARK-39672][SQL][3.1] Fix removing project before filter with correlated subquery
c8c280cb631 is described below

commit c8c280cb631f61fee9a2876428659947d5a0634a
Author: tianlzhang <ti...@ebay.com>
AuthorDate: Thu Jul 14 12:49:57 2022 +0800

    [SPARK-39672][SQL][3.1] Fix removing project before filter with correlated subquery
    
    Add more checks to `removeProjectBeforeFilter` in `ColumnPruning` and keep the project (i.e. do not remove it) if
    1. the filter condition contains a correlated subquery, and
    2. the same attribute exists in both the output of the child of the Project and the output of the subquery
    
    The following is a legitimate self-join query and should not throw an exception when de-duplicating attributes in the subquery and outer values.
    
    ```sql
    select * from
    (
    select v1.a, v1.b, v2.c
    from v1
    inner join v2
    on v1.a=v2.a) t3
    where not exists (
      select 1
      from v2
      where t3.a=v2.a and t3.b=v2.b and t3.c=v2.c
    )
    ```
    
    Here's what happens with the current code. The above query is analyzed into the following `LogicalPlan` before `ColumnPruning`.
    ```
    Project [a#250, b#251, c#268]
    +- Filter NOT exists#272 [(a#250 = a#266) && (b#251 = b#267) && (c#268 = c#268#277)]
       :  +- Project [1 AS 1#273, _1#259 AS a#266, _2#260 AS b#267, _3#261 AS c#268#277]
       :     +- LocalRelation [_1#259, _2#260, _3#261]
       +- Project [a#250, b#251, c#268]
          +- Join Inner, (a#250 = a#266)
             :- Project [a#250, b#251]
             :  +- Project [_1#243 AS a#250, _2#244 AS b#251]
             :     +- LocalRelation [_1#243, _2#244, _3#245]
             +- Project [a#266, c#268]
                +- Project [_1#259 AS a#266, _3#261 AS c#268]
                   +- LocalRelation [_1#259, _2#260, _3#261]
    ```
    
    Then in `ColumnPruning`, the Project before the Filter (between Filter and Join) is removed. This changes the `outputSet` of the Filter's child, which now includes an attribute (`a#266`) that also exists in the subquery's output. Later, when `RewritePredicateSubquery` de-duplicates conflicting attributes, it fails with `Found conflicting attributes a#266 in the condition joining outer plan`.
    
    Does this PR introduce any user-facing change? No.
    
    How was this patch tested? Added a unit test in `SubquerySuite`.
    
    Closes #37074 from manuzhang/spark-39672.
    
    Lead-authored-by: tianlzhang <ti...@ebay.com>
    Co-authored-by: Manu Zhang <Ow...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 36fc73e7c42b84e05b15b2caecc0f804610dce20)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   | 16 +++++-
 .../scala/org/apache/spark/sql/SubquerySuite.scala | 58 +++++++++++++++++++++-
 2 files changed, 71 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 6afb6a5424b..78fb8b5de88 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -943,12 +943,24 @@ object ColumnPruning extends Rule[LogicalPlan] {
    * order, otherwise lower Projects can be missed.
    */
   private def removeProjectBeforeFilter(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case p1 @ Project(_, f @ Filter(_, p2 @ Project(_, child)))
+    case p1 @ Project(_, f @ Filter(e, p2 @ Project(_, child)))
       if p2.outputSet.subsetOf(child.outputSet) &&
         // We only remove attribute-only project.
-        p2.projectList.forall(_.isInstanceOf[AttributeReference]) =>
+        p2.projectList.forall(_.isInstanceOf[AttributeReference]) &&
+        // We can't remove project when the child has conflicting attributes
+        // with the subquery in filter predicate
+        !hasConflictingAttrsWithSubquery(e, child) =>
       p1.copy(child = f.copy(child = child))
   }
+
+  private def hasConflictingAttrsWithSubquery(
+      predicate: Expression,
+      child: LogicalPlan): Boolean = {
+    predicate.find {
+      case s: SubqueryExpression if s.plan.outputSet.intersect(child.outputSet).nonEmpty => true
+      case _ => false
+    }.isDefined
+  }
 }
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index fa24e8d175b..1ae5ae68d07 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
-import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Sort}
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project, Sort}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecution}
 import org.apache.spark.sql.execution.datasources.FileScanRDD
@@ -2220,4 +2220,60 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       }
     }
   }
+
+  test("SPARK-39672: Fix removing project before filter with correlated subquery") {
+    withTempView("v1", "v2") {
+      Seq((1, 2, 3), (4, 5, 6)).toDF("a", "b", "c").createTempView("v1")
+      Seq((1, 3, 5), (4, 5, 6)).toDF("a", "b", "c").createTempView("v2")
+
+      def findProject(df: DataFrame): Seq[Project] = {
+        df.queryExecution.optimizedPlan.collect {
+          case p: Project => p
+        }
+      }
+
+      // project before filter cannot be removed since subquery has conflicting attributes
+      // with outer reference
+      val df1 = sql(
+        """
+         |select * from
+         |(
+         |select
+         |v1.a,
+         |v1.b,
+         |v2.c
+         |from v1
+         |inner join v2
+         |on v1.a=v2.a) t3
+         |where not exists (
+         |  select 1
+         |  from v2
+         |  where t3.a=v2.a and t3.b=v2.b and t3.c=v2.c
+         |)
+         |""".stripMargin)
+      checkAnswer(df1, Row(1, 2, 5))
+      assert(findProject(df1).size == 4)
+
+      // project before filter can be removed when there are no conflicting attributes
+      val df2 = sql(
+        """
+         |select * from
+         |(
+         |select
+         |v1.b,
+         |v2.c
+         |from v1
+         |inner join v2
+         |on v1.b=v2.c) t3
+         |where not exists (
+         |  select 1
+         |  from v2
+         |  where t3.b=v2.b and t3.c=v2.c
+         |)
+         |""".stripMargin)
+
+      checkAnswer(df2, Row(5, 5))
+      assert(findProject(df2).size == 3)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org