Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/22 04:31:17 UTC

[GitHub] [spark] beliefer opened a new pull request, #39170: [WIP][SPARK-41674][SQL] Runtime filter should supports the any side of child join as filter creation side

beliefer opened a new pull request, #39170:
URL: https://github.com/apache/spark/pull/39170

   ### What changes were proposed in this pull request?
   Currently, the Spark runtime filter supports two scenarios:
   1. The join itself is a shuffle join;
   2. The join itself is a broadcast join, and there is a shuffle join under one side of the join.
   
   This PR lets the runtime filter support a third scenario.
   Take the following SQL as an example.
   ```
   SELECT *
   FROM bf1
       JOIN bf2
       JOIN bf3
       ON bf1.c1 = bf2.c2
           AND bf3.c3 = bf2.c2
   WHERE bf2.a2 = 5
   ```
   Currently, Spark only adds a runtime filter (a subquery on bf2) to bf1, which is the first scenario. In fact, the same runtime filter (subquery on bf2) can also be applied to bf3, because bf3.c3 is joined on the same key, bf2.c2.
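   To make the idea concrete, here is a minimal sketch of the example query using plain Scala collections (not Spark code). It assumes a `Set[Int]` of bf2.c2 values stands in for the runtime filter (Spark's actual filter may be a bloom filter or a semi-join subquery), and the names `RuntimeFilterExample`, `buildFilter` and `queryWithRuntimeFilters` are hypothetical. The point is only that pruning both bf1 and bf3 with the keys from the filtered bf2 leaves the join result unchanged.

   ```scala
   // Plain-collections model of the example query; not Spark code.
   // Assumption: a Set[Int] of join keys stands in for the runtime filter.
   object RuntimeFilterExample {
     final case class Bf1(c1: Int, v: String)
     final case class Bf2(c2: Int, a2: Int)
     final case class Bf3(c3: Int, w: String)

     // bf1 JOIN bf2 JOIN bf3 ON bf1.c1 = bf2.c2 AND bf3.c3 = bf2.c2 WHERE bf2.a2 = 5
     def query(t1: Seq[Bf1], t2: Seq[Bf2], t3: Seq[Bf3]): Seq[(Bf1, Bf2, Bf3)] =
       for {
         r2 <- t2 if r2.a2 == 5
         r1 <- t1 if r1.c1 == r2.c2
         r3 <- t3 if r3.c3 == r2.c2
       } yield (r1, r2, r3)

     // Filter creation side: the selective filter over bf2, projected to bf2.c2.
     def buildFilter(t2: Seq[Bf2]): Set[Int] = t2.filter(_.a2 == 5).map(_.c2).toSet

     // Prune bf1 (existing behavior) and bf3 (the new case) before joining.
     def queryWithRuntimeFilters(
         t1: Seq[Bf1], t2: Seq[Bf2], t3: Seq[Bf3]): Seq[(Bf1, Bf2, Bf3)] = {
       val keys = buildFilter(t2)
       query(t1.filter(r => keys.contains(r.c1)), t2, t3.filter(r => keys.contains(r.c3)))
     }
   }
   ```

   In a real plan the benefit comes from shuffling fewer bf1 and bf3 rows; the collections model only checks that the pruning is safe.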
   
   
   ### Why are the changes needed?
   1. Extend the scenarios supported by the runtime filter.
   2. Reduce the shuffle data size and improve performance.
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   This PR only changes the internal implementation.
   
   
   ### How was this patch tested?
   New tests.
   Micro benchmark.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1162828030


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -159,6 +182,24 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
       REGEXP_EXTRACT_FAMILY, REGEXP_REPLACE)
   }
 
+  private def isLeftSideSuperset(
+      joinType: JoinType,
+      left: LogicalPlan,
+      filterCreationSideExp: Expression): Boolean = joinType match {
+    case Inner | LeftSemi | LeftAnti | LeftOuter | RightOuter =>

Review Comment:
   ```suggestion
       case Inner | LeftSemi | LeftAnti | LeftOuter =>
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -159,6 +182,24 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
       REGEXP_EXTRACT_FAMILY, REGEXP_REPLACE)
   }
 
+  private def isLeftSideSuperset(
+      joinType: JoinType,
+      left: LogicalPlan,
+      filterCreationSideExp: Expression): Boolean = joinType match {
+    case Inner | LeftSemi | LeftAnti | LeftOuter | RightOuter =>
+      left.output.exists(_.semanticEquals(filterCreationSideExp))
+    case _ => false
+  }
+
+  private def isRightSideSuperset(
+      joinType: JoinType,
+      right: LogicalPlan,
+      filterCreationSideExp: Expression): Boolean = joinType match {
+    case Inner | LeftOuter | RightOuter =>

Review Comment:
   ```suggestion
       case Inner | RightOuter =>
   ```





[GitHub] [spark] cloud-fan commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1164949236


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -114,44 +114,66 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
   }
 
   /**
-   * Returns whether the plan is a simple filter over scan and the filter is likely selective
-   * Also check if the plan only has simple expressions (attribute reference, literals) so that we
-   * do not add a subquery that might have an expensive computation
+   * Extracts a sub-plan which is a simple filter over scan from the input plan. The simple
+   * filter should be selective and the filter condition (including expressions in the child
+   * plan referenced by the filter condition) should be a simple expression, so that we do
+   * not add a subquery that might have an expensive computation.
    */
-  private def isSelectiveFilterOverScan(plan: LogicalPlan): Boolean = {
+  private def extractSelectiveFilterOverScan(
+      plan: LogicalPlan,
+      filterCreationSideExp: Expression): Option[LogicalPlan] = {
+    var currentPlan: LogicalPlan = plan

Review Comment:
   Let's pass it (`currentPlan`) as a parameter of `def extract` instead.





[GitHub] [spark] beliefer commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1161614362


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -114,11 +114,13 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
   }
 
   /**
-   * Returns whether the plan is a simple filter over scan and the filter is likely selective
+   * Returns whether the plan exists a simple filter over scan and the filter is likely selective
    * Also check if the plan only has simple expressions (attribute reference, literals) so that we
    * do not add a subquery that might have an expensive computation

Review Comment:
   It would be good to add these comments to `tryInjectRuntimeFilter`.





[GitHub] [spark] cloud-fan commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1162830210


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -159,6 +182,24 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
       REGEXP_EXTRACT_FAMILY, REGEXP_REPLACE)
   }
 
+  private def isLeftSideSuperset(
+      joinType: JoinType,
+      left: LogicalPlan,
+      filterCreationSideExp: Expression): Boolean = joinType match {
+    case Inner | LeftSemi | LeftAnti | LeftOuter | RightOuter =>
+      left.output.exists(_.semanticEquals(filterCreationSideExp))
+    case _ => false
+  }
+
+  private def isRightSideSuperset(
+      joinType: JoinType,
+      right: LogicalPlan,
+      filterCreationSideExp: Expression): Boolean = joinType match {
+    case Inner | LeftOuter | RightOuter =>

Review Comment:
   ```suggestion
       case Inner | RightOuter =>
   ```





[GitHub] [spark] beliefer commented on pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by GitBox <gi...@apache.org>.
beliefer commented on PR #39170:
URL: https://github.com/apache/spark/pull/39170#issuecomment-1371955351

   > Can you show the query plan before and after your PR?
   
   OK.




[GitHub] [spark] beliefer commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1116776255


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -146,6 +148,15 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition))
+      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, _, _, left, right, _)

Review Comment:
   A selective filter does not only exist in the sibling node; it can also exist in the left or right child of a child join node.





[GitHub] [spark] beliefer commented on pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #39170:
URL: https://github.com/apache/spark/pull/39170#issuecomment-1507011534

   @cloud-fan Thank you for your hard work and review.




[GitHub] [spark] cloud-fan commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1163910679


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -114,51 +115,89 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
   }
 
   /**
-   * Returns whether the plan is a simple filter over scan and the filter is likely selective
+   * Extracts the plan exists a simple filter over scan and the filter is likely selective

Review Comment:
   ```
   Extracts a sub-plan which is a simple filter over scan from the input plan. The simple
   filter should be selective and the filter condition (including expressions in the child
   plan referenced by the filter condition) should be a simple expression, so that we do
   not add a subquery that might have an expensive computation.
   ```





[GitHub] [spark] cloud-fan commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1164131408


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -114,44 +114,63 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
   }
 
   /**
-   * Returns whether the plan is a simple filter over scan and the filter is likely selective
-   * Also check if the plan only has simple expressions (attribute reference, literals) so that we
-   * do not add a subquery that might have an expensive computation
+   * Extracts a sub-plan which is a simple filter over scan from the input plan. The simple
+   * filter should be selective and the filter condition (including expressions in the child
+   * plan referenced by the filter condition) should be a simple expression, so that we do
+   * not add a subquery that might have an expensive computation.
    */
-  private def isSelectiveFilterOverScan(plan: LogicalPlan): Boolean = {
+  private def extractSelectiveFilterOverScan(
+      plan: LogicalPlan,
+      filterCreationSideExp: Expression): Option[LogicalPlan] = {
     @tailrec
-    def isSelective(
+    def extract(
         p: LogicalPlan,
         predicateReference: AttributeSet,
         hasHitFilter: Boolean,
-        hasHitSelectiveFilter: Boolean): Boolean = p match {
-      case Project(projectList, child) =>
-        if (hasHitFilter) {
-          // We need to make sure all expressions referenced by filter predicates are simple
-          // expressions.
-          val referencedExprs = projectList.filter(predicateReference.contains)
-          referencedExprs.forall(isSimpleExpression) &&
-            isSelective(
-              child,
-              referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _),
-              hasHitFilter,
-              hasHitSelectiveFilter)
+        hasHitSelectiveFilter: Boolean): Option[LogicalPlan] = p match {
+      case Project(projectList, child) if hasHitFilter =>
+        // We need to make sure all expressions referenced by filter predicates are simple
+        // expressions.
+        val referencedExprs = projectList.filter(predicateReference.contains)
+        if (referencedExprs.forall(isSimpleExpression)) {
+          extract(
+            child,
+            referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _),
+            hasHitFilter,
+            hasHitSelectiveFilter)
         } else {
-          assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
-          isSelective(child, predicateReference, hasHitFilter, hasHitSelectiveFilter)
+          None
         }
-      case Filter(condition, child) =>
-        isSimpleExpression(condition) && isSelective(
+      case Project(_, child) =>
+        assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
+        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter)
+      case Filter(condition, child) if isSimpleExpression(condition) =>
+        extract(
           child,
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition))
-      case _: LeafNode => hasHitSelectiveFilter
-      case _ => false
+      case ExtractEquiJoinKeys(joinType, _, _, _, _, left, right, hint) =>
+        // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
+        // the other side of the [[Join]]. It's also OK to use a superset of the join key values
+        // (ignore null values) to do the pruning.
+        if (left.output.exists(_.semanticEquals(filterCreationSideExp))) {
+          extractSelectiveFilterOverScan(left, filterCreationSideExp)

Review Comment:
   To keep `def extract` tail-recursive, we should call `extract(left, AttributeSet.empty, hasHitFilter = false, hasHitSelectiveFilter = false)` here.
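   A simplified sketch of why this matters, using a hypothetical toy plan ADT (not Catalyst's `LogicalPlan`, `AttributeSet` or `ExtractEquiJoinKeys`). `@tailrec` only turns self-calls in tail position into a loop, so descending into a join child through the outer `extractSelectiveFilterOverScan` is an ordinary call that adds stack frames per join level, whereas calling `extract` itself with reset state keeps the whole walk a loop. The sketch also threads the candidate sub-plan through a `currentPlan` parameter, in the spirit of passing it as a parameter of `def extract`; all names here are illustrative assumptions.

   ```scala
   import scala.annotation.tailrec

   // Hypothetical, heavily simplified plan ADT; not Catalyst's LogicalPlan.
   object ExtractSketch {
     sealed trait Plan { def output: Seq[String] }
     final case class Scan(output: Seq[String]) extends Plan
     final case class Filter(selective: Boolean, child: Plan) extends Plan {
       def output: Seq[String] = child.output
     }
     final case class Project(output: Seq[String], child: Plan) extends Plan
     final case class Join(left: Plan, right: Plan) extends Plan {
       def output: Seq[String] = left.output ++ right.output
     }

     // Every recursive call, including the descent into a join child, is a
     // self-call in tail position, so the compiler turns it into a loop.
     @tailrec
     def extract(
         p: Plan,
         creationKey: String,
         hasHitSelectiveFilter: Boolean,
         currentPlan: Plan): Option[Plan] = p match {
       case Project(_, child) =>
         extract(child, creationKey, hasHitSelectiveFilter, currentPlan)
       case Filter(selective, child) =>
         extract(child, creationKey, hasHitSelectiveFilter || selective, currentPlan)
       case Join(left, right) =>
         // Restart the search in the child that produces the key, with reset state.
         if (left.output.contains(creationKey)) extract(left, creationKey, false, left)
         else if (right.output.contains(creationKey)) extract(right, creationKey, false, right)
         else None
       case _: Scan =>
         if (hasHitSelectiveFilter) Some(currentPlan) else None
     }

     def extractSelectiveFilterOverScan(plan: Plan, creationKey: String): Option[Plan] =
       extract(plan, creationKey, hasHitSelectiveFilter = false, currentPlan = plan)
   }
   ```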



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -114,44 +114,63 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
   }
 
   /**
-   * Returns whether the plan is a simple filter over scan and the filter is likely selective
-   * Also check if the plan only has simple expressions (attribute reference, literals) so that we
-   * do not add a subquery that might have an expensive computation
+   * Extracts a sub-plan which is a simple filter over scan from the input plan. The simple
+   * filter should be selective and the filter condition (including expressions in the child
+   * plan referenced by the filter condition) should be a simple expression, so that we do
+   * not add a subquery that might have an expensive computation.
    */
-  private def isSelectiveFilterOverScan(plan: LogicalPlan): Boolean = {
+  private def extractSelectiveFilterOverScan(
+      plan: LogicalPlan,
+      filterCreationSideExp: Expression): Option[LogicalPlan] = {
     @tailrec
-    def isSelective(
+    def extract(
         p: LogicalPlan,
         predicateReference: AttributeSet,
         hasHitFilter: Boolean,
-        hasHitSelectiveFilter: Boolean): Boolean = p match {
-      case Project(projectList, child) =>
-        if (hasHitFilter) {
-          // We need to make sure all expressions referenced by filter predicates are simple
-          // expressions.
-          val referencedExprs = projectList.filter(predicateReference.contains)
-          referencedExprs.forall(isSimpleExpression) &&
-            isSelective(
-              child,
-              referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _),
-              hasHitFilter,
-              hasHitSelectiveFilter)
+        hasHitSelectiveFilter: Boolean): Option[LogicalPlan] = p match {
+      case Project(projectList, child) if hasHitFilter =>
+        // We need to make sure all expressions referenced by filter predicates are simple
+        // expressions.
+        val referencedExprs = projectList.filter(predicateReference.contains)
+        if (referencedExprs.forall(isSimpleExpression)) {
+          extract(
+            child,
+            referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _),
+            hasHitFilter,
+            hasHitSelectiveFilter)
         } else {
-          assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
-          isSelective(child, predicateReference, hasHitFilter, hasHitSelectiveFilter)
+          None
         }
-      case Filter(condition, child) =>
-        isSimpleExpression(condition) && isSelective(
+      case Project(_, child) =>
+        assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
+        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter)
+      case Filter(condition, child) if isSimpleExpression(condition) =>
+        extract(
           child,
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition))
-      case _: LeafNode => hasHitSelectiveFilter
-      case _ => false
+      case ExtractEquiJoinKeys(joinType, _, _, _, _, left, right, hint) =>
+        // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
+        // the other side of the [[Join]]. It's also OK to use a superset of the join key values
+        // (ignore null values) to do the pruning.
+        if (left.output.exists(_.semanticEquals(filterCreationSideExp))) {
+          extractSelectiveFilterOverScan(left, filterCreationSideExp)
+        } else if (right.output.exists(_.semanticEquals(filterCreationSideExp))) {
+          extractSelectiveFilterOverScan(right, filterCreationSideExp)

Review Comment:
   ditto





[GitHub] [spark] cloud-fan commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1162841632


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -146,7 +151,25 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition))
-      case _: LeafNode => hasHitSelectiveFilter
+      case ExtractEquiJoinKeys(joinType, _, _, _, _, left, right, hint) =>
+        // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
+        // the other side of the [[Join]]. It's also OK to use a superset of the join key values to
+        // do the pruning. For inner [[Join]]s, one side of the [[Join]] always produces a superset

Review Comment:
   We can remove `For inner [[Join]]s ...` as the actual code allows more join types.





[GitHub] [spark] cloud-fan commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1161512196


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -114,11 +114,13 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
   }
 
   /**
-   * Returns whether the plan is a simple filter over scan and the filter is likely selective
+   * Returns whether the plan exists a simple filter over scan and the filter is likely selective
    * Also check if the plan only has simple expressions (attribute reference, literals) so that we
    * do not add a subquery that might have an expensive computation

Review Comment:
   Let's add some more comments to introduce the theory: Runtime filters use one side of the join to build a set of join key values and prune the other side of the join. It's also OK to use a superset of the join key values to do the pruning. For inner joins, one side of the join always produces a superset of the join key values.
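   The theory can be checked with a small sketch in plain Scala. It uses toy `(key, payload)` rows and a `Set[Int]` standing in for the runtime filter; none of these names are Spark APIs. Pruning the application side with the exact creation-side keys, or with any superset of them, yields the same inner-join result, because extra keys only let through rows that the join then discards. A superset simply prunes less.

   ```scala
   // Toy model of runtime-filter pruning; not Spark code.
   object SupersetPruningSketch {
     // Inner equi-join of (key, payload) rows.
     def join[A, B](app: Seq[(Int, A)], creation: Seq[(Int, B)]): Seq[(Int, A, B)] =
       for {
         (k1, a) <- app
         (k2, b) <- creation if k1 == k2
       } yield (k1, a, b)

     // Runtime filter: keep only application-side rows whose key is in the set.
     def prune[A](app: Seq[(Int, A)], keys: Set[Int]): Seq[(Int, A)] =
       app.filter { case (k, _) => keys.contains(k) }
   }
   ```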
   





[GitHub] [spark] cloud-fan commented on pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #39170:
URL: https://github.com/apache/spark/pull/39170#issuecomment-1507007990

   thanks, merging to master!




[GitHub] [spark] cloud-fan commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1163913638


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -114,51 +115,89 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
   }
 
   /**
-   * Returns whether the plan is a simple filter over scan and the filter is likely selective
+   * Extracts the plan exists a simple filter over scan and the filter is likely selective
    * Also check if the plan only has simple expressions (attribute reference, literals) so that we
    * do not add a subquery that might have an expensive computation
    */
-  private def isSelectiveFilterOverScan(plan: LogicalPlan): Boolean = {
+  private def extractSelectiveFilterOverScan(
+      plan: LogicalPlan,
+      filterCreationSideExp: Expression): Option[LogicalPlan] = {
     @tailrec
     def isSelective(
         p: LogicalPlan,
         predicateReference: AttributeSet,
         hasHitFilter: Boolean,
-        hasHitSelectiveFilter: Boolean): Boolean = p match {
-      case Project(projectList, child) =>
-        if (hasHitFilter) {
-          // We need to make sure all expressions referenced by filter predicates are simple
-          // expressions.
-          val referencedExprs = projectList.filter(predicateReference.contains)
-          referencedExprs.forall(isSimpleExpression) &&
-            isSelective(
-              child,
-              referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _),
-              hasHitFilter,
-              hasHitSelectiveFilter)
+        hasHitSelectiveFilter: Boolean): Option[LogicalPlan] = p match {
+      case Project(projectList, child) if hasHitFilter =>
+        // We need to make sure all expressions referenced by filter predicates are simple
+        // expressions.
+        val referencedExprs = projectList.filter(predicateReference.contains)
+        if (referencedExprs.forall(isSimpleExpression)) {
+          isSelective(
+            child,
+            referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _),
+            hasHitFilter,
+            hasHitSelectiveFilter)
         } else {
-          assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
-          isSelective(child, predicateReference, hasHitFilter, hasHitSelectiveFilter)
+          None
         }
-      case Filter(condition, child) =>
-        isSimpleExpression(condition) && isSelective(
+      case Project(_, child) =>
+        assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
+        isSelective(child, predicateReference, hasHitFilter, hasHitSelectiveFilter)
+      case Filter(condition, child) if isSimpleExpression(condition) =>
+        isSelective(
           child,
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition))
-      case _: LeafNode => hasHitSelectiveFilter
-      case _ => false
+      case ExtractEquiJoinKeys(joinType, _, _, _, _, left, right, hint) =>
+        // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
+        // the other side of the [[Join]]. It's also OK to use a superset of the join key values to
+        // do the pruning.
+        if (isLeftSideSuperset(joinType, left, filterCreationSideExp) &&

Review Comment:
   we can simplify the condition here. Join children always output a superset of join output (if null is ignored), except for the right child of left joins. However, for left joins, the right side plan output is not included in the join output, so `left/right.output.exists(_.semanticEquals(filterCreationSideExp))` should just work.
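   A toy check of the "superset if null is ignored" argument, in plain Scala with `None` standing in for SQL NULL. The join functions are illustrative only, not Spark code. For inner and outer equi-joins, every non-null key value in an output column comes from the corresponding child, so that child's key set is a superset of it. The left semi join case shows that its output carries only left-side values.

   ```scala
   // Toy equi-joins over bare key values; None models a NULL-padded column.
   object JoinSupersetSketch {
     type Key = Int
     type Out = (Option[Key], Option[Key]) // (left key column, right key column)
     type JoinFn = (Seq[Key], Seq[Key]) => Seq[Out]

     def innerJoin(l: Seq[Key], r: Seq[Key]): Seq[Out] =
       for { a <- l; b <- r if a == b } yield (Option(a), Option(b))

     def leftOuterJoin(l: Seq[Key], r: Seq[Key]): Seq[Out] =
       l.flatMap { a =>
         val matches = r.filter(_ == a)
         // An unmatched left row is padded with NULL (None) on the right.
         if (matches.isEmpty) Seq[Out]((Option(a), None))
         else matches.map(b => (Option(a), Option(b)))
       }

     def rightOuterJoin(l: Seq[Key], r: Seq[Key]): Seq[Out] =
       leftOuterJoin(r, l).map(_.swap)

     def fullOuterJoin(l: Seq[Key], r: Seq[Key]): Seq[Out] =
       leftOuterJoin(l, r) ++ r.filterNot(l.contains).map(b => (Option.empty[Key], Option(b)))

     // Left semi join: only left-side values appear in the output.
     def leftSemiJoin(l: Seq[Key], r: Seq[Key]): Seq[Key] = l.filter(r.contains)

     // Ignoring NULLs, all values in an output column must come from the child.
     def isSuperset(child: Seq[Key], column: Seq[Option[Key]]): Boolean =
       column.flatten.toSet.subsetOf(child.toSet)
   }
   ```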





[GitHub] [spark] beliefer commented on pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #39170:
URL: https://github.com/apache/spark/pull/39170#issuecomment-1506984108

   The GitHub Actions (GA) failure is unrelated to this PR.




[GitHub] [spark] beliefer commented on pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #39170:
URL: https://github.com/apache/spark/pull/39170#issuecomment-1499898495

   ping @cloud-fan Do you have time to take a look?




[GitHub] [spark] beliefer commented on pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by GitBox <gi...@apache.org>.
beliefer commented on PR #39170:
URL: https://github.com/apache/spark/pull/39170#issuecomment-1369550493

   ping @somani @cloud-fan 




[GitHub] [spark] cloud-fan commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1116492541


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -146,6 +148,15 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition))
+      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, _, _, left, right, _)

Review Comment:
   The PR description is very clear about what this PR is doing, but the "how" part is missing. I don't quite understand the changes here. Does it mean a join query can be the build query of a runtime filter?





[GitHub] [spark] cloud-fan commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1161512363


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -146,6 +148,15 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition))
+      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, _, _, left, right, _)
+        if canPruneLeft(joinType) =>

Review Comment:
   See https://github.com/apache/spark/pull/39170/files#r1161512196 , I think the join type can only be inner.





[GitHub] [spark] cloud-fan commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1162849238


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -146,7 +151,25 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition))
-      case _: LeafNode => hasHitSelectiveFilter
+      case ExtractEquiJoinKeys(joinType, _, _, _, _, left, right, hint) =>
+        // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
+        // the other side of the [[Join]]. It's also OK to use a superset of the join key values to
+        // do the pruning. For inner [[Join]]s, one side of the [[Join]] always produces a superset
+        // of the join key values.
+        if (isLeftSideSuperset(joinType, left, filterCreationSideExp)) {
+          !hintToBroadcastLeft(hint) && !canBroadcastBySize(left, conf) &&
+            existsSelectiveFilterOverScan(left, filterCreationSideExp, filterCreationSidePlans)
+        } else if (isRightSideSuperset(joinType, right, filterCreationSideExp)) {
+          !hintToBroadcastRight(hint) && !canBroadcastBySize(right, conf) &&
+            existsSelectiveFilterOverScan(right, filterCreationSideExp, filterCreationSidePlans)
+        } else {
+          false
+        }
+      case _: LeafNode =>
+        if (hasHitSelectiveFilter) {

Review Comment:
   This can avoid duplicating the code to extract the build plan later, but it's indeed a bit hacky. 
   
   How about we rename `filteringHasBenefit` to `def extractBeneficialFilterCreatePlan(...): Option[LogicalPlan]`?
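   A minimal sketch of what the suggested rename buys, on a toy plan model (not Catalyst's `LogicalPlan`): returning `Option[Plan]` hands the caller the build-side sub-plan directly, and the old Boolean answer is just `isDefined`. The plan classes and the left-before-right search order are assumptions made for illustration; the toy ignores join keys entirely.

```scala
object OptionInsteadOfBoolean {
  // Toy logical plan, not Catalyst's LogicalPlan.
  sealed trait Plan
  final case class Scan(table: String) extends Plan
  final case class Filter(likelySelective: Boolean, child: Plan) extends Plan
  final case class Join(left: Plan, right: Plan) extends Plan

  // Hypothetical Option-returning check: it returns the sub-plan that would build the
  // runtime filter, so the caller does not need a second pass to find it. This toy
  // ignores join keys and simply tries the left child before the right one.
  def extractBeneficialFilterCreatePlan(plan: Plan): Option[Plan] = plan match {
    case f @ Filter(true, _: Scan) => Some(f)
    case Filter(false, child)      => extractBeneficialFilterCreatePlan(child)
    case Join(left, right) =>
      extractBeneficialFilterCreatePlan(left).orElse(extractBeneficialFilterCreatePlan(right))
    case _ => None
  }

  // The old Boolean answer is recovered with `isDefined`.
  def filteringHasBenefit(plan: Plan): Boolean =
    extractBeneficialFilterCreatePlan(plan).isDefined
}
```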





[GitHub] [spark] cloud-fan commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1162843596


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -146,7 +151,25 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition))
-      case _: LeafNode => hasHitSelectiveFilter
+      case ExtractEquiJoinKeys(joinType, _, _, _, _, left, right, hint) =>
+        // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
+        // the other side of the [[Join]]. It's also OK to use a superset of the join key values to
+        // do the pruning. For inner [[Join]]s, one side of the [[Join]] always produces a superset
+        // of the join key values.
+        if (isLeftSideSuperset(joinType, left, filterCreationSideExp)) {
+          !hintToBroadcastLeft(hint) && !canBroadcastBySize(left, conf) &&

Review Comment:
   why do we check if it's broadcastable? We have a very conservative definition of `a selective filter over scan`, which is sufficient to decide whether to build a runtime filter.
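   To make the trade-off in this thread concrete, a toy sketch of the two rules being debated: the conservative gate from the diff, which skips a join side that is hinted for broadcast or small enough to broadcast by size, versus relying on the selective-filter-over-scan check alone. `JoinSide` and `threshold` (standing in for `spark.sql.autoBroadcastJoinThreshold`) are assumptions; this is not Spark's implementation.

```scala
object BroadcastGate {
  // Toy description of one join child: estimated size and whether a broadcast hint targets it.
  final case class JoinSide(sizeInBytes: Long, broadcastHint: Boolean)

  // Simplified size check; `threshold` stands in for spark.sql.autoBroadcastJoinThreshold.
  def canBroadcastBySize(side: JoinSide, threshold: Long): Boolean =
    side.sizeInBytes >= 0 && side.sizeInBytes <= threshold

  // Conservative rule from the diff: skip sides that would likely be broadcast anyway.
  def conservativeRule(side: JoinSide, threshold: Long, selectiveFilterOverScan: Boolean): Boolean =
    !side.broadcastHint && !canBroadcastBySize(side, threshold) && selectiveFilterOverScan

  // Rule suggested in review: the selective-filter-over-scan check alone decides.
  def reviewerRule(selectiveFilterOverScan: Boolean): Boolean = selectiveFilterOverScan
}
```

   The two rules only disagree for a small or hinted side that also has a selective filter over a scan; that is the case where the conservative rule declines to build a runtime filter.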





[GitHub] [spark] cloud-fan commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1162843596


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -146,7 +151,25 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition))
-      case _: LeafNode => hasHitSelectiveFilter
+      case ExtractEquiJoinKeys(joinType, _, _, _, _, left, right, hint) =>
+        // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
+        // the other side of the [[Join]]. It's also OK to use a superset of the join key values to
+        // do the pruning. For inner [[Join]]s, one side of the [[Join]] always produces a superset
+        // of the join key values.
+        if (isLeftSideSuperset(joinType, left, filterCreationSideExp)) {
+          !hintToBroadcastLeft(hint) && !canBroadcastBySize(left, conf) &&

Review Comment:
   why do we check if it's broadcastable? We have a very conservative definition of `a selective filter over scan`, which is sufficient.





[GitHub] [spark] beliefer commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1163583014


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -146,7 +151,25 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition))
-      case _: LeafNode => hasHitSelectiveFilter
+      case ExtractEquiJoinKeys(joinType, _, _, _, _, left, right, hint) =>
+        // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
+        // the other side of the [[Join]]. It's also OK to use a superset of the join key values to
+        // do the pruning. For inner [[Join]]s, one side of the [[Join]] always produces a superset
+        // of the join key values.
+        if (isLeftSideSuperset(joinType, left, filterCreationSideExp)) {
+          !hintToBroadcastLeft(hint) && !canBroadcastBySize(left, conf) &&

Review Comment:
   I want to remain conservative here, because I found that four TPC-DS use cases would create a runtime filter as a result, but with no significant performance improvement.





[GitHub] [spark] cloud-fan commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1162828030


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -159,6 +182,24 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
       REGEXP_EXTRACT_FAMILY, REGEXP_REPLACE)
   }
 
+  private def isLeftSideSuperset(
+      joinType: JoinType,
+      left: LogicalPlan,
+      filterCreationSideExp: Expression): Boolean = joinType match {
+    case Inner | LeftSemi | LeftAnti | LeftOuter | RightOuter =>

Review Comment:
   ```suggestion
       case Inner | LeftSemi | LeftAnti | LeftOuter =>
   ```
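   A toy sketch of the join-type match with the suggestion applied (`RightOuter` removed from the left-side list). The join-type objects and `Attr` are stand-ins for Catalyst's types, and the body of the match (checking the left child's output with `semanticEquals`) is an assumption, since only the signature and the case line appear in the diff.

```scala
object LeftSideSuperset {
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  // Toy attribute: identity is `exprId`, `name` is cosmetic.
  final case class Attr(name: String, exprId: Long) {
    def semanticEquals(other: Attr): Boolean = exprId == other.exprId
  }

  // Join types for which the left child may serve as the filter creation side,
  // with the review suggestion applied (RightOuter removed).
  def isLeftSideSuperset(
      joinType: JoinType,
      leftOutput: Seq[Attr],
      filterCreationSideExp: Attr): Boolean = joinType match {
    case Inner | LeftSemi | LeftAnti | LeftOuter =>
      leftOutput.exists(_.semanticEquals(filterCreationSideExp))
    case _ => false
  }
}
```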





[GitHub] [spark] beliefer commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1163614899


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -146,7 +151,25 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition))
-      case _: LeafNode => hasHitSelectiveFilter
+      case ExtractEquiJoinKeys(joinType, _, _, _, _, left, right, hint) =>
+        // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
+        // the other side of the [[Join]]. It's also OK to use a superset of the join key values to
+        // do the pruning. For inner [[Join]]s, one side of the [[Join]] always produces a superset
+        // of the join key values.
+        if (isLeftSideSuperset(joinType, left, filterCreationSideExp)) {
+          !hintToBroadcastLeft(hint) && !canBroadcastBySize(left, conf) &&

Review Comment:
   I found that four TPC-DS use cases would create a runtime filter as a result; let me run a test to check the performance improvement.





[GitHub] [spark] beliefer commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1163583014


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -146,7 +151,25 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition))
-      case _: LeafNode => hasHitSelectiveFilter
+      case ExtractEquiJoinKeys(joinType, _, _, _, _, left, right, hint) =>
+        // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
+        // the other side of the [[Join]]. It's also OK to use a superset of the join key values to
+        // do the pruning. For inner [[Join]]s, one side of the [[Join]] always produces a superset
+        // of the join key values.
+        if (isLeftSideSuperset(joinType, left, filterCreationSideExp)) {
+          !hintToBroadcastLeft(hint) && !canBroadcastBySize(left, conf) &&

Review Comment:
   I want to remain conservative here.





[GitHub] [spark] cloud-fan commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1161489288


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -146,6 +148,15 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition))
+      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, _, _, left, right, _)
+        if canPruneLeft(joinType) =>
+        if (leftKeys.contains(filterCreationSideExp)) {

Review Comment:
   nit: we shouldn't rely on `Expression.equals`; we should call `semanticEquals` instead.
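   Why `leftKeys.contains(...)` is fragile can be shown with a toy expression model loosely modeled on Catalyst's canonicalization: structural `equals` sees cosmetic differences (an attribute's name, operand order of a commutative `Add`), while a `semanticEquals` that compares canonical forms does not. `SemanticEqualsDemo`, `AttrRef` and this canonicalization scheme are illustrative assumptions, not Catalyst code.

```scala
object SemanticEqualsDemo {
  sealed trait Expr {
    // Hypothetical canonical form: drop cosmetic names, order commutative operands.
    def canonicalized: Expr
    def semanticEquals(other: Expr): Boolean = canonicalized == other.canonicalized
  }

  final case class AttrRef(name: String, exprId: Long) extends Expr {
    def canonicalized: Expr = AttrRef("none", exprId)
  }

  final case class Add(left: Expr, right: Expr) extends Expr {
    def canonicalized: Expr = {
      val l = left.canonicalized
      val r = right.canonicalized
      // Order operands by their string form so that a + b and b + a canonicalize alike.
      if (l.toString.compareTo(r.toString) <= 0) Add(l, r) else Add(r, l)
    }
  }
}
```

   With this model, `Seq(AttrRef("C1", 1)).contains(AttrRef("c1", 1))` is false, while `exists(_.semanticEquals(...))` finds the match.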





[GitHub] [spark] beliefer commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1163583856


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -146,7 +151,25 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition))
-      case _: LeafNode => hasHitSelectiveFilter
+      case ExtractEquiJoinKeys(joinType, _, _, _, _, left, right, hint) =>
+        // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
+        // the other side of the [[Join]]. It's also OK to use a superset of the join key values to
+        // do the pruning. For inner [[Join]]s, one side of the [[Join]] always produces a superset
+        // of the join key values.
+        if (isLeftSideSuperset(joinType, left, filterCreationSideExp)) {
+          !hintToBroadcastLeft(hint) && !canBroadcastBySize(left, conf) &&
+            existsSelectiveFilterOverScan(left, filterCreationSideExp, filterCreationSidePlans)
+        } else if (isRightSideSuperset(joinType, right, filterCreationSideExp)) {
+          !hintToBroadcastRight(hint) && !canBroadcastBySize(right, conf) &&
+            existsSelectiveFilterOverScan(right, filterCreationSideExp, filterCreationSidePlans)
+        } else {
+          false
+        }
+      case _: LeafNode =>
+        if (hasHitSelectiveFilter) {

Review Comment:
   Good idea.





[GitHub] [spark] cloud-fan commented on pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on PR #39170:
URL: https://github.com/apache/spark/pull/39170#issuecomment-1371031081

   ```
   SELECT *
   FROM bf1
       JOIN bf2
       JOIN bf3
       ON bf1.c1 = bf2.c2
           AND bf3.c3 = bf2.c2
   WHERE bf2.a2 = 5
   ```
   
   Can you show the query plan before and after your PR?




[GitHub] [spark] cloud-fan closed pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan closed pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side
URL: https://github.com/apache/spark/pull/39170




[GitHub] [spark] cloud-fan commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1161489559


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -159,6 +170,21 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
       REGEXP_EXTRACT_FAMILY, REGEXP_REPLACE)
   }
 
+  private def confirmFilterCreationSidePlan(

Review Comment:
   what does `confirm` mean here?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -159,6 +170,21 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
       REGEXP_EXTRACT_FAMILY, REGEXP_REPLACE)
   }
 
+  private def confirmFilterCreationSidePlan(

Review Comment:
   is it `extract`?





[GitHub] [spark] cloud-fan commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1163916095


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -114,51 +115,89 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
   }
 
   /**
-   * Returns whether the plan is a simple filter over scan and the filter is likely selective
+   * Extracts the plan exists a simple filter over scan and the filter is likely selective
    * Also check if the plan only has simple expressions (attribute reference, literals) so that we
    * do not add a subquery that might have an expensive computation
    */
-  private def isSelectiveFilterOverScan(plan: LogicalPlan): Boolean = {
+  private def extractSelectiveFilterOverScan(
+      plan: LogicalPlan,
+      filterCreationSideExp: Expression): Option[LogicalPlan] = {
     @tailrec
     def isSelective(

Review Comment:
   we should rename this function now.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -114,51 +115,89 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
   }
 
   /**
-   * Returns whether the plan is a simple filter over scan and the filter is likely selective
+   * Extracts the plan exists a simple filter over scan and the filter is likely selective
    * Also check if the plan only has simple expressions (attribute reference, literals) so that we
    * do not add a subquery that might have an expensive computation
    */
-  private def isSelectiveFilterOverScan(plan: LogicalPlan): Boolean = {
+  private def extractSelectiveFilterOverScan(
+      plan: LogicalPlan,
+      filterCreationSideExp: Expression): Option[LogicalPlan] = {
     @tailrec
     def isSelective(

Review Comment:
   probably just `def extract`
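   A toy sketch of what a tail-recursive `def extract` returning `Option` might look like once the join case is added. It is not the PR's code: the plan classes, the `root` parameter (tracking which sub-plan to return), and the choice to restart the search with fresh state on the join child that outputs the creation-side column are all assumptions for illustration. Project handling is simplified (no simple-expression check).

```scala
import scala.annotation.tailrec

object ExtractSketch {
  // Toy logical plan: each node exposes the names of the columns it outputs.
  sealed trait Plan { def output: Set[String] }
  final case class Scan(output: Set[String]) extends Plan
  final case class Project(output: Set[String], child: Plan) extends Plan
  final case class Filter(simple: Boolean, likelySelective: Boolean, child: Plan) extends Plan {
    def output: Set[String] = child.output
  }
  final case class Join(left: Plan, right: Plan) extends Plan {
    def output: Set[String] = left.output ++ right.output
  }

  // Returns the sub-plan (rooted at `plan` or at a join child) that holds a simple,
  // likely selective filter over a scan, or None if there is no such sub-plan.
  def extractSelectiveFilterOverScan(plan: Plan, creationSideCol: String): Option[Plan] = {
    @tailrec
    def extract(p: Plan, root: Plan, hasHitSelectiveFilter: Boolean): Option[Plan] = p match {
      case Project(_, child) => extract(child, root, hasHitSelectiveFilter)
      case Filter(true, selective, child) =>
        extract(child, root, hasHitSelectiveFilter || selective)
      case Join(left, right) =>
        // Restart the search on whichever child outputs the creation-side column.
        if (left.output.contains(creationSideCol)) {
          extract(left, left, hasHitSelectiveFilter = false)
        } else if (right.output.contains(creationSideCol)) {
          extract(right, right, hasHitSelectiveFilter = false)
        } else {
          None
        }
      case _: Scan => if (hasHitSelectiveFilter) Some(root) else None
      case _ => None // e.g. a filter with a non-simple condition
    }
    extract(plan, plan, hasHitSelectiveFilter = false)
  }
}
```

   On a toy version of the PR's example, where the creation side is `bf1 JOIN (filtered bf2)` keyed on `c2`, the search passes through the join and returns the filtered `bf2` sub-plan.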





[GitHub] [spark] cloud-fan commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1163911148


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -114,51 +115,89 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
   }
 
   /**
-   * Returns whether the plan is a simple filter over scan and the filter is likely selective
+   * Extracts the plan exists a simple filter over scan and the filter is likely selective
    * Also check if the plan only has simple expressions (attribute reference, literals) so that we
    * do not add a subquery that might have an expensive computation
    */
-  private def isSelectiveFilterOverScan(plan: LogicalPlan): Boolean = {
+  private def extractSelectiveFilterOverScan(
+      plan: LogicalPlan,
+      filterCreationSideExp: Expression): Option[LogicalPlan] = {
     @tailrec
     def isSelective(
         p: LogicalPlan,
         predicateReference: AttributeSet,
         hasHitFilter: Boolean,
-        hasHitSelectiveFilter: Boolean): Boolean = p match {
-      case Project(projectList, child) =>
-        if (hasHitFilter) {
-          // We need to make sure all expressions referenced by filter predicates are simple
-          // expressions.
-          val referencedExprs = projectList.filter(predicateReference.contains)
-          referencedExprs.forall(isSimpleExpression) &&
-            isSelective(
-              child,
-              referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _),
-              hasHitFilter,
-              hasHitSelectiveFilter)
+        hasHitSelectiveFilter: Boolean): Option[LogicalPlan] = p match {
+      case Project(projectList, child) if hasHitFilter =>
+        // We need to make sure all expressions referenced by filter predicates are simple
+        // expressions.
+        val referencedExprs = projectList.filter(predicateReference.contains)
+        if (referencedExprs.forall(isSimpleExpression)) {
+          isSelective(
+            child,
+            referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _),
+            hasHitFilter,
+            hasHitSelectiveFilter)
         } else {
-          assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
-          isSelective(child, predicateReference, hasHitFilter, hasHitSelectiveFilter)
+          None
         }
-      case Filter(condition, child) =>
-        isSimpleExpression(condition) && isSelective(
+      case Project(_, child) =>
+        assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
+        isSelective(child, predicateReference, hasHitFilter, hasHitSelectiveFilter)
+      case Filter(condition, child) if isSimpleExpression(condition) =>
+        isSelective(
           child,
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition))
-      case _: LeafNode => hasHitSelectiveFilter
-      case _ => false
+      case ExtractEquiJoinKeys(joinType, _, _, _, _, left, right, hint) =>
+        // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
+        // the other side of the [[Join]]. It's also OK to use a superset of the join key values to

Review Comment:
   ```suggestion
           // the other side of the [[Join]]. It's also OK to use a superset of the join key values (ignore null values) to
   ```
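The suggestion above says that pruning with a superset of the join key values is safe, provided NULL keys are ignored. The toy model below illustrates this in plain Scala. It is a sketch under stated assumptions, not Spark's API: `RuntimeFilterSupersetSketch`, `Row`, `innerJoin` and `prune` are hypothetical names, Scala collections stand in for relations, and `None` stands in for SQL NULL. The creation side is itself an inner join (bf2 with bf3), and the key set is collected from the scan underneath it. That key set is a superset of the join's output keys, yet the final inner join result does not change.

```scala
// Conceptual model only (hypothetical names, not Spark APIs):
// Scala collections stand in for relations; Option[Int] is a nullable join key.
object RuntimeFilterSupersetSketch {
  final case class Row(key: Option[Int], tag: String)

  // Inner equi-join on `key`: NULL (None) never equals anything, so it never matches.
  def innerJoin(left: Seq[Row], right: Seq[Row]): Seq[(Row, Row)] =
    for {
      l <- left
      r <- right
      if l.key.isDefined && l.key == r.key
    } yield (l, r)

  // Runtime-filter style pruning: keep only probe rows whose key is in `keys`.
  // Rows with a NULL key are dropped, which is safe because they cannot join.
  def prune(probe: Seq[Row], keys: Set[Int]): Seq[Row] =
    probe.filter(_.key.exists(keys.contains))

  def main(args: Array[String]): Unit = {
    val bf1 = Seq(Row(Some(1), "p1"), Row(Some(2), "p2"), Row(Some(3), "p3"), Row(None, "p0"))
    val bf2 = Seq(Row(Some(1), "c1"), Row(Some(2), "c2"), Row(None, "c0")) // selective scan
    val bf3 = Seq(Row(Some(2), "d2"), Row(Some(4), "d4"))

    // The creation side is itself an inner join; its output keys are {2}.
    val creationSide = innerJoin(bf2, bf3).map(_._1)
    // Keys collected from the scan below that join: {1, 2}, a superset of {2}.
    val supersetKeys = bf2.flatMap(_.key).toSet

    val unpruned = innerJoin(bf1, creationSide)
    val pruned = innerJoin(prune(bf1, supersetKeys), creationSide)

    assert(pruned == unpruned) // pruning with the superset keeps the join result intact
    println(s"probe rows: ${bf1.size} -> ${prune(bf1, supersetKeys).size}, join rows: ${unpruned.size}")
  }
}
```

The extra key (1) in the superset only means that fewer probe rows get pruned; correctness is unaffected. Dropping NULL keys is safe only because an inner equi-join never matches NULL.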



[GitHub] [spark] beliefer commented on a diff in pull request #39170: [SPARK-41674][SQL] Runtime filter should supports multi level shuffle join side as filter creation side

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #39170:
URL: https://github.com/apache/spark/pull/39170#discussion_r1163913361


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -146,7 +151,25 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition))
-      case _: LeafNode => hasHitSelectiveFilter
+      case ExtractEquiJoinKeys(joinType, _, _, _, _, left, right, hint) =>
+        // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
+        // the other side of the [[Join]]. It's also OK to use a superset of the join key values to
+        // do the pruning. For inner [[Join]]s, one side of the [[Join]] always produces a superset
+        // of the join key values.
+        if (isLeftSideSuperset(joinType, left, filterCreationSideExp)) {
+          !hintToBroadcastLeft(hint) && !canBroadcastBySize(left, conf) &&

Review Comment:
   I tested this: removing these checks improves performance.


