You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "xingchaozh (via GitHub)" <gi...@apache.org> on 2023/03/07 08:55:20 UTC

[GitHub] [spark] xingchaozh opened a new pull request, #40312: [SPARK-42695][SQL] Skew join handling in stream side of broadcast hash join

xingchaozh opened a new pull request, #40312:
URL: https://github.com/apache/spark/pull/40312

   ### What changes were proposed in this pull request?
   We could handle the steam side skew of BroadcastHashJoin to improve the join performance
   
   Before | After
   -- | --
   <img src="https://user-images.githubusercontent.com/7522130/223371603-c83da69a-cf2c-445f-b937-2af92c5f0953.png" width="400" height="450"> | <img src="https://user-images.githubusercontent.com/7522130/223371590-56d4fd20-0c32-4806-967e-bb6e47851f29.png" width="400" height="450">
   
   
   ### Why are the changes needed?
   We can extend the OptimizeSkewedJoin to handle this case
   
   ### Does this PR introduce _any_ user-facing change?
   NO
   
   
   ### How was this patch tested?
   UT


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wangyum commented on pull request #40312: [SPARK-42695][SQL] Skew join handling in stream side of broadcast hash join

Posted by "wangyum (via GitHub)" <gi...@apache.org>.
wangyum commented on PR #40312:
URL: https://github.com/apache/spark/pull/40312#issuecomment-1473032777

   cc @cloud-fan 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] pan3793 commented on pull request #40312: [SPARK-42695][SQL] Skew join handling in stream side of broadcast hash join

Posted by "pan3793 (via GitHub)" <gi...@apache.org>.
pan3793 commented on PR #40312:
URL: https://github.com/apache/spark/pull/40312#issuecomment-1560404852

   cc @ulysses-you 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wangyum commented on a diff in pull request #40312: [SPARK-42695][SQL] Skew join handling in stream side of broadcast hash join

Posted by "wangyum (via GitHub)" <gi...@apache.org>.
wangyum commented on code in PR #40312:
URL: https://github.com/apache/spark/pull/40312#discussion_r1205132132


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala:
##########
@@ -215,6 +216,32 @@ case class OptimizeSkewedJoin(ensureRequirements: EnsureRequirements)
         case (newLeft, newRight) =>
           shj.copy(left = newLeft, right = newRight, isSkewJoin = true)
       }.getOrElse(shj)
+
+    case bhj @ BroadcastHashJoinExec(_, _, joinType, _, _, ShuffleStage(s: ShuffleQueryStageExec),
+    BroadcastQueryStageExec(_, _, _), _, false) if canSplitLeftSide(joinType) =>
+      optimizeBroadcastHashJoin(s).map(newLeft => bhj.copy(left = newLeft, isSkewJoin = true))
+        .getOrElse(bhj)
+
+    case bhj @ BroadcastHashJoinExec(_, _, joinType, _, _, BroadcastQueryStageExec(_, _, _),
+    ShuffleStage(s: ShuffleQueryStageExec), _, false) if canSplitRightSide(joinType) =>
+      optimizeBroadcastHashJoin(s).map(newRight => bhj.copy(right = newRight, isSkewJoin = true))
+        .getOrElse(bhj)
+  }
+
+  private def optimizeBroadcastHashJoin(shuffleStage: ShuffleQueryStageExec): Option[SparkPlan] = {
+
+    def isSkewed(stats: MapOutputStatistics): Boolean = {
+      val medSize = Utils.median(stats.bytesByPartitionId, false)
+      val skewThreshold = getSkewThreshold(medSize)
+      stats.bytesByPartitionId.exists(_ > skewThreshold)
+    }
+
+    if (!shuffleStage.mapStats.forall(isSkewed(_))) {
+      return None
+    }
+
+    Some(SkewJoinChildWrapper(AQEShuffleReadExec(shuffleStage,
+      OptimizeShuffleWithLocalRead.getPartitionSpecs(shuffleStage, None))))

Review Comment:
   There will be no skew if do not put the same join key values together. This optimization assumes that the data in the map stage does not have skew values (usually it is true). I don’t know if databricks has any other ideas? @cloud-fan



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] github-actions[bot] closed pull request #40312: [SPARK-42695][SQL] Skew join handling in stream side of broadcast hash join

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #40312: [SPARK-42695][SQL] Skew join handling in stream side of broadcast hash join
URL: https://github.com/apache/spark/pull/40312


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ulysses-you commented on a diff in pull request #40312: [SPARK-42695][SQL] Skew join handling in stream side of broadcast hash join

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on code in PR #40312:
URL: https://github.com/apache/spark/pull/40312#discussion_r1205223427


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala:
##########
@@ -215,6 +216,32 @@ case class OptimizeSkewedJoin(ensureRequirements: EnsureRequirements)
         case (newLeft, newRight) =>
           shj.copy(left = newLeft, right = newRight, isSkewJoin = true)
       }.getOrElse(shj)
