You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "beliefer (via GitHub)" <gi...@apache.org> on 2023/10/19 08:05:12 UTC

[PR] [WIP][SPARK-45606][SQL] Release restrictions on multi-layer runtime filter [spark]

beliefer opened a new pull request, #43449:
URL: https://github.com/apache/spark/pull/43449

   ### What changes were proposed in this pull request?
   Before https://github.com/apache/spark/pull/39170, Spark only supports insert runtime filter for application side of shuffle join on single-layer. Considered it's not worth to insert more runtime filter if one side of the shuffle join already exists runtime filter, Spark restricts it at https://github.com/apache/spark/blob/7057952f6bc2c5cf97dd408effd1b18bee1cb8f4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala#L346
   
   After https://github.com/apache/spark/pull/39170, Spark supports insert runtime filter for one side of any shuffle join on multi-layer. But the restrictions on multi-layer runtime filter mentioned above looks outdated.
   
   
   ### Why are the changes needed?
   Release restrictions on multi-layer runtime filter.
   Expand optimization surface.
   
   
   ### Does this PR introduce _any_ user-facing change?
   'No'.
   New feature.
   
   
   ### How was this patch tested?
   Test cases updated.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   'No'.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45606][SQL] Release restrictions on multi-layer runtime filter [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43449:
URL: https://github.com/apache/spark/pull/43449#discussion_r1384852516


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -277,28 +269,33 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
         leftKeys.lazyZip(rightKeys).foreach((l, r) => {
           // Check if:
           // 1. There is already a DPP filter on the key
-          // 2. There is already a bloom filter on the key
-          // 3. The keys are simple cheap expressions
+          // 2. The keys are simple cheap expressions
           if (filterCounter < numFilterThreshold &&
             !hasDynamicPruningSubquery(left, right, l, r) &&
-            !hasBloomFilter(newLeft, newRight, l, r) &&
             isSimpleExpression(l) && isSimpleExpression(r)) {
             val oldLeft = newLeft
             val oldRight = newRight
-            // Check if the current join is a shuffle join or a broadcast join that
-            // has a shuffle below it
+            // Check if:
+            // 1. The current join type supports prune the left side with runtime filter
+            // 2. The current join is a shuffle join or a broadcast join that
+            //    has a shuffle below it
+            // 3. There is already a bloom filter on the left key

Review Comment:
   ```suggestion
               // 3. There is no bloom filter on the left key yet
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -277,28 +269,33 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
         leftKeys.lazyZip(rightKeys).foreach((l, r) => {
           // Check if:
           // 1. There is already a DPP filter on the key
-          // 2. There is already a bloom filter on the key
-          // 3. The keys are simple cheap expressions
+          // 2. The keys are simple cheap expressions
           if (filterCounter < numFilterThreshold &&
             !hasDynamicPruningSubquery(left, right, l, r) &&
-            !hasBloomFilter(newLeft, newRight, l, r) &&
             isSimpleExpression(l) && isSimpleExpression(r)) {
             val oldLeft = newLeft
             val oldRight = newRight
-            // Check if the current join is a shuffle join or a broadcast join that
-            // has a shuffle below it
+            // Check if:
+            // 1. The current join type supports prune the left side with runtime filter
+            // 2. The current join is a shuffle join or a broadcast join that
+            //    has a shuffle below it
+            // 3. There is already a bloom filter on the left key
             val hasShuffle = isProbablyShuffleJoin(left, right, hint)
-            if (canPruneLeft(joinType) && (hasShuffle || probablyHasShuffle(left))) {
+            if (canPruneLeft(joinType) && (hasShuffle || probablyHasShuffle(left)) &&
+              !hasBloomFilter(newLeft, l)) {
               extractBeneficialFilterCreatePlan(left, right, l, r).foreach {
                 case (filterCreationSideKey, filterCreationSidePlan) =>
                   newLeft = injectFilter(l, newLeft, filterCreationSideKey, filterCreationSidePlan)
               }
             }
             // Did we actually inject on the left? If not, try on the right
-            // Check if the current join is a shuffle join or a broadcast join that
-            // has a shuffle below it
+            // Check if:
+            // 1. The current join type supports prune the right side with runtime filter
+            // 2. The current join is a shuffle join or a broadcast join that
+            //    has a shuffle below it
+            // 3. There is already a bloom filter on the right key

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45606][SQL] Release restrictions on multi-layer runtime filter [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43449:
URL: https://github.com/apache/spark/pull/43449#discussion_r1366682188


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -305,7 +305,8 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
   }
 
   private def findBloomFilterWithKey(plan: LogicalPlan, key: Expression): Boolean = {
-    plan.exists {
+    // Ensure that sibling nodes under the same Join do not have runtime filter
+    !plan.exists(_.isInstanceOf[Join]) && plan.exists {

Review Comment:
   Now, the support are multiple levels. If there are two nested joins that require shuffling, then when transforming up, runtime filter will first be inserted at one end of the lower level join. When scrolling up to the upper join, the lower runtime filter can often be inserted into one end of the upper join.
   
   For example, `bigA join B join BigC where a=b and b=c` cannot ignore the opportunity to insert runtime filter into BigC just because B has inserted a runtime filter into bigA.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45606][SQL] Release restrictions on multi-layer runtime filter [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #43449:
URL: https://github.com/apache/spark/pull/43449#issuecomment-1772016427

   ping @cloud-fan 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45606][SQL] Release restrictions on multi-layer runtime filter [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #43449:
URL: https://github.com/apache/spark/pull/43449#issuecomment-1801724153

   Merged to master.
   @cloud-fan Thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45606][SQL] Release restrictions on multi-layer runtime filter [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #43449:
URL: https://github.com/apache/spark/pull/43449#issuecomment-1794516188

   ping @cloud-fan 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45606][SQL] Release restrictions on multi-layer runtime filter [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43449:
URL: https://github.com/apache/spark/pull/43449#discussion_r1366453566


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -305,7 +305,8 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
   }
 
   private def findBloomFilterWithKey(plan: LogicalPlan, key: Expression): Boolean = {
-    plan.exists {
+    // Ensure that sibling nodes under the same Join do not have runtime filter
+    !plan.exists(_.isInstanceOf[Join]) && plan.exists {

Review Comment:
   it looks also wrong to skip the check completely if the plan contains Join.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45606][SQL] Release restrictions on multi-layer runtime filter [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer closed pull request #43449: [SPARK-45606][SQL] Release restrictions on multi-layer runtime filter
URL: https://github.com/apache/spark/pull/43449


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45606][SQL] Release restrictions on multi-layer runtime filter [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #43449:
URL: https://github.com/apache/spark/pull/43449#issuecomment-1780575988

   The GA failure is unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45606][SQL] Release restrictions on multi-layer runtime filter [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43449:
URL: https://github.com/apache/spark/pull/43449#discussion_r1366682188


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -305,7 +305,8 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
   }
 
   private def findBloomFilterWithKey(plan: LogicalPlan, key: Expression): Boolean = {
-    plan.exists {
+    // Ensure that sibling nodes under the same Join do not have runtime filter
+    !plan.exists(_.isInstanceOf[Join]) && plan.exists {

Review Comment:
   Now, the support are multiple levels. If there are two nested joins that require shuffling, when transforming up, runtime filter will first be inserted at one end of the lower level join. When scrolling up to the upper join, the lower runtime filter can often be inserted into one end of the upper join.
   
   For example, `bigA join B join BigC where a=b and b=c` cannot ignore the opportunity to insert runtime filter into BigC just because B has inserted a runtime filter into bigA.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org