Posted to reviews@spark.apache.org by "beliefer (via GitHub)" <gi...@apache.org> on 2023/08/03 08:48:58 UTC

[GitHub] [spark] beliefer opened a new pull request, #42317: [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions

beliefer opened a new pull request, #42317:
URL: https://github.com/apache/spark/pull/42317

   ### What changes were proposed in this pull request?
   Currently, Spark's runtime filter supports using a multi-level shuffle join side as the filter creation side. Please see: https://github.com/apache/spark/pull/39170. Although that feature covers the adaptive scenario and improves performance, there are still other cases that need to be supported.
   Consider the SQL below.
   ```
   SELECT *
   FROM (
     SELECT *
     FROM tab1
       JOIN tab2 ON tab1.c1 = tab2.c2
     WHERE tab2.a2 = 5
   ) AS a
     JOIN tab3 ON tab3.c3 = a.c1
   ```
   With the current implementation, Spark only injects a runtime filter into tab1, using a bloom filter built from `tab2.a2 = 5`.
   Because there is no direct join between tab3 and tab2, Spark can't inject a runtime filter into tab3 with the same bloom filter.
   However, the above SQL has the join condition `tab3.c3 = a.c1` (that is, `tab1.c1`) between tab3 and tab1, and also the join condition `tab1.c1 = tab2.c2`. We can rely on the transitivity of the join conditions to derive the virtual join condition `tab3.c3 = tab2.c2`, and then inject the bloom filter based on `tab2.a2 = 5` into tab3.
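
The transitivity argument can be sketched without Spark. This is only a toy model under stated assumptions: plain column-name strings stand in for Catalyst expressions, and `TransitiveJoinKeys`, `equivalentKey`, and `virtualCondition` are hypothetical names that do not appear in the PR.

```scala
// Toy model: column names stand in for Catalyst expressions (an assumption of this sketch).
object TransitiveJoinKeys {
  /** An equi-join condition `left = right`, written as a pair of column names. */
  type EquiPair = (String, String)

  /** Returns the key that `key` is equated with in `pairs`, checking both orientations. */
  def equivalentKey(pairs: Seq[EquiPair], key: String): Option[String] =
    pairs.collectFirst {
      case (l, r) if l == key => r
      case (l, r) if r == key => l
    }

  /**
   * Given an outer condition `a = b` and inner conditions that may contain `b = c`,
   * derives the virtual condition `a = c` by transitivity, if one exists.
   */
  def virtualCondition(outer: EquiPair, inner: Seq[EquiPair]): Option[EquiPair] =
    equivalentKey(inner, outer._2).map(equivalent => (outer._1, equivalent))
}
```

For the query above, `TransitiveJoinKeys.virtualCondition("tab3.c3" -> "tab1.c1", Seq("tab1.c1" -> "tab2.c2"))` yields `Some(("tab3.c3", "tab2.c2"))`, which is the virtual join condition described in the text.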
   
   ### Why are the changes needed?
   Enhance the Spark runtime filter and improve performance.
   
   
   ### Does this PR introduce _any_ user-facing change?
   'No'.
   This only updates the internal implementation.
   
   
   ### How was this patch tested?
   New tests.
   Micro benchmark for q75 in TPC-DS.
   **2TB TPC-DS**
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1356556465


##########
sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala:
##########
@@ -390,34 +390,60 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
   test("Runtime bloom filter join: two joins") {
     withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
+      // bf2 as creation side and inject runtime filter for bf1 and bf3.
       assertRewroteWithBloomFilter("select * from bf1 join bf2 join bf3 on bf1.c1 = bf2.c2 " +
         "and bf3.c3 = bf2.c2 where bf2.a2 = 5", 2)
-      assertRewroteWithBloomFilter("select * from (select * from bf1 left semi join bf2 on " +
-        "bf1.c1 = bf2.c2 where bf1.a1 = 5) as a join bf3 on bf3.c3 = a.c1")
-      assertRewroteWithBloomFilter("select * from (select * from bf1 left anti join bf2 on " +
-        "bf1.c1 = bf2.c2 where bf1.a1 = 5) as a join bf3 on bf3.c3 = a.c1")
       assertRewroteWithBloomFilter("select * from bf1 left outer join bf2 join bf3 on " +
         "bf1.c1 = bf2.c2 and bf3.c3 = bf2.c2 where bf2.a2 = 5", 2)
       assertRewroteWithBloomFilter("select * from bf1 right outer join bf2 join bf3 on " +
         "bf1.c1 = bf2.c2 and bf3.c3 = bf2.c2 where bf2.a2 = 5", 2)
+      // bf1 and bf2 hasn't shuffle. bf1 as creation side and inject runtime filter for bf3.
+      assertRewroteWithBloomFilter("select * from (select * from bf1 left semi join bf2 on " +

Review Comment:
   Seems we just moved some tests around. Is this related to this PR, or do we just want to add comments to clarify the tests?





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1356526351


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -29,7 +29,11 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
- * Insert a filter on one side of the join if the other side has a selective predicate.
+ * Insert a runtime filter on one side of the join (we call this side the application side)
+ * if we can extract a plan (we call this plan the creation side) from the other side and

Review Comment:
   it's weird to call an extracted plan "side". A join has two sides, and we extract a plan from the creation side to build the runtime filter.
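
The terminology in this comment can be illustrated with a small Spark-free sketch. It is not the rule's implementation: a plain `Set` stands in for the bloom filter (a real bloom filter may also admit false positives), and `Row`, `buildFilter`, and `prune` are hypothetical names introduced only for illustration.

```scala
// Toy illustration of creation side vs. application side; not Spark code.
object RuntimeFilterSketch {
  final case class Row(key: Int, payload: String)

  /**
   * Creation side: apply the selective predicate to the extracted plan's rows
   * and collect the surviving join keys into the filter.
   */
  def buildFilter(creationSide: Seq[Row], selective: Row => Boolean): Set[Int] =
    creationSide.filter(selective).map(_.key).toSet

  /**
   * Application side: drop rows whose join key cannot match before the join runs.
   */
  def prune(applicationSide: Seq[Row], filter: Set[Int]): Seq[Row] =
    applicationSide.filter(row => filter.contains(row.key))
}
```

In this picture the join still has exactly two sides; the filter is built from a plan extracted out of the creation side and is applied to the other side.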





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1354844877


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -121,14 +128,44 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
    */
   private def extractSelectiveFilterOverScan(
       plan: LogicalPlan,
-      filterCreationSideExp: Expression): Option[LogicalPlan] = {
-    @tailrec
+      filterCreationSideExp: Expression): Option[(Expression, LogicalPlan)] = {

Review Comment:
   let's update the doc for this method to explain the return type.





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1356628946


##########
sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala:
##########
@@ -390,34 +390,60 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
   test("Runtime bloom filter join: two joins") {
     withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
+      // bf2 as creation side and inject runtime filter for bf1 and bf3.
       assertRewroteWithBloomFilter("select * from bf1 join bf2 join bf3 on bf1.c1 = bf2.c2 " +
         "and bf3.c3 = bf2.c2 where bf2.a2 = 5", 2)
-      assertRewroteWithBloomFilter("select * from (select * from bf1 left semi join bf2 on " +
-        "bf1.c1 = bf2.c2 where bf1.a1 = 5) as a join bf3 on bf3.c3 = a.c1")
-      assertRewroteWithBloomFilter("select * from (select * from bf1 left anti join bf2 on " +
-        "bf1.c1 = bf2.c2 where bf1.a1 = 5) as a join bf3 on bf3.c3 = a.c1")
       assertRewroteWithBloomFilter("select * from bf1 left outer join bf2 join bf3 on " +
         "bf1.c1 = bf2.c2 and bf3.c3 = bf2.c2 where bf2.a2 = 5", 2)
       assertRewroteWithBloomFilter("select * from bf1 right outer join bf2 join bf3 on " +
         "bf1.c1 = bf2.c2 and bf3.c3 = bf2.c2 where bf2.a2 = 5", 2)
+      // bf1 and bf2 hasn't shuffle. bf1 as creation side and inject runtime filter for bf3.
+      assertRewroteWithBloomFilter("select * from (select * from bf1 left semi join bf2 on " +

Review Comment:
   There are two aspects here:
   First, for some of these test cases, I recently found that I had forgotten why a runtime filter (RF) is inserted and which specific point is being tested.
   Second, I have grouped test cases of the same type together. Otherwise, it looks a bit messy.
   
   So I reorganized them today and added comments stating which case each one tests. This also improves readability for developers.





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1356546074


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -139,41 +160,68 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
             referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _),
             hasHitFilter,
             hasHitSelectiveFilter,
-            currentPlan)
+            targetPlan,
+            targetCreationSideExpr)
         } else {
           None
         }
       case Project(_, child) =>
         assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
-        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, currentPlan)
+        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, targetPlan,
+          targetCreationSideExpr)
       case Filter(condition, child) if isSimpleExpression(condition) =>
         extract(
           child,
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition),
-          currentPlan)
-      case ExtractEquiJoinKeys(_, _, _, _, _, left, right, _) =>
+          targetPlan,
+          targetCreationSideExpr)
+      case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, left, right, _) =>
         // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
         // the other side of the [[Join]]. It's also OK to use a superset of the join key values
         // (ignore null values) to do the pruning.
-        if (left.output.exists(_.semanticEquals(filterCreationSideExp))) {
-          extract(left, AttributeSet.empty,
-            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left)
-        } else if (right.output.exists(_.semanticEquals(filterCreationSideExp))) {
-          extract(right, AttributeSet.empty,
-            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = right)
+        if (left.output.exists(_.semanticEquals(targetCreationSideExpr))) {
+          val extracted = extract(left, AttributeSet.empty,
+            hasHitFilter = false, hasHitSelectiveFilter = false, targetPlan = left,
+            targetCreationSideExpr = targetCreationSideExpr)

Review Comment:
   nit:
   ```
   extract(...).getOrElse {
     ...
   }
   ```
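
A minimal sketch of the refactoring this nit points at, assuming both attempts return an `Option`. In that case `orElse` keeps the result wrapped, whereas `getOrElse` would unwrap it. `fromLeft` and `fromRight` are hypothetical stand-ins for the two `extract` calls, not functions from the PR.

```scala
object OrElseSketch {
  // Hypothetical stand-ins for extracting from the left and right join children.
  def fromLeft(key: String): Option[String] =
    if (key.startsWith("l")) Some(s"left:$key") else None

  def fromRight(key: String): Option[String] =
    if (key.startsWith("r")) Some(s"right:$key") else None

  // Explicit emptiness check, in the style of the diff under review.
  def verbose(key: String): Option[String] = {
    val extracted = fromLeft(key)
    if (extracted.isEmpty) fromRight(key) else extracted
  }

  // Same behavior with Option.orElse; the alternative is passed by name,
  // so fromRight is evaluated only when fromLeft returns None.
  def concise(key: String): Option[String] =
    fromLeft(key).orElse(fromRight(key))
}
```

Both variants return the same result for every input; the second simply avoids the intermediate `val` and the explicit `isEmpty` branch.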





[GitHub] [spark] beliefer commented on pull request #42317: [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #42317:
URL: https://github.com/apache/spark/pull/42317#issuecomment-1710969251

   ping @cloud-fan I have already added comments for `extract`.




Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #42317:
URL: https://github.com/apache/spark/pull/42317#issuecomment-1767789260

   thanks, merging to master!




Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1360440483


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -143,41 +143,62 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
             referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _),
             hasHitFilter,
             hasHitSelectiveFilter,
-            currentPlan)
+            currentPlan,
+            targetCreationSideExpr)
         } else {
           None
         }
       case Project(_, child) =>
         assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
-        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, currentPlan)
+        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, currentPlan,
+          targetCreationSideExpr)
       case Filter(condition, child) if isSimpleExpression(condition) =>
         extract(
           child,
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition),
-          currentPlan)
-      case ExtractEquiJoinKeys(_, _, _, _, _, left, right, _) =>
+          currentPlan,
+          targetCreationSideExpr)
+      case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, left, right, _) =>
         // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
         // the other side of the [[Join]]. It's also OK to use a superset of the join key values
         // (ignore null values) to do the pruning.
         if (left.output.exists(_.semanticEquals(filterCreationSideKey))) {
           extract(left, AttributeSet.empty,
-            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left)
+            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left,
+            targetCreationSideExpr = targetCreationSideExpr).orElse {
+            // We can also extract from the right side if the join keys are transitive.
+            lkeys.zip(rkeys).find(_._1.semanticEquals(targetCreationSideExpr)).map(_._2)
+              .flatMap { passedFilterCreationSideExp =>
+                extract(right, AttributeSet.empty,
+                  hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = right,

Review Comment:
   shouldn't we inherit the filter-related arguments?





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1356552372


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -139,41 +160,68 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
             referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _),
             hasHitFilter,
             hasHitSelectiveFilter,
-            currentPlan)
+            targetPlan,
+            targetCreationSideExpr)
         } else {
           None
         }
       case Project(_, child) =>
         assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
-        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, currentPlan)
+        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, targetPlan,
+          targetCreationSideExpr)
       case Filter(condition, child) if isSimpleExpression(condition) =>
         extract(
           child,
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition),
-          currentPlan)
-      case ExtractEquiJoinKeys(_, _, _, _, _, left, right, _) =>
+          targetPlan,
+          targetCreationSideExpr)
+      case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, left, right, _) =>
         // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
         // the other side of the [[Join]]. It's also OK to use a superset of the join key values
         // (ignore null values) to do the pruning.
-        if (left.output.exists(_.semanticEquals(filterCreationSideExp))) {
-          extract(left, AttributeSet.empty,
-            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left)
-        } else if (right.output.exists(_.semanticEquals(filterCreationSideExp))) {
-          extract(right, AttributeSet.empty,
-            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = right)
+        if (left.output.exists(_.semanticEquals(targetCreationSideExpr))) {
+          val extracted = extract(left, AttributeSet.empty,
+            hasHitFilter = false, hasHitSelectiveFilter = false, targetPlan = left,
+            targetCreationSideExpr = targetCreationSideExpr)
+          if (extracted.isEmpty) {
+            // There may be a passing of the creation side expression for runtime filters.

Review Comment:
   ```
   // We can also extract from the right side if the join keys are transitive.
   ```
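
The fallback described by this comment can be sketched on a toy plan tree. This is a simplified model under stated assumptions, not the Catalyst code: `Scan`, `Filter`, and `Join` here are hypothetical case classes, column names are plain strings, and "selective" is a precomputed flag rather than the result of `isLikelySelective`.

```scala
object ExtractSketch {
  sealed trait Plan { def output: Set[String] }
  final case class Scan(name: String, output: Set[String]) extends Plan
  final case class Filter(selective: Boolean, child: Plan) extends Plan {
    def output: Set[String] = child.output
  }
  final case class Join(lkeys: Seq[String], rkeys: Seq[String], left: Plan, right: Plan)
      extends Plan {
    def output: Set[String] = left.output ++ right.output
  }

  /**
   * Looks for a selective filter over a scan that produces `key`. Returns the key
   * that was finally matched (it may differ from the input key) and the plan to
   * build the runtime filter from.
   */
  def extract(p: Plan, key: String, hitSelective: Boolean, target: Plan): Option[(String, Plan)] =
    p match {
      case s: Scan =>
        if (hitSelective && s.output.contains(key)) Some(key -> target) else None
      case Filter(selective, child) =>
        extract(child, key, hitSelective || selective, target)
      case Join(lkeys, rkeys, left, right) =>
        // Try the child that outputs `key`; if that fails, follow the equi-join
        // keys to the other child and retry there with the equivalent key.
        def trySide(own: Plan, ownKeys: Seq[String], other: Plan, otherKeys: Seq[String]) =
          extract(own, key, hitSelective = false, target = own).orElse {
            ownKeys.zip(otherKeys).find(_._1 == key).map(_._2).flatMap { passedKey =>
              extract(other, passedKey, hitSelective = false, target = other)
            }
          }
        if (left.output.contains(key)) trySide(left, lkeys, right, rkeys)
        else if (right.output.contains(key)) trySide(right, rkeys, left, lkeys)
        else None
    }
}
```

With an unfiltered `tab1` joined to a selectively filtered `tab2` on `tab1.c1 = tab2.c2`, asking for `tab1.c1` finds nothing on the left child and falls through to the right child, returning `tab2.c2` together with the filtered `tab2` plan.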





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1362090110


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -331,8 +352,8 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
             val hasShuffle = isProbablyShuffleJoin(left, right, hint)
             if (canPruneLeft(joinType) && (hasShuffle || probablyHasShuffle(left))) {
               extractBeneficialFilterCreatePlan(left, right, l, r).foreach {
-                filterCreationSidePlan =>
-                  newLeft = injectFilter(l, newLeft, r, filterCreationSidePlan)
+                case (filterCreationSideExp, filterCreationSidePlan) =>

Review Comment:
   ```suggestion
                   case (filterCreationSideKey, filterCreationSidePlan) =>
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -341,8 +362,9 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
             if (newLeft.fastEquals(oldLeft) && canPruneRight(joinType) &&
               (hasShuffle || probablyHasShuffle(right))) {
               extractBeneficialFilterCreatePlan(right, left, r, l).foreach {
-                filterCreationSidePlan =>
-                  newRight = injectFilter(r, newRight, l, filterCreationSidePlan)
+                case (filterCreationSideExp, filterCreationSidePlan) =>

Review Comment:
   ```suggestion
                   case (filterCreationSideKey, filterCreationSidePlan) =>
   ```
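
For readers less familiar with the pattern in these two hunks, here is a minimal sketch of consuming an `Option` of a tuple with a pattern-matching closure. `extractKeyAndPlan` is a hypothetical stand-in for `extractBeneficialFilterCreatePlan`, and plain strings replace the expression and plan types.

```scala
object TupleCallbackSketch {
  // Hypothetical stand-in: returns the matched join key and a plan description.
  def extractKeyAndPlan(found: Boolean): Option[(String, String)] =
    if (found) Some("bf2.c2" -> "filtered scan of bf2") else None

  def describe(found: Boolean): String = {
    // Mirrors the mutable newLeft/newRight style of the quoted diff.
    var result = "no runtime filter injected"
    extractKeyAndPlan(found).foreach {
      // The closure destructures the tuple; it runs only when the Option is defined.
      case (filterCreationSideKey, filterCreationSidePlan) =>
        result = s"build filter on $filterCreationSideKey from $filterCreationSidePlan"
    }
    result
  }
}
```

The closure body is skipped entirely when nothing is extracted, so the default value is left unchanged.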





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #42317:
URL: https://github.com/apache/spark/pull/42317#issuecomment-1751598293

   @cloud-fan 
   I added some comments. Could you help check whether they are suitable?




Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1356531381


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -118,17 +122,34 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
    * filter should be selective and the filter condition (including expressions in the child
    * plan referenced by the filter condition) should be a simple expression, so that we do
    * not add a subquery that might have an expensive computation.
+   *
+   * Note: The parameter `filterCreationSideExp` represents the creation side expression initially

Review Comment:
   ```suggestion
      * Note: The parameter `filterCreationSideExp` represents the creation side join keys initially
   ```





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1356124917


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -121,14 +128,44 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
    */
   private def extractSelectiveFilterOverScan(
       plan: LogicalPlan,
-      filterCreationSideExp: Expression): Option[LogicalPlan] = {
-    @tailrec
+      filterCreationSideExp: Expression): Option[(Expression, LogicalPlan)] = {

Review Comment:
   OK





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1356551250


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -118,17 +122,34 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
    * filter should be selective and the filter condition (including expressions in the child
    * plan referenced by the filter condition) should be a simple expression, so that we do
    * not add a subquery that might have an expensive computation.
+   *
+   * Note: The parameter `filterCreationSideExp` represents the creation side expression initially
+   * confirmed outside the extract method. The target creation side expression may change during
+   * the join relationship pass process. So the extract method returns a optional tuple,
+   * which contains the target expression and plan of creation side.
    */
   private def extractSelectiveFilterOverScan(
       plan: LogicalPlan,
-      filterCreationSideExp: Expression): Option[LogicalPlan] = {
-    @tailrec
+      filterCreationSideExp: Expression): Option[(Expression, LogicalPlan)] = {
+
+    /**
+     * Extracts a sub-plan which is a simple filter over scan from the input plan iteratively
+     * by the way show below:
+     * - Extracts a sub-plan from a plan without join nodes.
+     * - Extracts a sub-plan from left or right child of join nodes.
+     *
+     * Note: There are two situations if extracts a sub-plan from any child of join nodes.
+     * If we can extract a sub-plan from one child of join node, using it directly.
+     * Otherwise, we extract a sub-plan from another child of join node if
+     * we can find out the passed creation side expression by the join relationship pass process.
+     */
     def extract(
         p: LogicalPlan,
         predicateReference: AttributeSet,
         hasHitFilter: Boolean,
         hasHitSelectiveFilter: Boolean,
-        currentPlan: LogicalPlan): Option[LogicalPlan] = p match {
+        targetPlan: LogicalPlan,
+        targetCreationSideExpr: Expression): Option[(Expression, LogicalPlan)] = p match {

Review Comment:
   if the `creation side` should only reference the original join side, then `equiKey` seems a better name here.





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1360437686


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -118,17 +122,34 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
    * filter should be selective and the filter condition (including expressions in the child
    * plan referenced by the filter condition) should be a simple expression, so that we do
    * not add a subquery that might have an expensive computation.
+   *
+   * Note: The parameter `filterCreationSideExp` represents the creation side expression initially
+   * confirmed outside the extract method. The target creation side expression may change during
+   * the join relationship pass process. So the extract method returns a optional tuple,
+   * which contains the target expression and plan of creation side.
    */
   private def extractSelectiveFilterOverScan(
       plan: LogicalPlan,
-      filterCreationSideExp: Expression): Option[LogicalPlan] = {
-    @tailrec
+      filterCreationSideExp: Expression): Option[(Expression, LogicalPlan)] = {
+
+    /**
+     * Extracts a sub-plan which is a simple filter over scan from the input plan iteratively
+     * by the way show below:
+     * - Extracts a sub-plan from a plan without join nodes.
+     * - Extracts a sub-plan from left or right child of join nodes.
+     *
+     * Note: There are two situations if extracts a sub-plan from any child of join nodes.
+     * If we can extract a sub-plan from one child of join node, using it directly.
+     * Otherwise, we extract a sub-plan from another child of join node if
+     * we can find out the passed creation side expression by the join relationship pass process.
+     */
     def extract(
         p: LogicalPlan,
         predicateReference: AttributeSet,
         hasHitFilter: Boolean,
         hasHitSelectiveFilter: Boolean,
-        currentPlan: LogicalPlan): Option[LogicalPlan] = p match {
+        targetPlan: LogicalPlan,
+        targetCreationSideExpr: Expression): Option[(Expression, LogicalPlan)] = p match {

Review Comment:
   let's name it `targetKey`?





[GitHub] [spark] cloud-fan commented on a diff in pull request #42317: [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1312699399


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -121,14 +121,33 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
    */
   private def extractSelectiveFilterOverScan(
       plan: LogicalPlan,
-      filterCreationSideExp: Expression): Option[LogicalPlan] = {
-    @tailrec
+      filterCreationSideExp: Expression): Option[(Expression, LogicalPlan)] = {
+
+    /**
+     * Find the passed creation side expression.
+     *
+     * Note: If one side of the join condition contains the current creation side expression,
+     * the expression on the other side of the join condition can be used as
+     * the passed creation side.
+     */
+    def findPassedFilterCreationSideExp(
+        joinKeys: Seq[Expression],
+        otherJoinKeys: Seq[Expression],
+        filterCreationSideExp: Expression): Option[Expression] = {
+      joinKeys.zipWithIndex.find { case (key, _) =>
+        filterCreationSideExp.semanticEquals(key)
+      }.map { case (_, idx) =>
+        otherJoinKeys(idx)
+      }
+    }
+
     def extract(

Review Comment:
   It's not your fault, but without a detailed code comment on this `extract` function, I can't really understand the code change here.





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1360438479


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -143,41 +143,62 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
             referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _),
             hasHitFilter,
             hasHitSelectiveFilter,
-            currentPlan)
+            currentPlan,
+            targetCreationSideExpr)
         } else {
           None
         }
       case Project(_, child) =>
         assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
-        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, currentPlan)
+        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, currentPlan,
+          targetCreationSideExpr)
       case Filter(condition, child) if isSimpleExpression(condition) =>
         extract(
           child,
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition),
-          currentPlan)
-      case ExtractEquiJoinKeys(_, _, _, _, _, left, right, _) =>
+          currentPlan,
+          targetCreationSideExpr)
+      case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, left, right, _) =>
         // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
         // the other side of the [[Join]]. It's also OK to use a superset of the join key values
         // (ignore null values) to do the pruning.
         if (left.output.exists(_.semanticEquals(filterCreationSideKey))) {
           extract(left, AttributeSet.empty,
-            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left)
+            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left,
+            targetCreationSideExpr = targetCreationSideExpr).orElse {
+            // We can also extract from the right side if the join keys are transitive.
+            lkeys.zip(rkeys).find(_._1.semanticEquals(targetCreationSideExpr)).map(_._2)
+              .flatMap { passedFilterCreationSideExp =>

Review Comment:
   ```suggestion
                 .flatMap { newTargetKey =>
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -143,41 +143,62 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
             referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _),
             hasHitFilter,
             hasHitSelectiveFilter,
-            currentPlan)
+            currentPlan,
+            targetCreationSideExpr)
         } else {
           None
         }
       case Project(_, child) =>
         assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
-        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, currentPlan)
+        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, currentPlan,
+          targetCreationSideExpr)
       case Filter(condition, child) if isSimpleExpression(condition) =>
         extract(
           child,
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition),
-          currentPlan)
-      case ExtractEquiJoinKeys(_, _, _, _, _, left, right, _) =>
+          currentPlan,
+          targetCreationSideExpr)
+      case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, left, right, _) =>
         // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
         // the other side of the [[Join]]. It's also OK to use a superset of the join key values
         // (ignore null values) to do the pruning.
         if (left.output.exists(_.semanticEquals(filterCreationSideKey))) {
           extract(left, AttributeSet.empty,
-            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left)
+            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left,
+            targetCreationSideExpr = targetCreationSideExpr).orElse {
+            // We can also extract from the right side if the join keys are transitive.
+            lkeys.zip(rkeys).find(_._1.semanticEquals(targetCreationSideExpr)).map(_._2)
+              .flatMap { passedFilterCreationSideExp =>
+                extract(right, AttributeSet.empty,
+                  hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = right,
+                  targetCreationSideExpr = passedFilterCreationSideExp)
+              }
+          }
         } else if (right.output.exists(_.semanticEquals(filterCreationSideKey))) {
           extract(right, AttributeSet.empty,
-            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = right)
+            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = right,
+            targetCreationSideExpr = targetCreationSideExpr).orElse {
+            // We can also extract from the left side if the join keys are transitive.
+            rkeys.zip(lkeys).find(_._1.semanticEquals(targetCreationSideExpr)).map(_._2)
+              .flatMap { passedFilterCreationSideExp =>

Review Comment:
   ditto





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1360494759


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -143,41 +143,62 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
             referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _),
             hasHitFilter,
             hasHitSelectiveFilter,
-            currentPlan)
+            currentPlan,
+            targetCreationSideExpr)
         } else {
           None
         }
       case Project(_, child) =>
         assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
-        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, currentPlan)
+        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, currentPlan,
+          targetCreationSideExpr)
       case Filter(condition, child) if isSimpleExpression(condition) =>
         extract(
           child,
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition),
-          currentPlan)
-      case ExtractEquiJoinKeys(_, _, _, _, _, left, right, _) =>
+          currentPlan,
+          targetCreationSideExpr)
+      case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, left, right, _) =>
         // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
         // the other side of the [[Join]]. It's also OK to use a superset of the join key values
         // (ignore null values) to do the pruning.
         if (left.output.exists(_.semanticEquals(filterCreationSideKey))) {
           extract(left, AttributeSet.empty,
-            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left)
+            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left,
+            targetCreationSideExpr = targetCreationSideExpr).orElse {
+            // We can also extract from the right side if the join keys are transitive.
+            lkeys.zip(rkeys).find(_._1.semanticEquals(targetCreationSideExpr)).map(_._2)
+              .flatMap { passedFilterCreationSideExp =>
+                extract(right, AttributeSet.empty,

Review Comment:
   Because `PushDownPredicates` runs before this rule.





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1356628270


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -139,41 +160,68 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
             referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _),
             hasHitFilter,
             hasHitSelectiveFilter,
-            currentPlan)
+            targetPlan,
+            targetCreationSideExpr)
         } else {
           None
         }
       case Project(_, child) =>
         assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
-        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, currentPlan)
+        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, targetPlan,
+          targetCreationSideExpr)
       case Filter(condition, child) if isSimpleExpression(condition) =>
         extract(
           child,
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition),
-          currentPlan)
-      case ExtractEquiJoinKeys(_, _, _, _, _, left, right, _) =>
+          targetPlan,
+          targetCreationSideExpr)
+      case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, left, right, _) =>
         // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
         // the other side of the [[Join]]. It's also OK to use a superset of the join key values
         // (ignore null values) to do the pruning.
-        if (left.output.exists(_.semanticEquals(filterCreationSideExp))) {
-          extract(left, AttributeSet.empty,
-            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left)
-        } else if (right.output.exists(_.semanticEquals(filterCreationSideExp))) {
-          extract(right, AttributeSet.empty,
-            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = right)
+        if (left.output.exists(_.semanticEquals(targetCreationSideExpr))) {
+          val extracted = extract(left, AttributeSet.empty,
+            hasHitFilter = false, hasHitSelectiveFilter = false, targetPlan = left,
+            targetCreationSideExpr = targetCreationSideExpr)

Review Comment:
   Yeah. Let's use `orElse`.
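
   As a rough illustration of that change (a sketch only, not the PR's code), the explicit `isEmpty` check and the `orElse` form can be compared on plain strings. `extractFrom` and the side names are hypothetical stand-ins for the real `extract` calls on Catalyst plans.

   ```scala
   // Hypothetical stand-ins: plain strings replace Catalyst plans and expressions.
   object OrElseSketch {
     // Pretend extraction: succeeds only for sides listed in `selective`.
     def extractFrom(side: String, selective: Set[String]): Option[String] =
       if (selective.contains(side)) Some(side) else None

     // Explicit emptiness check, in the style of the earlier revision of the diff.
     def withIsEmpty(selective: Set[String]): Option[String] = {
       val extracted = extractFrom("left", selective)
       if (extracted.isEmpty) extractFrom("right", selective) else extracted
     }

     // Equivalent form: the argument of `orElse` is by-name, so the fallback
     // is evaluated only when the first extraction returns None.
     def withOrElse(selective: Set[String]): Option[String] =
       extractFrom("left", selective).orElse(extractFrom("right", selective))
   }
   ```

   Both forms prefer the left side and fall back to the right side only when the left side yields nothing.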





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1356096817


##########
sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala:
##########
@@ -390,34 +390,58 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
   test("Runtime bloom filter join: two joins") {
     withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
-      assertRewroteWithBloomFilter("select * from bf1 join bf2 join bf3 on bf1.c1 = bf2.c2 " +

Review Comment:
   To ensure that the join involves a shuffle.





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1356546564


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -29,7 +29,11 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
- * Insert a filter on one side of the join if the other side has a selective predicate.
+ * Insert a runtime filter on one side of the join (we call this side the application side)
+ * if we can extract a plan (we call this plan the creation side) from the other side and

Review Comment:
   Sounds reasonable.





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1356628946


##########
sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala:
##########
@@ -390,34 +390,60 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
   test("Runtime bloom filter join: two joins") {
     withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
+      // bf2 as creation side and inject runtime filter for bf1 and bf3.
       assertRewroteWithBloomFilter("select * from bf1 join bf2 join bf3 on bf1.c1 = bf2.c2 " +
         "and bf3.c3 = bf2.c2 where bf2.a2 = 5", 2)
-      assertRewroteWithBloomFilter("select * from (select * from bf1 left semi join bf2 on " +
-        "bf1.c1 = bf2.c2 where bf1.a1 = 5) as a join bf3 on bf3.c3 = a.c1")
-      assertRewroteWithBloomFilter("select * from (select * from bf1 left anti join bf2 on " +
-        "bf1.c1 = bf2.c2 where bf1.a1 = 5) as a join bf3 on bf3.c3 = a.c1")
       assertRewroteWithBloomFilter("select * from bf1 left outer join bf2 join bf3 on " +
         "bf1.c1 = bf2.c2 and bf3.c3 = bf2.c2 where bf2.a2 = 5", 2)
       assertRewroteWithBloomFilter("select * from bf1 right outer join bf2 join bf3 on " +
         "bf1.c1 = bf2.c2 and bf3.c3 = bf2.c2 where bf2.a2 = 5", 2)
+      // bf1 and bf2 hasn't shuffle. bf1 as creation side and inject runtime filter for bf3.
+      assertRewroteWithBloomFilter("select * from (select * from bf1 left semi join bf2 on " +

Review Comment:
   There are two aspects here:
   First, I recently realized that I had forgotten why a runtime filter is inserted in one of these test cases, that is, which specific point it tests.
   Second, I grouped the test cases of the same type together. Otherwise, they look a bit messy.
   
   So I reorganized them today and added comments stating which case each group tests. This also improves readability for developers.





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1354849545


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -139,41 +176,68 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
             referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _),
             hasHitFilter,
             hasHitSelectiveFilter,
-            currentPlan)
+            currentPlan,
+            currentFilterCreationSideExp)
         } else {
           None
         }
       case Project(_, child) =>
         assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
-        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, currentPlan)
+        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, currentPlan,
+          currentFilterCreationSideExp)
       case Filter(condition, child) if isSimpleExpression(condition) =>
         extract(
           child,
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition),
-          currentPlan)
-      case ExtractEquiJoinKeys(_, _, _, _, _, left, right, _) =>
+          currentPlan,
+          currentFilterCreationSideExp)
+      case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, left, right, _) =>
         // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
         // the other side of the [[Join]]. It's also OK to use a superset of the join key values
         // (ignore null values) to do the pruning.
-        if (left.output.exists(_.semanticEquals(filterCreationSideExp))) {
-          extract(left, AttributeSet.empty,
-            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left)
-        } else if (right.output.exists(_.semanticEquals(filterCreationSideExp))) {
-          extract(right, AttributeSet.empty,
-            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = right)
+        if (left.output.exists(_.semanticEquals(currentFilterCreationSideExp))) {
+          val extracted = extract(left, AttributeSet.empty,
+            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left,
+            currentFilterCreationSideExp = currentFilterCreationSideExp)
+          if (extracted.isEmpty) {
+            // There may be a passing of the creation side expression for runtime filters.
+            findPassedFilterCreationSideExp(lkeys, rkeys, currentFilterCreationSideExp)

Review Comment:
   Instead of adding the `findPassedFilterCreationSideExp` method, I think we can just write simple code here:
   ```
   lkeys.zip(rkeys).find(_._1.semanticEquals(targetCreationSideExpr)).map(_._2)...
   ```
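
   For illustration only, the lookup above can be sketched on plain strings. This is a simplification under stated assumptions: `otherSideKey` is a hypothetical name, join keys are strings, and `==` stands in for Catalyst's `semanticEquals`.

   ```scala
   object TransitiveKeySketch {
     // Hypothetical simplification: join keys are strings and plain equality
     // stands in for Catalyst's `semanticEquals`.
     // Pairs the keys of both join sides positionally and returns the key on the
     // other side that is matched with `targetKey`, if there is one.
     def otherSideKey(
         keys: Seq[String],
         otherKeys: Seq[String],
         targetKey: String): Option[String] =
       keys.zip(otherKeys).find(_._1 == targetKey).map(_._2)
   }
   ```

   With the join condition `tab1.c1 = tab2.c2`, calling `TransitiveKeySketch.otherSideKey(Seq("tab1.c1"), Seq("tab2.c2"), "tab1.c1")` yields `Some("tab2.c2")`, and a key that is not part of the join condition yields `None`.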





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1354840980


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -29,7 +29,14 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
- * Insert a filter on one side of the join if the other side has a selective predicate.
+ * Insert a runtime filter on one side of the join (we call this side the application side) if
+ * the plan of other side satisfy one of the following scenes:

Review Comment:
   We can be general here and mention details in the corresponding methods. It could be:
   ```
   Insert a runtime filter on one side of the join (we call this side the application side) if we can extract
   a runtime filter from the other side (creation side). A simple case is that the creation side is a
   table scan with a selective filter.
   ```





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1356541828


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -118,17 +122,34 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
    * filter should be selective and the filter condition (including expressions in the child
    * plan referenced by the filter condition) should be a simple expression, so that we do
    * not add a subquery that might have an expensive computation.
+   *
+   * Note: The parameter `filterCreationSideExp` represents the creation side expression initially
+   * confirmed outside the extract method. The target creation side expression may change during
+   * the join relationship pass process. So the extract method returns a optional tuple,
+   * which contains the target expression and plan of creation side.

Review Comment:
   how about
   ```
   Note: The parameter `filterCreationSideExp` represents the creation side join key, but the extracted
   sub-plan may use different key as we can extract the sub-plan from both sides of inner joins and the
   join key can be transitive. We need to return the actual join key used by the extracted plan.
   ```
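
   To make the point about returning the actual join key concrete, here is a minimal sketch on a toy plan model. It is not the Catalyst implementation: `Plan`, `Scan`, `Join` and the string keys are hypothetical, and join types and the filter checks of the real rule are ignored.

   ```scala
   object ExtractSketch {
     // Hypothetical toy plan model, not Catalyst's LogicalPlan.
     sealed trait Plan { def output: Set[String] }
     final case class Scan(
         name: String,
         output: Set[String],
         hasSelectiveFilter: Boolean) extends Plan
     final case class Join(
         left: Plan,
         right: Plan,
         lkeys: Seq[String],
         rkeys: Seq[String]) extends Plan {
       def output: Set[String] = left.output ++ right.output
     }

     // Returns the join key usable on the extracted sub-plan together with that
     // sub-plan. The returned key can differ from `targetKey` when the search
     // moves to the other side of a join through an equi-join key pair.
     def extract(plan: Plan, targetKey: String): Option[(String, Plan)] = plan match {
       case s: Scan =>
         if (s.hasSelectiveFilter && s.output.contains(targetKey)) Some((targetKey, s))
         else None
       case Join(left, right, lkeys, rkeys) =>
         if (left.output.contains(targetKey)) {
           extract(left, targetKey).orElse {
             lkeys.zip(rkeys).find(_._1 == targetKey).map(_._2)
               .flatMap(newTargetKey => extract(right, newTargetKey))
           }
         } else if (right.output.contains(targetKey)) {
           extract(right, targetKey).orElse {
             rkeys.zip(lkeys).find(_._1 == targetKey).map(_._2)
               .flatMap(newTargetKey => extract(left, newTargetKey))
           }
         } else {
           None
         }
     }
   }
   ```

   For a join of `tab1` (no selective filter) and `tab2` (selective filter) on `tab1.c1 = tab2.c2`, extracting with the key `tab1.c1` returns the `tab2` scan paired with `tab2.c2`, which is why the caller needs the key back and not only the plan.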





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1356628946


##########
sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala:
##########
@@ -390,34 +390,60 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
   test("Runtime bloom filter join: two joins") {
     withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
+      // bf2 as creation side and inject runtime filter for bf1 and bf3.
       assertRewroteWithBloomFilter("select * from bf1 join bf2 join bf3 on bf1.c1 = bf2.c2 " +
         "and bf3.c3 = bf2.c2 where bf2.a2 = 5", 2)
-      assertRewroteWithBloomFilter("select * from (select * from bf1 left semi join bf2 on " +
-        "bf1.c1 = bf2.c2 where bf1.a1 = 5) as a join bf3 on bf3.c3 = a.c1")
-      assertRewroteWithBloomFilter("select * from (select * from bf1 left anti join bf2 on " +
-        "bf1.c1 = bf2.c2 where bf1.a1 = 5) as a join bf3 on bf3.c3 = a.c1")
       assertRewroteWithBloomFilter("select * from bf1 left outer join bf2 join bf3 on " +
         "bf1.c1 = bf2.c2 and bf3.c3 = bf2.c2 where bf2.a2 = 5", 2)
       assertRewroteWithBloomFilter("select * from bf1 right outer join bf2 join bf3 on " +
         "bf1.c1 = bf2.c2 and bf3.c3 = bf2.c2 where bf2.a2 = 5", 2)
+      // bf1 and bf2 hasn't shuffle. bf1 as creation side and inject runtime filter for bf3.
+      assertRewroteWithBloomFilter("select * from (select * from bf1 left semi join bf2 on " +

Review Comment:
   There are two reasons:
   First, I recently realized that I had forgotten why some of these test cases insert a runtime filter, and which specific point each one tests.
   Second, I have grouped test cases of the same type together. Otherwise, they look a bit messy.
   
   So I reorganized them today and added a comment stating which case each one tests. This also improves readability for developers.





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan closed pull request #42317: [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions
URL: https://github.com/apache/spark/pull/42317




[GitHub] [spark] beliefer commented on pull request #42317: [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #42317:
URL: https://github.com/apache/spark/pull/42317#issuecomment-1669436970

   The CI failure is unrelated.




Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1360495142


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -143,41 +143,62 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
             referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _),
             hasHitFilter,
             hasHitSelectiveFilter,
-            currentPlan)
+            currentPlan,
+            targetCreationSideExpr)
         } else {
           None
         }
       case Project(_, child) =>
         assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
-        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, currentPlan)
+        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, currentPlan,
+          targetCreationSideExpr)
       case Filter(condition, child) if isSimpleExpression(condition) =>
         extract(
           child,
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition),
-          currentPlan)
-      case ExtractEquiJoinKeys(_, _, _, _, _, left, right, _) =>
+          currentPlan,
+          targetCreationSideExpr)
+      case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, left, right, _) =>
         // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
         // the other side of the [[Join]]. It's also OK to use a superset of the join key values
         // (ignore null values) to do the pruning.
         if (left.output.exists(_.semanticEquals(filterCreationSideKey))) {
           extract(left, AttributeSet.empty,
-            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left)
+            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left,
+            targetCreationSideExpr = targetCreationSideExpr).orElse {
+            // We can also extract from the right side if the join keys are transitive.
+            lkeys.zip(rkeys).find(_._1.semanticEquals(targetCreationSideExpr)).map(_._2)
+              .flatMap { passedFilterCreationSideExp =>
+                extract(right, AttributeSet.empty,
+                  hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = right,

Review Comment:
   Let's add a comment now.





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1360439724


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -143,41 +143,62 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
             referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _),
             hasHitFilter,
             hasHitSelectiveFilter,
-            currentPlan)
+            currentPlan,
+            targetCreationSideExpr)
         } else {
           None
         }
       case Project(_, child) =>
         assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
-        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, currentPlan)
+        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, currentPlan,
+          targetCreationSideExpr)
       case Filter(condition, child) if isSimpleExpression(condition) =>
         extract(
           child,
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition),
-          currentPlan)
-      case ExtractEquiJoinKeys(_, _, _, _, _, left, right, _) =>
+          currentPlan,
+          targetCreationSideExpr)
+      case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, left, right, _) =>
         // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
         // the other side of the [[Join]]. It's also OK to use a superset of the join key values
         // (ignore null values) to do the pruning.
         if (left.output.exists(_.semanticEquals(filterCreationSideKey))) {
           extract(left, AttributeSet.empty,
-            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left)
+            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left,
+            targetCreationSideExpr = targetCreationSideExpr).orElse {
+            // We can also extract from the right side if the join keys are transitive.
+            lkeys.zip(rkeys).find(_._1.semanticEquals(targetCreationSideExpr)).map(_._2)
+              .flatMap { passedFilterCreationSideExp =>
+                extract(right, AttributeSet.empty,

Review Comment:
   shall we rewrite `predicateReference` with the attr map? (left keys -> right keys)
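
One way to picture the suggested rewrite is the sketch below. It is not Spark code: `Attr` is a hypothetical stand-in for a Catalyst attribute, and `keyMap` and `rewrite` are illustrative names that do not appear in the PR. Each left join key is mapped to the right join key at the same position, and any reference that is a left key is replaced by its right-side counterpart.

```scala
object RewriteReferencesSketch {
  // Hypothetical stand-in for a Catalyst attribute; not a Spark class.
  final case class Attr(name: String)

  // Map each left join key to the right join key at the same position.
  def keyMap(leftKeys: Seq[Attr], rightKeys: Seq[Attr]): Map[Attr, Attr] =
    leftKeys.zip(rightKeys).toMap

  // Replace every reference that is a left join key with its right-side
  // counterpart; references that are not join keys are kept unchanged.
  def rewrite(refs: Set[Attr], mapping: Map[Attr, Attr]): Set[Attr] =
    refs.map(a => mapping.getOrElse(a, a))
}
```

With the join condition `bf1.c1 = bf2.c2`, a reference set containing `c1` would become one containing `c2`, while unrelated references pass through untouched.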





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #42317:
URL: https://github.com/apache/spark/pull/42317#issuecomment-1765547107

   The GA failure is unrelated.




Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1354904128


##########
sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala:
##########
@@ -390,34 +390,58 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
   test("Runtime bloom filter join: two joins") {
     withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
-      assertRewroteWithBloomFilter("select * from bf1 join bf2 join bf3 on bf1.c1 = bf2.c2 " +

Review Comment:
   why do we need to run this query with `AUTO_BROADCASTJOIN_THRESHOLD` set to 1200 now?





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1356017720


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -29,7 +29,14 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
- * Insert a filter on one side of the join if the other side has a selective predicate.
+ * Insert a runtime filter on one side of the join (we call this side the application side) if
+ * the plan of other side satisfy one of the following scenes:

Review Comment:
   To be precise, `the other side` may not be the `creation side`; the creation side may be a sub-plan of the other side.
   
   So I adjusted this comment.
   
   ```
   Insert a runtime filter on one side of the join (we call this side the application side)
   if we can extract a plan (we call this plan the creation side) from the other side and
   construct a runtime filter with the creation side. A simple case is that the creation side
   is a table scan with a selective filter.
   ```





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #42317:
URL: https://github.com/apache/spark/pull/42317#issuecomment-1767795481

   @cloud-fan Thank you! I will do the de-alias project.




Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1360440880


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -143,41 +143,62 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
             referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _),
             hasHitFilter,
             hasHitSelectiveFilter,
-            currentPlan)
+            currentPlan,
+            targetCreationSideExpr)
         } else {
           None
         }
       case Project(_, child) =>
         assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
-        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, currentPlan)
+        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, currentPlan,
+          targetCreationSideExpr)
       case Filter(condition, child) if isSimpleExpression(condition) =>
         extract(
           child,
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition),
-          currentPlan)
-      case ExtractEquiJoinKeys(_, _, _, _, _, left, right, _) =>
+          currentPlan,
+          targetCreationSideExpr)
+      case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, left, right, _) =>
         // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
         // the other side of the [[Join]]. It's also OK to use a superset of the join key values
         // (ignore null values) to do the pruning.
         if (left.output.exists(_.semanticEquals(filterCreationSideKey))) {
           extract(left, AttributeSet.empty,
-            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left)
+            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left,
+            targetCreationSideExpr = targetCreationSideExpr).orElse {
+            // We can also extract from the right side if the join keys are transitive.
+            lkeys.zip(rkeys).find(_._1.semanticEquals(targetCreationSideExpr)).map(_._2)
+              .flatMap { passedFilterCreationSideExp =>
+                extract(right, AttributeSet.empty,
+                  hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = right,

Review Comment:
   let's also add tests to cover this case.





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1356637969


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -118,17 +122,34 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
    * filter should be selective and the filter condition (including expressions in the child
    * plan referenced by the filter condition) should be a simple expression, so that we do
    * not add a subquery that might have an expensive computation.
+   *
+   * Note: The parameter `filterCreationSideExp` represents the creation side expression initially
+   * confirmed outside the extract method. The target creation side expression may change during
+   * the join relationship pass process. So the extract method returns a optional tuple,
+   * which contains the target expression and plan of creation side.
    */
   private def extractSelectiveFilterOverScan(
       plan: LogicalPlan,
-      filterCreationSideExp: Expression): Option[LogicalPlan] = {
-    @tailrec
+      filterCreationSideExp: Expression): Option[(Expression, LogicalPlan)] = {
+
+    /**
+     * Extracts a sub-plan which is a simple filter over scan from the input plan iteratively
+     * by the way show below:
+     * - Extracts a sub-plan from a plan without join nodes.
+     * - Extracts a sub-plan from left or right child of join nodes.
+     *
+     * Note: There are two situations if extracts a sub-plan from any child of join nodes.
+     * If we can extract a sub-plan from one child of join node, using it directly.
+     * Otherwise, we extract a sub-plan from another child of join node if
+     * we can find out the passed creation side expression by the join relationship pass process.
+     */
     def extract(
         p: LogicalPlan,
         predicateReference: AttributeSet,
         hasHitFilter: Boolean,
         hasHitSelectiveFilter: Boolean,
-        currentPlan: LogicalPlan): Option[LogicalPlan] = p match {
+        targetPlan: LogicalPlan,
+        targetCreationSideExpr: Expression): Option[(Expression, LogicalPlan)] = p match {

Review Comment:
   OK. Let's create a PR to rename all the variable names.





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1356530715


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -118,17 +122,34 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
    * filter should be selective and the filter condition (including expressions in the child
    * plan referenced by the filter condition) should be a simple expression, so that we do
    * not add a subquery that might have an expensive computation.

Review Comment:
   let's add more doc to explain why we can extract a sub-plan to filter the application side.
   ```
   The extracted sub-plan should produce a superset of the entire creation side output data, so that
   it's still correct to use the sub-plan to build the runtime filter to prune the application side.
   ```
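
The superset argument can be checked with a small model. The sketch below is an assumption-laden illustration rather than Spark's implementation: join keys are plain `Int` values, the runtime filter is an exact `Set` lookup instead of a bloom filter, and `buildFilter` and `prunedJoin` are hypothetical names.

```scala
object SupersetFilterSketch {
  // Build an exact membership filter from the given key values.
  // A real bloom filter may also report false positives, which makes
  // the filter less selective but never drops a matching row.
  def buildFilter(filterSource: Seq[Int]): Int => Boolean = {
    val keys = filterSource.toSet
    k => keys.contains(k)
  }

  // Prune the application side with a filter built from `filterSource`,
  // then keep the rows that match the creation side (inner equi-join on the key).
  def prunedJoin(app: Seq[Int], creation: Seq[Int], filterSource: Seq[Int]): Seq[Int] = {
    val mightMatch = buildFilter(filterSource)
    val creationKeys = creation.toSet
    app.filter(mightMatch).filter(creationKeys.contains)
  }
}
```

Building the filter from a superset of the creation-side keys gives the same join result as building it from the exact keys, whereas a filter built from a strict subset can drop rows that should have joined.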





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1356616038


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -139,41 +160,68 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
             referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _),
             hasHitFilter,
             hasHitSelectiveFilter,
-            currentPlan)
+            targetPlan,
+            targetCreationSideExpr)
         } else {
           None
         }
       case Project(_, child) =>
         assert(predicateReference.isEmpty && !hasHitSelectiveFilter)
-        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, currentPlan)
+        extract(child, predicateReference, hasHitFilter, hasHitSelectiveFilter, targetPlan,
+          targetCreationSideExpr)
       case Filter(condition, child) if isSimpleExpression(condition) =>
         extract(
           child,
           predicateReference ++ condition.references,
           hasHitFilter = true,
           hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition),
-          currentPlan)
-      case ExtractEquiJoinKeys(_, _, _, _, _, left, right, _) =>
+          targetPlan,
+          targetCreationSideExpr)
+      case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, left, right, _) =>
         // Runtime filters use one side of the [[Join]] to build a set of join key values and prune
         // the other side of the [[Join]]. It's also OK to use a superset of the join key values
         // (ignore null values) to do the pruning.
-        if (left.output.exists(_.semanticEquals(filterCreationSideExp))) {
-          extract(left, AttributeSet.empty,
-            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left)
-        } else if (right.output.exists(_.semanticEquals(filterCreationSideExp))) {
-          extract(right, AttributeSet.empty,
-            hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = right)
+        if (left.output.exists(_.semanticEquals(targetCreationSideExpr))) {
+          val extracted = extract(left, AttributeSet.empty,
+            hasHitFilter = false, hasHitSelectiveFilter = false, targetPlan = left,
+            targetCreationSideExpr = targetCreationSideExpr)

Review Comment:
   Because this method returns `Option[(Expression, LogicalPlan)]`





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1354845923


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -121,14 +128,44 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
    */
   private def extractSelectiveFilterOverScan(
       plan: LogicalPlan,
-      filterCreationSideExp: Expression): Option[LogicalPlan] = {
-    @tailrec
+      filterCreationSideExp: Expression): Option[(Expression, LogicalPlan)] = {
+
+    /**
+     * Find the passed creation side expression.
+     *
+     * Note: If one side of the join condition contains the current creation side expression,
+     * the expression on the other side of the join condition can be used as
+     * the passed creation side.
+     */
+    def findPassedFilterCreationSideExp(
+        joinKeys: Seq[Expression],
+        otherJoinKeys: Seq[Expression],
+        filterCreationSideExp: Expression): Option[Expression] = {
+      joinKeys.zipWithIndex.find { case (key, _) =>
+        filterCreationSideExp.semanticEquals(key)
+      }.map { case (_, idx) =>
+        otherJoinKeys(idx)
+      }
+    }
+
+    /**
+     * Extracts a sub-plan which is a simple filter over scan from the input plan iteratively
+     * by the way show below:
+     * - Extracts a sub-plan from a plan without join nodes.
+     * - Extracts a sub-plan from left or right child of join nodes.
+     *
+     * Note: There are two situations if extracts a sub-plan from any child of join nodes.
+     * If we can extract a sub-plan from one child of join node, using it directly.
+     * Otherwise, we extract a sub-plan from another child of join node if
+     * we can find out the passed creation side expression.
+     */
     def extract(
         p: LogicalPlan,
         predicateReference: AttributeSet,
         hasHitFilter: Boolean,
         hasHitSelectiveFilter: Boolean,
-        currentPlan: LogicalPlan): Option[LogicalPlan] = p match {
+        currentPlan: LogicalPlan,
+        currentFilterCreationSideExp: Expression): Option[(Expression, LogicalPlan)] = p match {

Review Comment:
   `current` is a bit weird here. How about `targetCreationSideExpr`?





[GitHub] [spark] beliefer commented on pull request #42317: [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #42317:
URL: https://github.com/apache/spark/pull/42317#issuecomment-1687318910

   ping @cloud-fan 




Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1356545232


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -118,17 +122,34 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
    * filter should be selective and the filter condition (including expressions in the child
    * plan referenced by the filter condition) should be a simple expression, so that we do
    * not add a subquery that might have an expensive computation.
+   *
+   * Note: The parameter `filterCreationSideExp` represents the creation side expression initially
+   * confirmed outside the extract method. The target creation side expression may change during
+   * the join relationship pass process. So the extract method returns a optional tuple,
+   * which contains the target expression and plan of creation side.
    */
   private def extractSelectiveFilterOverScan(
       plan: LogicalPlan,
-      filterCreationSideExp: Expression): Option[LogicalPlan] = {
-    @tailrec
+      filterCreationSideExp: Expression): Option[(Expression, LogicalPlan)] = {
+
+    /**
+     * Extracts a sub-plan which is a simple filter over scan from the input plan iteratively
+     * by the way show below:
+     * - Extracts a sub-plan from a plan without join nodes.
+     * - Extracts a sub-plan from left or right child of join nodes.
+     *
+     * Note: There are two situations if extracts a sub-plan from any child of join nodes.
+     * If we can extract a sub-plan from one child of join node, using it directly.
+     * Otherwise, we extract a sub-plan from another child of join node if
+     * we can find out the passed creation side expression by the join relationship pass process.
+     */

Review Comment:
   I think we can remove the doc here now, as the doc of `extractSelectiveFilterOverScan` should already explain it well.





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1356541828


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -118,17 +122,34 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
    * filter should be selective and the filter condition (including expressions in the child
    * plan referenced by the filter condition) should be a simple expression, so that we do
    * not add a subquery that might have an expensive computation.
+   *
+   * Note: The parameter `filterCreationSideExp` represents the creation side expression initially
+   * confirmed outside the extract method. The target creation side expression may change during
+   * the join relationship pass process. So the extract method returns a optional tuple,
+   * which contains the target expression and plan of creation side.

Review Comment:
   How about:
   ```
   Note: The parameter `filterCreationSideExp` represents the creation side join keys, but the extracted
   sub-plan may use different keys as we can extract the sub-plan from both sides of inner joins and the
   join keys can be transitive.
   ```
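
The suggested note can be illustrated with a toy model. This is a minimal sketch only: `Key`, `Plan`, `FilteredScan`, `InnerJoin` and this `extract` are hypothetical stand-ins, not Catalyst classes and not the code in this PR. It shows why the result carries a key as well as a plan: when the requested key does not lead to a selective scan, the key is translated across an inner join's equi-join condition and the other side is tried.

```scala
// Toy model only: these types are hypothetical and are not Catalyst classes.
final case class Key(table: String, column: String)

sealed trait Plan
// A table scan that may carry a selective filter.
final case class FilteredScan(table: String, selective: Boolean) extends Plan
// An inner equi-join where leftKeys(i) = rightKeys(i) for each i.
final case class InnerJoin(
    left: Plan,
    right: Plan,
    leftKeys: Seq[Key],
    rightKeys: Seq[Key]) extends Plan

object TransitiveKeySketch {
  // Returns the key to build the filter on together with the sub-plan to build it from.
  // The returned key may differ from the requested one.
  def extract(plan: Plan, key: Key): Option[(Key, Plan)] = plan match {
    case s: FilteredScan =>
      if (s.selective && s.table == key.table) Some((key, s)) else None
    case InnerJoin(left, right, leftKeys, rightKeys) =>
      // First try the requested key unchanged on either child.
      extract(left, key).orElse(extract(right, key)).orElse {
        // Otherwise translate the key across the join condition and try the other child.
        leftKeys.zip(rightKeys).collectFirst {
          case (l, r) if l == key => (right, r)
          case (l, r) if r == key => (left, l)
        }.flatMap { case (side, translated) => extract(side, translated) }
      }
  }

  def main(args: Array[String]): Unit = {
    // Models: tab1 JOIN tab2 ON tab1.c1 = tab2.c2, with a selective filter on tab2 only.
    val plan = InnerJoin(
      FilteredScan("tab1", selective = false),
      FilteredScan("tab2", selective = true),
      Seq(Key("tab1", "c1")),
      Seq(Key("tab2", "c2")))
    println(extract(plan, Key("tab1", "c1")))
  }
}
```

In this model, asking for `tab1.c1` yields the `tab2` scan paired with `tab2.c2`, which is the "different keys" case the note describes.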





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1356553243


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -118,17 +122,34 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
    * filter should be selective and the filter condition (including expressions in the child
    * plan referenced by the filter condition) should be a simple expression, so that we do
    * not add a subquery that might have an expensive computation.
+   *
+   * Note: The parameter `filterCreationSideExp` represents the creation side expression initially
+   * confirmed outside the extract method. The target creation side expression may change during
+   * the join relationship pass process. So the extract method returns a optional tuple,
+   * which contains the target expression and plan of creation side.
    */
   private def extractSelectiveFilterOverScan(
       plan: LogicalPlan,
-      filterCreationSideExp: Expression): Option[LogicalPlan] = {
-    @tailrec
+      filterCreationSideExp: Expression): Option[(Expression, LogicalPlan)] = {
+
+    /**
+     * Extracts a sub-plan which is a simple filter over scan from the input plan iteratively
+     * by the way show below:
+     * - Extracts a sub-plan from a plan without join nodes.
+     * - Extracts a sub-plan from left or right child of join nodes.
+     *
+     * Note: There are two situations if extracts a sub-plan from any child of join nodes.
+     * If we can extract a sub-plan from one child of join node, using it directly.
+     * Otherwise, we extract a sub-plan from another child of join node if
+     * we can find out the passed creation side expression by the join relationship pass process.
+     */
     def extract(
         p: LogicalPlan,
         predicateReference: AttributeSet,
         hasHitFilter: Boolean,
         hasHitSelectiveFilter: Boolean,
-        currentPlan: LogicalPlan): Option[LogicalPlan] = p match {
+        targetPlan: LogicalPlan,

Review Comment:
   `currentPlan` would be a better name.





Re: [PR] [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #42317:
URL: https://github.com/apache/spark/pull/42317#issuecomment-1762596244

   The GA failure is unrelated.




[GitHub] [spark] beliefer commented on pull request #42317: [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #42317:
URL: https://github.com/apache/spark/pull/42317#issuecomment-1665610287

   ping @cloud-fan @somani 




[GitHub] [spark] cloud-fan commented on pull request #42317: [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #42317:
URL: https://github.com/apache/spark/pull/42317#issuecomment-1702329344

   I have no idea how to review the code change here. I understand the general idea, but how does it map to the actual code? What does `def extract` do today, and why do you add a new parameter to it?




[GitHub] [spark] beliefer commented on a diff in pull request #42317: [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #42317:
URL: https://github.com/apache/spark/pull/42317#discussion_r1312831022


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -121,14 +121,33 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
    */
   private def extractSelectiveFilterOverScan(
       plan: LogicalPlan,
-      filterCreationSideExp: Expression): Option[LogicalPlan] = {
-    @tailrec
+      filterCreationSideExp: Expression): Option[(Expression, LogicalPlan)] = {
+
+    /**
+     * Find the passed creation side expression.
+     *
+     * Note: If one side of the join condition contains the current creation side expression,
+     * the expression on the other side of the join condition can be used as
+     * the passed creation side.
+     */
+    def findPassedFilterCreationSideExp(
+        joinKeys: Seq[Expression],
+        otherJoinKeys: Seq[Expression],
+        filterCreationSideExp: Expression): Option[Expression] = {
+      joinKeys.zipWithIndex.find { case (key, _) =>
+        filterCreationSideExp.semanticEquals(key)
+      }.map { case (_, idx) =>
+        otherJoinKeys(idx)
+      }
+    }
+
     def extract(

Review Comment:
   I have already added comments for `extract`.
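
For readers without a Spark checkout, the positional lookup in `findPassedFilterCreationSideExp` quoted above can be tried in isolation. The following is a rough sketch under stated assumptions: `Col` is a hypothetical stand-in for a Catalyst `Expression`, plain equality stands in for `semanticEquals`, and `findPassedKey` is an illustrative name, not a Spark API.

```scala
// Hypothetical stand-in for a Catalyst Expression; `==` models `semanticEquals`.
final case class Col(table: String, name: String)

object PassedKeySketch {
  // If `creationSideKey` is one of `joinKeys`, returns the key at the same
  // position in `otherJoinKeys`, i.e. the expression it is joined to.
  def findPassedKey(
      joinKeys: Seq[Col],
      otherJoinKeys: Seq[Col],
      creationSideKey: Col): Option[Col] = {
    val idx = joinKeys.indexWhere(_ == creationSideKey)
    // Guard the index so mismatched key lists yield None instead of throwing.
    if (idx >= 0 && idx < otherJoinKeys.length) Some(otherJoinKeys(idx)) else None
  }

  def main(args: Array[String]): Unit = {
    // Models the join condition tab1.c1 = tab2.c2.
    val leftKeys = Seq(Col("tab1", "c1"))
    val rightKeys = Seq(Col("tab2", "c2"))
    println(findPassedKey(leftKeys, rightKeys, Col("tab1", "c1")))
    println(findPassedKey(leftKeys, rightKeys, Col("tab1", "c9")))
  }
}
```

The first call finds `tab2.c2` because `tab1.c1` sits at the same index in the left key list; the second returns `None` because `tab1.c9` is not a join key.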