+
+    case bhj @ BroadcastHashJoinExec(_, _, joinType, _, _, ShuffleStage(s: ShuffleQueryStageExec),
+    BroadcastQueryStageExec(_, _, _), _, false) if canSplitLeftSide(joinType) =>
+      optimizeBroadcastHashJoin(s).map(newLeft => bhj.copy(left = newLeft, isSkewJoin = true))
+        .getOrElse(bhj)
+
+    case bhj @ BroadcastHashJoinExec(_, _, joinType, _, _, BroadcastQueryStageExec(_, _, _),
+    ShuffleStage(s: ShuffleQueryStageExec), _, false) if canSplitRightSide(joinType) =>
+      optimizeBroadcastHashJoin(s).map(newRight => bhj.copy(right = newRight, isSkewJoin = true))
+        .getOrElse(bhj)
+  }
+
+  private def optimizeBroadcastHashJoin(shuffleStage: ShuffleQueryStageExec): Option[SparkPlan] = {
+
+    def isSkewed(stats: MapOutputStatistics): Boolean = {
+      val medSize = Utils.median(stats.bytesByPartitionId, false)
+      val skewThreshold = getSkewThreshold(medSize)
+      stats.bytesByPartitionId.exists(_ > skewThreshold)
+    }
+
+    if (!shuffleStage.mapStats.forall(isSkewed(_))) {
+      return None
+    }
+
+    Some(SkewJoinChildWrapper(AQEShuffleReadExec(shuffleStage,
+      OptimizeShuffleWithLocalRead.getPartitionSpecs(shuffleStage, None))))

Review Comment:
   I do not quite follow this optimization.. The `OptimizeShuffleWithLocalRead` would do the same thing that add local shuffle read, so the difference is here we add local read in advance to skip coalescing partition to avoid skew ? If so, I think it would still cause skew if the number of shuffle partitions is smaller than mapper partitions. And it misses an optimzation with coalesce small partitions.
   
   If the issue is we coalesce and make some skewed partitions in `OptimizeShuffleWithLocalRead.getPartitionSpecs`, how about improving it to consider partition size ? Then the local shuffle read could support both coalesce small parittion and split skewed partition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zuston commented on pull request #40312: [SPARK-42695][SQL] Skew join handling in stream side of broadcast hash join

Posted by "zuston (via GitHub)" <gi...@apache.org>.
zuston commented on PR #40312:
URL: https://github.com/apache/spark/pull/40312#issuecomment-1519064385

   It’s a good improvement, especially for RSS 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] github-actions[bot] commented on pull request #40312: [SPARK-42695][SQL] Skew join handling in stream side of broadcast hash join

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #40312:
URL: https://github.com/apache/spark/pull/40312#issuecomment-1703970923

   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on pull request #40312: [SPARK-42695][SQL] Skew join handling in stream side of broadcast hash join

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #40312:
URL: https://github.com/apache/spark/pull/40312#issuecomment-1562365414

   +CC @shardulm94 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ulysses-you commented on a diff in pull request #40312: [SPARK-42695][SQL] Skew join handling in stream side of broadcast hash join

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on code in PR #40312:
URL: https://github.com/apache/spark/pull/40312#discussion_r1203392754


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala:
##########
@@ -215,6 +216,32 @@ case class OptimizeSkewedJoin(ensureRequirements: EnsureRequirements)
         case (newLeft, newRight) =>
           shj.copy(left = newLeft, right = newRight, isSkewJoin = true)
       }.getOrElse(shj)
+
+    case bhj @ BroadcastHashJoinExec(_, _, joinType, _, _, ShuffleStage(s: ShuffleQueryStageExec),
+    BroadcastQueryStageExec(_, _, _), _, false) if canSplitLeftSide(joinType) =>
+      optimizeBroadcastHashJoin(s).map(newLeft => bhj.copy(left = newLeft, isSkewJoin = true))
+        .getOrElse(bhj)
+
+    case bhj @ BroadcastHashJoinExec(_, _, joinType, _, _, BroadcastQueryStageExec(_, _, _),
+    ShuffleStage(s: ShuffleQueryStageExec), _, false) if canSplitRightSide(joinType) =>
+      optimizeBroadcastHashJoin(s).map(newRight => bhj.copy(right = newRight, isSkewJoin = true))
+        .getOrElse(bhj)
+  }
+
+  private def optimizeBroadcastHashJoin(shuffleStage: ShuffleQueryStageExec): Option[SparkPlan] = {
+
+    def isSkewed(stats: MapOutputStatistics): Boolean = {
+      val medSize = Utils.median(stats.bytesByPartitionId, false)
+      val skewThreshold = getSkewThreshold(medSize)
+      stats.bytesByPartitionId.exists(_ > skewThreshold)
+    }
+
+    if (!shuffleStage.mapStats.forall(isSkewed(_))) {
+      return None
+    }
+
+    Some(SkewJoinChildWrapper(AQEShuffleReadExec(shuffleStage,
+      OptimizeShuffleWithLocalRead.getPartitionSpecs(shuffleStage, None))))

Review Comment:
   How can it handle skewed partition ? The code in `OptimizeShuffleWithLocalRead.getPartitionSpecs` does not take reduce partition size into account.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org