Posted to reviews@spark.apache.org by "jackylee-ch (via GitHub)" <gi...@apache.org> on 2024/01/10 05:22:15 UTC

[PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

jackylee-ch opened a new pull request, #44661:
URL: https://github.com/apache/spark/pull/44661

   ### What changes were proposed in this pull request?
   As outlined in JIRA issue [SPARK-46590](https://issues.apache.org/jira/browse/SPARK-46590), when a broadcast join follows a union within the same stage, the [collectCoalesceGroups](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala#L144) method indiscriminately traverses all sub-plans and aggregates their shuffle stages into a single coalesce group, which is not expected.
   
   ### Why are the changes needed?
   For a broadcast join, we do not expect the broadcast exchange to have the same number of partitions as the shuffle side. Therefore, we can safely step over the broadcast join and continue traversing its sub-plans.
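
To make the grouping problem concrete, here is a minimal sketch on a toy plan tree. It does not use Spark's classes: `ShuffleStage`, `BroadcastStage`, `Union`, `BroadcastJoin`, `groupsBefore`, and `groupsAfter` are hypothetical names chosen for illustration, and the real rule operates on `SparkPlan` nodes and shuffle stage infos rather than integer ids.

```scala
// Toy model only: none of these types are Spark classes.
object CoalesceGroupSketch {
  sealed trait Plan { def children: Seq[Plan] }
  case class ShuffleStage(id: Int) extends Plan { val children: Seq[Plan] = Nil }
  case class BroadcastStage(id: Int) extends Plan { val children: Seq[Plan] = Nil }
  case class Union(children: Seq[Plan]) extends Plan
  case class BroadcastJoin(left: Plan, right: Plan) extends Plan {
    val children: Seq[Plan] = Seq(left, right)
  }

  private def leaves(p: Plan): Seq[Plan] =
    if (p.children.isEmpty) Seq(p) else p.children.flatMap(leaves)

  private def isExchange(p: Plan): Boolean =
    p.isInstanceOf[ShuffleStage] || p.isInstanceOf[BroadcastStage]

  private def shuffleIds(p: Plan): Seq[Int] =
    leaves(p).collect { case s: ShuffleStage => s.id }

  // Before: a broadcast join falls through to the catch-all case, so every
  // shuffle below it (including the branches of a union) lands in one group.
  def groupsBefore(p: Plan): Seq[Seq[Int]] = p match {
    case s: ShuffleStage => Seq(Seq(s.id))
    case u: Union => u.children.flatMap(groupsBefore)
    case other if leaves(other).forall(isExchange) =>
      Seq(shuffleIds(other)).filter(_.nonEmpty)
    case _ => Nil
  }

  // After: step over the broadcast join and keep traversing its children,
  // so each union branch forms its own group.
  def groupsAfter(p: Plan): Seq[Seq[Int]] = p match {
    case s: ShuffleStage => Seq(Seq(s.id))
    case u: Union => u.children.flatMap(groupsAfter)
    case j: BroadcastJoin => j.children.flatMap(groupsAfter)
    case other if leaves(other).forall(isExchange) =>
      Seq(shuffleIds(other)).filter(_.nonEmpty)
    case _ => Nil
  }
}
```

For a broadcast join whose streamed side is a union of two shuffles, `groupsBefore` puts both shuffles into one group, while `groupsAfter` returns one group per union branch.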
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   A newly added unit test, which fails without this PR.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "jackylee-ch (via GitHub)" <gi...@apache.org>.
jackylee-ch commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1458345869


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,14 +147,17 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
-    case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) =>
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)

Review Comment:
   Got it.





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1455386794


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,13 +147,15 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
+    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)
+    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
+    // shuffle partitions, because we may break the assumption that all children of a spark plan
+    // have same number of output partitions.
     case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) =>

Review Comment:
   It seems we should improve this line to check whether the child of `BroadcastQueryStageExec` contains a `Union`.





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1458190264


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,14 +147,17 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
-    case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) =>
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)

Review Comment:
   Assume the children of `CartesianProductExec` are GROUP BY queries; then we can coalesce the shuffles of the two sides independently.





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "jackylee-ch (via GitHub)" <gi...@apache.org>.
jackylee-ch commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1457289271


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,13 +147,15 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
+    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)
+    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
+    // shuffle partitions, because we may break the assumption that all children of a spark plan
+    // have same number of output partitions.
     case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) =>
       val shuffleStages = collectShuffleStageInfos(p)

Review Comment:
   Sure, I will also add a test case for a query with union and CartesianProduct to make sure this logic works.





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with unexpected partition indices [spark]

Posted by "jackylee-ch (via GitHub)" <gi...@apache.org>.
jackylee-ch commented on PR #44661:
URL: https://github.com/apache/spark/pull/44661#issuecomment-1905833521

   Thanks for your review. @cloud-fan @beliefer @ulysses-you 




Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1455428053


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,13 +147,15 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
+    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)

Review Comment:
   Without Union, the two sides of a broadcast join will be in one coalesce group. Isn't that an issue?





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1457267952


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,13 +147,15 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
+    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)
+    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
+    // shuffle partitions, because we may break the assumption that all children of a spark plan
+    // have same number of output partitions.
     case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) =>
       val shuffleStages = collectShuffleStageInfos(p)

Review Comment:
   I understand the issue now, but we may hit the same bug in the future if we miss any other nodes similar to Union and broadcast join. Can we add a check here? If the shuffles are not compatible, we skip coalescing.
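
A minimal sketch of such a guard, on toy data rather than Spark's types. `ShuffleInfo`, `compatible`, and `coalescible` are hypothetical names, and treating "compatible" as "every shuffle in the group has the same number of partitions" is an assumption made for this illustration, not something the comment spells out.

```scala
// Toy model only: not Spark's API.
object CompatibilityCheckSketch {
  // Hypothetical stand-in for the per-shuffle info collected for one coalesce group.
  case class ShuffleInfo(id: Int, numPartitions: Int)

  // Assumption: a group is compatible when all of its shuffles share one partition count.
  def compatible(group: Seq[ShuffleInfo]): Boolean =
    group.map(_.numPartitions).distinct.size <= 1

  // Keep only the groups that are safe to coalesce; incompatible groups are skipped
  // instead of triggering a failure later on.
  def coalescible(groups: Seq[Seq[ShuffleInfo]]): Seq[Seq[ShuffleInfo]] =
    groups.filter(g => g.nonEmpty && compatible(g))
}
```

With this shape, a group that accidentally mixes shuffles from unrelated sub-plans is left uncoalesced, which trades a missed optimization for avoiding a runtime error.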





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "jackylee-ch (via GitHub)" <gi...@apache.org>.
jackylee-ch commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1457424405


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,14 +147,17 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
-    case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) =>
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
+    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)
+    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
+    // shuffle partitions, because we may break the assumption that all children of a spark plan
+    // have same number of output partitions.
+    case p
+        if p.collectLeaves().forall(_.isInstanceOf[ShuffleQueryStageExec]) && !containsUnion(p) =>

Review Comment:
   Removing the assertion sounds good to me; I will try it later.





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1450190275


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala:
##########
@@ -311,6 +311,34 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite {
     }
   }
 
+  test("SPARK-46590 adaptive query execution works correctly with broadcast join and union") {

Review Comment:
   I copied this test case into my code and ran it. No error appeared.





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #44661:
URL: https://github.com/apache/spark/pull/44661#issuecomment-1888817877

   > also cc @beliefer
   
   Thank you for the ping.




Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with unexpected partition indices [spark]

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1461281654


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,13 +147,16 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
+    case join: CartesianProductExec => join.children.flatMap(collectCoalesceGroups)

Review Comment:
   The issue is that, if the plan itself does not require a shuffle exchange, then we should not assume that all of its leaf nodes are shuffle exchanges. How about adding a new pattern that puts the children of a non-unary plan which does not require a shuffle into new groups? We can even remove the union pattern.
   ```
   def doNotRequireShuffleExchange(plan: SparkPlan): Boolean =
     plan.requiredChildDistribution.exists {
       // `UnspecifiedDistribution` is a case object, so it is matched by value, not by type.
       case _: BroadcastDistribution | UnspecifiedDistribution => true
       case _ => false
     }
   
   case p if doNotRequireShuffleExchange(p) => p.children.flatMap(collectCoalesceGroups)
   ```
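
The suggested pattern can be tried out on a toy plan model. This is only a sketch: `Distribution`, `Node`, `Shuffle`, `Broadcasted`, and `groups` below are hypothetical stand-ins, not Spark classes, and the three distributions are simplified to bare markers.

```scala
// Toy model only: none of these types are Spark classes.
object DistributionRuleSketch {
  sealed trait Distribution
  case object Unspecified extends Distribution
  case object Broadcast extends Distribution
  case object Clustered extends Distribution

  sealed trait Plan {
    def children: Seq[Plan]
    def requiredChildDistribution: Seq[Distribution]
  }
  case class Shuffle(id: Int) extends Plan {
    val children: Seq[Plan] = Nil
    val requiredChildDistribution: Seq[Distribution] = Nil
  }
  case class Broadcasted(id: Int) extends Plan {
    val children: Seq[Plan] = Nil
    val requiredChildDistribution: Seq[Distribution] = Nil
  }
  case class Node(
      name: String,
      requiredChildDistribution: Seq[Distribution],
      children: Seq[Plan]) extends Plan

  def doNotRequireShuffleExchange(p: Plan): Boolean =
    p.requiredChildDistribution.exists {
      case Broadcast | Unspecified => true
      case _ => false
    }

  private def shuffleIds(p: Plan): Seq[Int] = p match {
    case s: Shuffle => Seq(s.id)
    case other => other.children.flatMap(shuffleIds)
  }

  // Nodes that do not require a shuffle split their children into separate groups;
  // any other node keeps all shuffles below it in one group.
  def groups(p: Plan): Seq[Seq[Int]] = p match {
    case s: Shuffle => Seq(Seq(s.id))
    case n if n.children.nonEmpty && doNotRequireShuffleExchange(n) =>
      n.children.flatMap(groups)
    case n => Seq(shuffleIds(n)).filter(_.nonEmpty)
  }
}
```

In this model a union-like node (all children `Unspecified`) and a broadcast-join-like node (one child `Broadcast`) both split into per-child groups, while a node that requires `Clustered` on both sides keeps its two shuffles together, so no dedicated union or broadcast join case is needed.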





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with unexpected partition indices [spark]

Posted by "jackylee-ch (via GitHub)" <gi...@apache.org>.
jackylee-ch commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1460718052


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,14 +147,17 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
+    case join: CartesianProductExec => join.children.flatMap(collectCoalesceGroups)
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
-    case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) =>
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
+    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)
+    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
+    // shuffle partitions, because we may break the assumption that all children of a spark plan
+    // have same number of output partitions.
+    case p if p.collectLeaves().forall(_.isInstanceOf[ShuffleQueryStageExec]) =>

Review Comment:
   Done. It should be `ExchangeQueryStageExec` when there is a query with `ShuffledHashJoin` and `BroadcastHashJoin`.





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "jackylee-ch (via GitHub)" <gi...@apache.org>.
jackylee-ch commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1456724717


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,13 +147,15 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
+    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)
+    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
+    // shuffle partitions, because we may break the assumption that all children of a spark plan
+    // have same number of output partitions.
     case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) =>

Review Comment:
   When [collectShuffleStageInfos](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala#L157), which is called by [collectCoalesceGroups](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala#L144), collects the shuffle stage infos of the sub-plans, it only traverses `ShuffleQueryStageExec` and `AQEShuffleReadExec`; `BroadcastQueryStageExec` is excluded. Therefore, when a query contains only broadcast joins, only the valid shuffle stages are obtained.





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #44661:
URL: https://github.com/apache/spark/pull/44661#issuecomment-1884241887

   cc @cloud-fan @ulysses-you FYI
   
   @jackylee-ch encountered this issue in our production environment.
   
   




Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1455373617


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,13 +147,15 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
+    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)
+    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
+    // shuffle partitions, because we may break the assumption that all children of a spark plan
+    // have same number of output partitions.
     case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) =>

Review Comment:
   It seems we should improve this line to check whether the child of `BroadcastQueryStageExec` contains a `Union`.








Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "jackylee-ch (via GitHub)" <gi...@apache.org>.
jackylee-ch commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1457444524


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,14 +147,17 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
-    case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) =>
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)

Review Comment:
   > We should also match `CartesianProductExec` here.
   
   I don't see why we need to match `CartesianProductExec` here. For `CartesianProductExec`, both sides are required, so there is no need to run `collectCoalesceGroups` on each side separately.





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with unexpected partition indeces [spark]

Posted by "jackylee-ch (via GitHub)" <gi...@apache.org>.
jackylee-ch commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1458389613


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala:
##########
@@ -47,9 +47,7 @@ object ShufflePartitionsUtil extends Logging {
       advisoryTargetSize: Long,

Review Comment:
   We hit the [assertion](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala#L131) inside `ShufflePartitionsUtil.coalescePartitionsWithSkew`.
   
   > are you sure all the assertions in this file should be removed?
   
   Rather than failing with an assertion error, maybe it is better to abort the coalesce operation and let the query continue running?
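
As a rough illustration of that idea, the sketch below replaces a hard assertion with a check that skips coalescing. `canCoalesce` and `coalesceOrSkip` are hypothetical helpers, not part of `ShufflePartitionsUtil`. The condition mirrors the one in the assertion discussed in this thread, and using an empty result to mean "leave the partitions as they are" is an assumption.

```scala
object PartitionSpecCheckSketch {
  // Each inner Seq holds the start indices of one shuffle's partition specs.
  // A negative index stands for an unexpected partition spec (assumption).
  def canCoalesce(partitionIndicesSeq: Seq[Seq[Int]]): Boolean =
    partitionIndicesSeq.nonEmpty &&
      partitionIndicesSeq.distinct.length == 1 &&
      partitionIndicesSeq.head.forall(_ >= 0)

  // Runs `coalesce` only when the specs are consistent; otherwise returns an
  // empty result instead of throwing, so the caller can keep the original plan.
  def coalesceOrSkip[A](partitionIndicesSeq: Seq[Seq[Int]], coalesce: => Seq[A]): Seq[A] =
    if (canCoalesce(partitionIndicesSeq)) coalesce else Seq.empty
}
```

The trade-off is the one debated in this thread: skipping keeps the query running, but it can also hide a planning bug that an assertion would have surfaced.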





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #44661:
URL: https://github.com/apache/spark/pull/44661#issuecomment-1888756676

   also cc @beliefer 




Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1457415768


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,14 +147,17 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
-    case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) =>
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
+    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)
+    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
+    // shuffle partitions, because we may break the assumption that all children of a spark plan
+    // have same number of output partitions.
+    case p
+        if p.collectLeaves().forall(_.isInstanceOf[ShuffleQueryStageExec]) && !containsUnion(p) =>

Review Comment:
   I disagree with this change. It's hacky to say that the shuffles are compatible if there is no Union. We should have a more general solution, like checking the shuffle partitions.





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with unexpected partition indeces [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1458353228


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala:
##########
@@ -128,14 +126,18 @@ object ShufflePartitionsUtil extends Logging {
 
     // There should be no unexpected partition specs and the start indices should be identical
     // across all different shuffles.
-    assert(partitionIndicesSeq.distinct.length == 1 && partitionIndicesSeq.head.forall(_ >= 0),
-      s"Invalid shuffle partition specs: $inputPartitionSpecs")
+    if(partitionIndicesSeq.distinct.length > 1 || partitionIndicesSeq.head.exists(_ < 0)) {

Review Comment:
   ```suggestion
       if (partitionIndicesSeq.distinct.length > 1 || partitionIndicesSeq.head.exists(_ < 0)) {
   ```





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1455626733


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,13 +147,15 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
+    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)
+    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
+    // shuffle partitions, because we may break the assumption that all children of a spark plan
+    // have same number of output partitions.
     case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) =>

Review Comment:
   > we filter the right side for broadcast join in collectShuffleStageInfos
   
   I didn't see any code related to it, can you elaborate?





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1452022010


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,13 +147,15 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
+    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)

Review Comment:
   why can this change fix the assertion error? where is the assert?





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1450190275


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala:
##########
@@ -311,6 +311,34 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite {
     }
   }
 
+  test("SPARK-46590 adaptive query execution works correctly with broadcast join and union") {

Review Comment:
   I copied this test case into my code and ran it. No error appeared.





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "jackylee-ch (via GitHub)" <gi...@apache.org>.
jackylee-ch commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1453489448


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,13 +147,15 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
+    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)

Review Comment:
   The assertion in [ShufflePartitionsUtil.coalescePartitionsWithSkew](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala#L131) ensures that there are no unexpected partition specs and that the start indices are identical across all shuffles. However, this assertion can fail when a union is involved. To address this, [collectCoalesceGroups](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala#L144) splits the children of a union into separate groups, so that each group satisfies the assertion.
   
   The problem we encountered is that if a query contains a union before a broadcast join, all sub-plans are grouped together directly when the traversal reaches the broadcast join. If the sub-plans of the union have different numbers of shuffle partitions, an AssertionError occurs.
   
   Since the shuffle partition numbers on the two sides of a broadcast join are unrelated, we can skip over the broadcast join, continue on to the union beneath it, and then split its sub-plans into groups.
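
To make the grouping behavior described above concrete, here is a minimal Scala sketch. It uses toy plan nodes (`ShuffleStage`, `BroadcastStage`, `Unary`, `Union`, `BroadcastJoin`) that are hypothetical stand-ins, not Spark's real classes, and `collectGroups` only mimics the shape of `collectCoalesceGroups`. The `descendIntoBroadcastJoin` flag is an assumption added purely so that both behaviors can be compared side by side.

```scala
object CoalesceGroupsSketch {
  // Toy stand-ins for physical plan nodes (NOT the real Spark classes).
  sealed trait Plan { def children: Seq[Plan] }
  final case class ShuffleStage(id: Int, numPartitions: Int) extends Plan {
    def children: Seq[Plan] = Nil
  }
  final case class BroadcastStage(id: Int) extends Plan {
    def children: Seq[Plan] = Nil
  }
  final case class Unary(child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
  }
  final case class Union(children: Seq[Plan]) extends Plan
  final case class BroadcastJoin(left: Plan, right: Plan) extends Plan {
    def children: Seq[Plan] = Seq(left, right)
  }

  def leaves(p: Plan): Seq[Plan] =
    if (p.children.isEmpty) Seq(p) else p.children.flatMap(leaves)

  private def isExchangeStage(p: Plan): Boolean = p match {
    case _: ShuffleStage | _: BroadcastStage => true
    case _ => false
  }

  // Each inner Seq is one group of shuffles that would be coalesced together.
  def collectGroups(p: Plan, descendIntoBroadcastJoin: Boolean): Seq[Seq[ShuffleStage]] =
    p match {
      case s: ShuffleStage => Seq(Seq(s))
      case Unary(child) => collectGroups(child, descendIntoBroadcastJoin)
      // Union children may have different partition numbers: one group per child.
      case Union(children) => children.flatMap(collectGroups(_, descendIntoBroadcastJoin))
      // Proposed behavior: keep traversing below a broadcast join.
      case BroadcastJoin(l, r) if descendIntoBroadcastJoin =>
        Seq(l, r).flatMap(collectGroups(_, descendIntoBroadcastJoin))
      // Fallback: lump every shuffle under this node into a single group.
      case other if leaves(other).forall(isExchangeStage) =>
        val shuffles = leaves(other).collect { case s: ShuffleStage => s }
        if (shuffles.nonEmpty) Seq(shuffles) else Seq.empty
      case _ => Seq.empty
    }

  // A broadcast join whose streamed side is a union of two shuffles
  // with different partition numbers (10 and 20).
  val examplePlan: Plan =
    BroadcastJoin(Union(Seq(ShuffleStage(1, 10), ShuffleStage(2, 20))), BroadcastStage(3))
}
```

With the flag off, `collectGroups(examplePlan, false)` puts both union children (10 and 20 partitions) into one group, which is the situation that trips the assertion. With the flag on, each child forms its own group.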





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1457417250


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,14 +147,17 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
-    case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) =>
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)

Review Comment:
   We should also match `CartesianProductExec` here.





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1457372344


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,13 +147,15 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
+    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)

Review Comment:
   Do we still need the two lines?





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "jackylee-ch (via GitHub)" <gi...@apache.org>.
jackylee-ch commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1457381745


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,13 +147,15 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
+    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)

Review Comment:
   I think it is still necessary to include these two lines. Our intention is to continue performing coalesce on one side when encountering a broadcast join and union.





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with unexpected partition indeces [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #44661:
URL: https://github.com/apache/spark/pull/44661#issuecomment-1905495286

   thanks, merging to master/3.5!




Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1454872518


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,13 +147,15 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
+    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)
+    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
+    // shuffle partitions, because we may break the assumption that all children of a spark plan
+    // have same number of output partitions.
     case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) =>

Review Comment:
   so we were fine before for simple queries because this condition filters out the broadcast joins?





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with unexpected partition indeces [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1458683923


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala:
##########
@@ -47,9 +47,7 @@ object ShufflePartitionsUtil extends Logging {
       advisoryTargetSize: Long,

Review Comment:
   Some assertions help us find bugs; we shouldn't blindly remove all of them.





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with unexpected partition indeces [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1460428714


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,14 +147,17 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
+    case join: CartesianProductExec => join.children.flatMap(collectCoalesceGroups)
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
-    case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) =>
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
+    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)
+    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
+    // shuffle partitions, because we may break the assumption that all children of a spark plan
+    // have same number of output partitions.
+    case p if p.collectLeaves().forall(_.isInstanceOf[ShuffleQueryStageExec]) =>

Review Comment:
   shall we change it back to `ExchangeQueryStageExec`?





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with unexpected partition indeces [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1458802466


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala:
##########
@@ -47,9 +47,7 @@ object ShufflePartitionsUtil extends Logging {
       advisoryTargetSize: Long,

Review Comment:
   > Or maybe we could just add assertion for testing?
   
   Are you going to do this for all assertions in Spark?





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with unexpected partition indeces [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan closed pull request #44661: [SPARK-46590][SQL] Fix coalesce failed with unexpected partition indeces
URL: https://github.com/apache/spark/pull/44661




Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with unexpected partition indeces [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1462863274


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,13 +147,16 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
+    case join: CartesianProductExec => join.children.flatMap(collectCoalesceGroups)

Review Comment:
   I'd prefer to keep the current "white list" approach, to be conservative. Otherwise, if there is a bug, we may mistakenly coalesce shuffle partitions and cause correctness issues.





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "jackylee-ch (via GitHub)" <gi...@apache.org>.
jackylee-ch commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1455516769


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,13 +147,15 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
+    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)
+    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
+    // shuffle partitions, because we may break the assumption that all children of a spark plan
+    // have same number of output partitions.
     case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) =>

Review Comment:
   > It seems we should improve this line to check if the child of `BroadcastQueryStageExec` exists `Union`.
   
   In fact, I have tested queries with various join types, with and without `Union`, and found that `ShuffledHashJoin` and `SortMergeJoin` work normally, while `BroadcastJoin` and `CartesianProduct` with `Union` fail. In the former case (`BroadcastJoin`) we should ignore broadcast joins, while the latter (`CartesianProduct`) requires additional detection. Maybe we should add the union check and solve the latter case in this PR?





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "jackylee-ch (via GitHub)" <gi...@apache.org>.
jackylee-ch commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1455509066


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,13 +147,15 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
+    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)
+    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
+    // shuffle partitions, because we may break the assumption that all children of a spark plan
+    // have same number of output partitions.
     case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) =>

Review Comment:
   > so we were fine before for simple queries because this condition filters out the broadcast joins?
   
   No, this condition won't filter out the broadcast joins. We filter the right side for broadcast joins in `collectShuffleStageInfos`.





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with unexpected partition indeces [spark]

Posted by "jackylee-ch (via GitHub)" <gi...@apache.org>.
jackylee-ch commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1458785050


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala:
##########
@@ -47,9 +47,7 @@ object ShufflePartitionsUtil extends Logging {
       advisoryTargetSize: Long,

Review Comment:
   Or maybe we could just add assertion for testing?





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with unexpected partition indeces [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1458353853


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala:
##########
@@ -47,9 +47,7 @@ object ShufflePartitionsUtil extends Logging {
       advisoryTargetSize: Long,

Review Comment:
   Are you sure all the assertions in this file should be removed? Which assertion error did you hit?





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1454873922


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,13 +147,15 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
+    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)
+    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
+    // shuffle partitions, because we may break the assumption that all children of a spark plan
+    // have same number of output partitions.
     case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) =>

Review Comment:
   Can we reproduce the bug without union?





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "jackylee-ch (via GitHub)" <gi...@apache.org>.
jackylee-ch commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1453489448


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,13 +147,15 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
+    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)

Review Comment:
   The assertion in [ShufflePartitionsUtil.coalescePartitionsWithSkew](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala#L131) ensures that there are no unexpected partition specifications and that the start indices are identical across all shuffles. However, the assertion fails when a union operation is involved. To resolve this, we split the union into groups in [collectCoalesceGroups](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala#L144) so that each group satisfies the assertion.
   
   The problem we encountered is that if a query contains a union before a broadcast join, all sub-plans are grouped together directly when the traversal reaches the broadcast join. If the sub-plans of the union have different shuffle partition numbers, an AssertionError will occur.
   
   Since the shuffle partition numbers on the two sides of a broadcast join are unrelated, we can disregard the broadcast join, continue on to the union after it, and then split the plans into groups.
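
The grouping behavior described above can be illustrated with a small toy model. This is only a sketch under stated assumptions: `Plan`, `ShuffleStage`, `BroadcastStage`, `Union`, `BroadcastJoin`, `Project` and the `skipBroadcastJoin` flag are hypothetical stand-ins, not Spark's `SparkPlan` classes, and `groups` only approximates the shape of `collectCoalesceGroups`.

```scala
// Toy model of the grouping logic; these are NOT Spark's classes.
object CoalesceGroupsSketch {
  sealed trait Plan { def children: Seq[Plan] }
  // A shuffle query stage together with its number of shuffle partitions.
  final case class ShuffleStage(id: String, numPartitions: Int) extends Plan {
    val children: Seq[Plan] = Nil
  }
  final case class BroadcastStage(id: String) extends Plan {
    val children: Seq[Plan] = Nil
  }
  final case class Union(children: Seq[Plan]) extends Plan
  final case class BroadcastJoin(left: Plan, right: Plan) extends Plan {
    val children: Seq[Plan] = Seq(left, right)
  }
  final case class Project(child: Plan) extends Plan {
    val children: Seq[Plan] = Seq(child)
  }

  def leaves(p: Plan): Seq[Plan] =
    if (p.children.isEmpty) Seq(p) else p.children.flatMap(leaves)

  // Only shuffle stages take part in coalescing; broadcast stages are dropped.
  def shuffleStages(p: Plan): Seq[ShuffleStage] =
    leaves(p).collect { case s: ShuffleStage => s }

  // skipBroadcastJoin = false approximates the behavior before the change:
  // a broadcast join falls through to the catch-all case below.
  def groups(p: Plan, skipBroadcastJoin: Boolean): Seq[Seq[ShuffleStage]] = p match {
    case s: ShuffleStage => Seq(Seq(s))
    case Project(child) => groups(child, skipBroadcastJoin)
    case Union(cs) => cs.flatMap(groups(_, skipBroadcastJoin))
    case BroadcastJoin(l, r) if skipBroadcastJoin =>
      Seq(l, r).flatMap(groups(_, skipBroadcastJoin))
    case other if leaves(other).forall {
          case _: ShuffleStage | _: BroadcastStage => true
          case _ => false
        } =>
      // Every leaf below `other` ends up in ONE group, even across a union.
      val stages = shuffleStages(other)
      if (stages.isEmpty) Nil else Seq(stages)
    case _ => Nil
  }

  val a: ShuffleStage = ShuffleStage("a", 10)
  val b: ShuffleStage = ShuffleStage("b", 20)
  val plan: Plan = BroadcastJoin(Union(Seq(a, b)), BroadcastStage("c"))
}
```

With `skipBroadcastJoin = false`, `groups(plan, false)` puts `a` and `b` (10 and 20 partitions) into the same group, which is the situation that trips the assertion. With `skipBroadcastJoin = true`, the traversal reaches the union and each shuffle stage lands in its own group.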





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "jackylee-ch (via GitHub)" <gi...@apache.org>.
jackylee-ch commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1455482815


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,13 +147,15 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
+    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)

Review Comment:
   There would only be one side in each coalesce group after `collectShuffleStageInfos`.





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1455373617


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,13 +147,15 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
+    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)
+    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
+    // shuffle partitions, because we may break the assumption that all children of a spark plan
+    // have same number of output partitions.
     case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) =>

Review Comment:
   It seems we should improve this line to check whether the child of `BroadcastQueryStageExec` contains a `Union`.
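
One possible reading of this suggestion is a recursive check for a union node anywhere below a given plan, similar in spirit to the `containsUnion(p)` guard that appears in a later revision of the diff. The thread does not show how that helper is defined, so the following is only a sketch on a hypothetical toy tree (`Node`, `Leaf`, `UnionNode`, `JoinNode` are illustrative names, not Spark classes).

```scala
// Hypothetical toy tree; the real helper would operate on SparkPlan.
object ContainsUnionSketch {
  sealed trait Node { def children: Seq[Node] }
  final case class Leaf(name: String) extends Node {
    val children: Seq[Node] = Nil
  }
  final case class UnionNode(children: Seq[Node]) extends Node
  final case class JoinNode(left: Node, right: Node) extends Node {
    val children: Seq[Node] = Seq(left, right)
  }

  // True if the node itself or any descendant is a union.
  def containsUnion(n: Node): Boolean = n match {
    case _: UnionNode => true
    case other => other.children.exists(containsUnion)
  }
}
```

A guard of this kind would keep a sub-plan out of the catch-all group whenever a union sits somewhere beneath it.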





Re: [PR] [SPARK-46590][SQL] Fix coalesce failed with BroadcastJoin and Union [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44661:
URL: https://github.com/apache/spark/pull/44661#discussion_r1457419146


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala:
##########
@@ -146,14 +147,17 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
       Seq(collectShuffleStageInfos(r))
     case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
     case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
-    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
-    // shuffle partitions, because we may break the assumption that all children of a spark plan
-    // have same number of output partitions.
     // Note that, `BroadcastQueryStageExec` is a valid case:
     // If a join has been optimized from shuffled join to broadcast join, then the one side is
     // `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
     // shuffle side as we do not expect broadcast exchange has same partition number.
-    case p if p.collectLeaves().forall(_.isInstanceOf[ExchangeQueryStageExec]) =>
+    case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
+    case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)
+    // If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
+    // shuffle partitions, because we may break the assumption that all children of a spark plan
+    // have same number of output partitions.
+    case p
+        if p.collectLeaves().forall(_.isInstanceOf[ShuffleQueryStageExec]) && !containsUnion(p) =>

Review Comment:
   Maybe we can update `ShufflePartitionsUtil.coalescePartitions` to do nothing if the assumption does not hold, instead of throwing assertion error.
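
The idea of doing nothing when the assumption does not hold can be sketched as follows. This is not the code in `ShufflePartitionsUtil`; `coalesceStartIndices`, its parameters, and the simplified merging rule are assumptions made purely for illustration. The function returns `None` when the shuffles disagree on their partition count, so a caller can skip coalescing instead of hitting an assertion error.

```scala
import scala.collection.mutable.ArrayBuffer

// Illustrative only: a simplified stand-in, not Spark's implementation.
object SafeCoalesceSketch {
  // sizesPerShuffle(s)(i) is the size in bytes of partition i of shuffle s.
  // Returns the start indices of the coalesced partitions, or None when the
  // shuffles do not all have the same number of partitions.
  def coalesceStartIndices(
      sizesPerShuffle: Seq[Seq[Long]],
      targetSize: Long): Option[Seq[Int]] = {
    val counts = sizesPerShuffle.map(_.length).distinct
    if (counts.length != 1) {
      // Assumption violated (or no input): do nothing instead of asserting.
      None
    } else {
      val numPartitions = counts.head
      val starts = ArrayBuffer.empty[Int]
      var currentSize = 0L
      var i = 0
      while (i < numPartitions) {
        // Combined size of partition i across all shuffles.
        val size = sizesPerShuffle.map(_(i)).sum
        if (i == 0 || currentSize + size > targetSize) {
          // Start a new coalesced partition at index i.
          starts += i
          currentSize = size
        } else {
          currentSize += size
        }
        i += 1
      }
      Some(starts.toSeq)
    }
  }
}
```

For two shuffles with four partitions each the function returns start indices, while a pair with two and three partitions yields `None`, leaving the plan unchanged.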


