Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/04/25 05:20:52 UTC

[GitHub] [spark] ulysses-you opened a new pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

ulysses-you opened a new pull request #32328:
URL: https://github.com/apache/spark/pull/32328


   
   ### What changes were proposed in this pull request?
   Add `ShuffledHashJoin` pattern check in `OptimizeSkewedJoin` so that we can optimize it.
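
   As a rough illustration of what a second pattern check means, here is a minimal, self-contained sketch. The types `Plan`, `ShuffleStage`, `Sort`, `SortMergeJoin` and `ShuffledHashJoin` are hypothetical stand-ins, not Spark's real classes, and the sketch assumes that a shuffled hash join reads its shuffle stages directly while a sort-merge join reads them through a sort.

   ```scala
   // Hypothetical, simplified stand-ins for physical plan nodes (not Spark's classes).
   sealed trait Plan
   case class ShuffleStage(id: Int) extends Plan
   case class Sort(child: Plan) extends Plan
   case class SortMergeJoin(left: Plan, right: Plan) extends Plan
   case class ShuffledHashJoin(left: Plan, right: Plan) extends Plan

   object SkewPatternSketch {
     // Returns the two shuffle stages a skew optimization could act on,
     // or None when the plan shape is not one of the recognized joins.
     def skewCandidates(plan: Plan): Option[(ShuffleStage, ShuffleStage)] = plan match {
       // Sort-merge join: each child is a sort on top of a shuffle stage.
       case SortMergeJoin(Sort(l: ShuffleStage), Sort(r: ShuffleStage)) => Some((l, r))
       // Shuffled hash join (assumed shape): the children are the shuffle stages themselves.
       case ShuffledHashJoin(l: ShuffleStage, r: ShuffleStage) => Some((l, r))
       case _ => None
     }
   }
   ```

   Both branches hand the same pair of stages to whatever logic follows, which is why the skew handling can be shared between the two join implementations.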
   
   ### Why are the changes needed?
   Currently, every type of join is already supported through hints, which makes it easy to choose the join implementation.
   
   We would choose `ShuffledHashJoin` if one table is not big but is over the broadcast threshold. It would be better if `OptimizeSkewedJoin` could optimize it as well.
   
   
   ### Does this PR introduce _any_ user-facing change?
   Probably yes, the execution plan in AQE mode may change.
   
   ### How was this patch tested?
   Improve the existing test in `AdaptiveQueryExecSuite`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ulysses-you commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-828077872


   @c21 ah, thank you, I'm glad to have some discussion, and I also missed considering the parallelism part : )  @cloud-fan thank you, addressed all comments.




[GitHub] [spark] cloud-fan commented on a change in pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #32328:
URL: https://github.com/apache/spark/pull/32328#discussion_r621241335



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -685,64 +691,82 @@ class AdaptiveQueryExecSuite
   }
 
   test("SPARK-29544: adaptive skew join with different join types") {
-    withSQLConf(
-      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
-      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
-      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
-      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
-      withTempView("skewData1", "skewData2") {
-        spark
-          .range(0, 1000, 1, 10)
-          .select(
-            when('id < 250, 249)
-              .when('id >= 750, 1000)
-              .otherwise('id).as("key1"),
-            'id as "value1")
-          .createOrReplaceTempView("skewData1")
-        spark
-          .range(0, 1000, 1, 10)
-          .select(
-            when('id < 250, 249)
-              .otherwise('id).as("key2"),
-            'id as "value2")
-          .createOrReplaceTempView("skewData2")
+    Seq("SHUFFLE_MERGE", "SHUFFLE_HASH").foreach { joinHint =>
+      val isSMJ = joinHint == "SHUFFLE_MERGE"

Review comment:
       nit: to reduce more duplicated code
   ```
   def getJoinNode(plan: LogicalPlan) = if (joinHint == "SHUFFLE_MERGE") {
     findTopLevelSortMergeJoin(plan)
   } else {
     findTopLevelShuffledHashJoin(plan)
   }
   ```






[GitHub] [spark] c21 commented on a change in pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #32328:
URL: https://github.com/apache/spark/pull/32328#discussion_r620045305



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -157,98 +157,121 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
    * 4. Wrap the join right child with a special shuffle reader that reads partition0 3 times by
    *    3 tasks separately.
    */
-  def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
-    case smj @ SortMergeJoinExec(_, _, joinType, _,
-        s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
-        s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
-        if supportedJoinTypes.contains(joinType) =>
-      assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
-      val numPartitions = left.partitionsWithSizes.length
-      // We use the median size of the original shuffle partitions to detect skewed partitions.
-      val leftMedSize = medianSize(left.mapStats)
-      val rightMedSize = medianSize(right.mapStats)
-      logDebug(
-        s"""
-          |Optimizing skewed join.
-          |Left side partitions size info:
-          |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
-          |Right side partitions size info:
-          |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
-        """.stripMargin)
-      val canSplitLeft = canSplitLeftSide(joinType)
-      val canSplitRight = canSplitRightSide(joinType)
-      // We use the actual partition sizes (may be coalesced) to calculate target size, so that
-      // the final data distribution is even (coalesced partitions + split partitions).
-      val leftActualSizes = left.partitionsWithSizes.map(_._2)
-      val rightActualSizes = right.partitionsWithSizes.map(_._2)
-      val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
-      val rightTargetSize = targetSize(rightActualSizes, rightMedSize)
-
-      val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-      val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-      var numSkewedLeft = 0
-      var numSkewedRight = 0
-      for (partitionIndex <- 0 until numPartitions) {
-        val leftActualSize = leftActualSizes(partitionIndex)
-        val isLeftSkew = isSkewed(leftActualSize, leftMedSize) && canSplitLeft
-        val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
-        val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex
-
-        val rightActualSize = rightActualSizes(partitionIndex)
-        val isRightSkew = isSkewed(rightActualSize, rightMedSize) && canSplitRight
-        val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
-        val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex
-
-        // A skewed partition should never be coalesced, but skip it here just to be safe.
-        val leftParts = if (isLeftSkew && !isLeftCoalesced) {
-          val reducerId = leftPartSpec.startReducerIndex
-          val skewSpecs = createSkewPartitionSpecs(
-            left.mapStats.shuffleId, reducerId, leftTargetSize)
-          if (skewSpecs.isDefined) {
-            logDebug(s"Left side partition $partitionIndex " +
-              s"(${FileUtils.byteCountToDisplaySize(leftActualSize)}) is skewed, " +
-              s"split it into ${skewSpecs.get.length} parts.")
-            numSkewedLeft += 1
-          }
-          skewSpecs.getOrElse(Seq(leftPartSpec))
-        } else {
-          Seq(leftPartSpec)
+  private def tryToOptimizedChildren(

Review comment:
       nit: the `origin smj` in the comment above needs to be updated.






[GitHub] [spark] AmplabJenkins removed a comment on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-828183693


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138017/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826307358


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42439/
   




[GitHub] [spark] SparkQA commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826336023


   **[Test build #137918 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137918/testReport)** for PR 32328 at commit [`d081031`](https://github.com/apache/spark/commit/d081031b54189f4c7059318aacfa03102a78a319).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826287306


   **[Test build #137907 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137907/testReport)** for PR 32328 at commit [`d081031`](https://github.com/apache/spark/commit/d081031b54189f4c7059318aacfa03102a78a319).
    * This patch **fails Spark unit tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826341628


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137918/
   




[GitHub] [spark] SparkQA commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826669098


   **[Test build #137950 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137950/testReport)** for PR 32328 at commit [`ea209e3`](https://github.com/apache/spark/commit/ea209e356bc1e12d75055ba5508d136ea28bc22c).




[GitHub] [spark] AmplabJenkins commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826714952


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42471/
   




[GitHub] [spark] SparkQA commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-828169586


   **[Test build #138017 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138017/testReport)** for PR 32328 at commit [`cab5290`](https://github.com/apache/spark/commit/cab5290a89526a32f826f1d2fffff9132d6e2a8d).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826305831


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42439/
   




[GitHub] [spark] AmplabJenkins commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826289675


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137907/
   




[GitHub] [spark] maropu commented on a change in pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
maropu commented on a change in pull request #32328:
URL: https://github.com/apache/spark/pull/32328#discussion_r620047543



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -157,98 +157,121 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
    * 4. Wrap the join right child with a special shuffle reader that reads partition0 3 times by
    *    3 tasks separately.
    */
-  def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
-    case smj @ SortMergeJoinExec(_, _, joinType, _,
-        s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
-        s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
-        if supportedJoinTypes.contains(joinType) =>
-      assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
-      val numPartitions = left.partitionsWithSizes.length
-      // We use the median size of the original shuffle partitions to detect skewed partitions.
-      val leftMedSize = medianSize(left.mapStats)
-      val rightMedSize = medianSize(right.mapStats)
-      logDebug(
-        s"""
-          |Optimizing skewed join.
-          |Left side partitions size info:
-          |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
-          |Right side partitions size info:
-          |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
-        """.stripMargin)
-      val canSplitLeft = canSplitLeftSide(joinType)
-      val canSplitRight = canSplitRightSide(joinType)
-      // We use the actual partition sizes (may be coalesced) to calculate target size, so that
-      // the final data distribution is even (coalesced partitions + split partitions).
-      val leftActualSizes = left.partitionsWithSizes.map(_._2)
-      val rightActualSizes = right.partitionsWithSizes.map(_._2)
-      val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
-      val rightTargetSize = targetSize(rightActualSizes, rightMedSize)
-
-      val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-      val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-      var numSkewedLeft = 0
-      var numSkewedRight = 0
-      for (partitionIndex <- 0 until numPartitions) {
-        val leftActualSize = leftActualSizes(partitionIndex)
-        val isLeftSkew = isSkewed(leftActualSize, leftMedSize) && canSplitLeft
-        val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
-        val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex
-
-        val rightActualSize = rightActualSizes(partitionIndex)
-        val isRightSkew = isSkewed(rightActualSize, rightMedSize) && canSplitRight
-        val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
-        val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex
-
-        // A skewed partition should never be coalesced, but skip it here just to be safe.
-        val leftParts = if (isLeftSkew && !isLeftCoalesced) {
-          val reducerId = leftPartSpec.startReducerIndex
-          val skewSpecs = createSkewPartitionSpecs(
-            left.mapStats.shuffleId, reducerId, leftTargetSize)
-          if (skewSpecs.isDefined) {
-            logDebug(s"Left side partition $partitionIndex " +
-              s"(${FileUtils.byteCountToDisplaySize(leftActualSize)}) is skewed, " +
-              s"split it into ${skewSpecs.get.length} parts.")
-            numSkewedLeft += 1
-          }
-          skewSpecs.getOrElse(Seq(leftPartSpec))
-        } else {
-          Seq(leftPartSpec)
+  private def getOptimizedChildren(
+      left: ShuffleStageInfo,
+      right: ShuffleStageInfo,
+      joinType: JoinType): Option[(SparkPlan, SparkPlan)] = {
+    assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
+    val numPartitions = left.partitionsWithSizes.length
+    // We use the median size of the original shuffle partitions to detect skewed partitions.
+    val leftMedSize = medianSize(left.mapStats)
+    val rightMedSize = medianSize(right.mapStats)
+    logDebug(
+      s"""
+         |Optimizing skewed join.
+         |Left side partitions size info:
+         |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
+         |Right side partitions size info:
+         |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
+      """.stripMargin)
+    val canSplitLeft = canSplitLeftSide(joinType)
+    val canSplitRight = canSplitRightSide(joinType)
+    // We use the actual partition sizes (may be coalesced) to calculate target size, so that
+    // the final data distribution is even (coalesced partitions + split partitions).
+    val leftActualSizes = left.partitionsWithSizes.map(_._2)
+    val rightActualSizes = right.partitionsWithSizes.map(_._2)
+    val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
+    val rightTargetSize = targetSize(rightActualSizes, rightMedSize)
+
+    val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
+    val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
+    var numSkewedLeft = 0
+    var numSkewedRight = 0
+    for (partitionIndex <- 0 until numPartitions) {
+      val leftActualSize = leftActualSizes(partitionIndex)
+      val isLeftSkew = isSkewed(leftActualSize, leftMedSize) && canSplitLeft
+      val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
+      val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex
+
+      val rightActualSize = rightActualSizes(partitionIndex)
+      val isRightSkew = isSkewed(rightActualSize, rightMedSize) && canSplitRight
+      val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
+      val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex
+
+      // A skewed partition should never be coalesced, but skip it here just to be safe.
+      val leftParts = if (isLeftSkew && !isLeftCoalesced) {
+        val reducerId = leftPartSpec.startReducerIndex
+        val skewSpecs = createSkewPartitionSpecs(
+          left.mapStats.shuffleId, reducerId, leftTargetSize)
+        if (skewSpecs.isDefined) {
+          logDebug(s"Left side partition $partitionIndex " +
+            s"(${FileUtils.byteCountToDisplaySize(leftActualSize)}) is skewed, " +
+            s"split it into ${skewSpecs.get.length} parts.")
+          numSkewedLeft += 1
         }
+        skewSpecs.getOrElse(Seq(leftPartSpec))
+      } else {
+        Seq(leftPartSpec)
+      }
 
-        // A skewed partition should never be coalesced, but skip it here just to be safe.
-        val rightParts = if (isRightSkew && !isRightCoalesced) {
-          val reducerId = rightPartSpec.startReducerIndex
-          val skewSpecs = createSkewPartitionSpecs(
-            right.mapStats.shuffleId, reducerId, rightTargetSize)
-          if (skewSpecs.isDefined) {
-            logDebug(s"Right side partition $partitionIndex " +
-              s"(${FileUtils.byteCountToDisplaySize(rightActualSize)}) is skewed, " +
-              s"split it into ${skewSpecs.get.length} parts.")
-            numSkewedRight += 1
-          }
-          skewSpecs.getOrElse(Seq(rightPartSpec))
-        } else {
-          Seq(rightPartSpec)
+      // A skewed partition should never be coalesced, but skip it here just to be safe.
+      val rightParts = if (isRightSkew && !isRightCoalesced) {
+        val reducerId = rightPartSpec.startReducerIndex
+        val skewSpecs = createSkewPartitionSpecs(
+          right.mapStats.shuffleId, reducerId, rightTargetSize)
+        if (skewSpecs.isDefined) {
+          logDebug(s"Right side partition $partitionIndex " +
+            s"(${FileUtils.byteCountToDisplaySize(rightActualSize)}) is skewed, " +
+            s"split it into ${skewSpecs.get.length} parts.")
+          numSkewedRight += 1
         }
+        skewSpecs.getOrElse(Seq(rightPartSpec))
+      } else {
+        Seq(rightPartSpec)
+      }
 
-        for {
-          leftSidePartition <- leftParts
-          rightSidePartition <- rightParts
-        } {
-          leftSidePartitions += leftSidePartition
-          rightSidePartitions += rightSidePartition
-        }
+      for {
+        leftSidePartition <- leftParts
+        rightSidePartition <- rightParts
+      } {
+        leftSidePartitions += leftSidePartition
+        rightSidePartitions += rightSidePartition
       }
+    }
+    logDebug(s"number of skewed partitions: left $numSkewedLeft, right $numSkewedRight")
+    if (numSkewedLeft > 0 || numSkewedRight > 0) {
+      Some((CustomShuffleReaderExec(left.shuffleStage, leftSidePartitions.toSeq),
+        CustomShuffleReaderExec(right.shuffleStage, rightSidePartitions.toSeq)))
+    } else {
+      None
+    }
+  }
 

Review comment:
       Ah, okay. In the previous commit, I found the unnecessary change removing the blank line.
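
       For readers following the hunk quoted above: it flags a partition as skewed by comparing its size with the median partition size. The helper bodies are not shown in this thread, so the following is only a sketch under assumptions: the names `medianSize` and `isSkewed` mirror the quoted code, but their implementations here, along with the `factor` and `thresholdBytes` parameters, are illustrative guesses rather than Spark's actual code.

   ```scala
   object SkewDetectionSketch {
     // Median of the partition sizes; the upper-middle element is used for even counts.
     // Clamped to at least 1 so a median of 0 cannot make every partition look skewed.
     def medianSize(sizes: Seq[Long]): Long = {
       require(sizes.nonEmpty, "need at least one partition size")
       val sorted = sizes.sorted
       math.max(sorted(sorted.length / 2), 1L)
     }

     // Assumed rule: a partition is skewed when it is both much larger than the
     // median (by `factor`) and larger than an absolute size threshold.
     def isSkewed(size: Long, median: Long, factor: Long, thresholdBytes: Long): Boolean =
       size > median * factor && size > thresholdBytes
   }
   ```

       With sizes `Seq(100L, 100L, 100L, 100L, 5000L)`, a factor of 5 and a threshold of 800, only the 5000-byte partition would be flagged under this rule.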






[GitHub] [spark] c21 commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826601325


   btw just a random thought here. E.g. for `LEFT OUTER JOIN` (build right side, stream left side), suppose we find the left side is skewed and split the skewed partition `L1` into 3 smaller partitions (just an example here). Would the hash map for the corresponding right-side partition `R1` need to be built 3 times, once for each of `L1`'s 3 smaller partitions? If that's true, it sounds to me that it may potentially introduce more OOM on the build side, as tasks share the executor's off-heap memory to build hash maps?
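
   To make the scenario concrete, here is a small model of the pairing loop from the quoted `OptimizeSkewedJoin` hunk (the `for` over `leftParts` and `rightParts`). The string labels and the `pairUp` helper are hypothetical; the sketch only counts how often the right-side partition is paired, and says nothing about actual memory use.

   ```scala
   object SkewSplitPairingSketch {
     // Pairs every left-side piece with every right-side piece, in the same
     // nested order as the for-comprehension in the quoted diff.
     def pairUp(leftParts: Seq[String], rightParts: Seq[String]): Seq[(String, String)] =
       for {
         l <- leftParts
         r <- rightParts
       } yield (l, r)
   }
   ```

   With `L1` split into three pieces and `R1` left whole, `pairUp(Seq("L1-0", "L1-1", "L1-2"), Seq("R1"))` yields three pairs, so `R1` is read by three tasks. Whether each of those tasks also builds its own hash map over `R1` is the question raised here.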




[GitHub] [spark] AmplabJenkins commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826877781


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137950/
   




[GitHub] [spark] AmplabJenkins commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826265715








[GitHub] [spark] AmplabJenkins commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826803910


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137942/
   




[GitHub] [spark] AmplabJenkins commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826341628


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137918/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826289675


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137907/
   




[GitHub] [spark] cloud-fan edited a comment on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
cloud-fan edited a comment on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-827637229


   The overhead of skew join handling is replicating the non-skewed side, and that overhead is larger in SHJ because each replica has to build the hash relation. But the cost of the skew itself should be more significant than this.
   
   I don't think this PR makes OOM much more likely. It does add more tasks that need to build a hash relation, but the parallelism within the executor JVM is not increased. E.g. we may still run 8 tasks at the same time on one executor to build hash relations, even though the total number of tasks increases from 100 to 110.
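
The argument above can be illustrated with a back-of-the-envelope model. It is only a sketch under simplifying assumptions: every task builds one hash relation, all tasks run on a single executor, and that executor runs at most `slots` tasks at a time. `BuildConcurrencySketch`, `peakConcurrentBuilds`, and `waves` are hypothetical helpers, and the numbers are the ones from the comment.

```scala
object BuildConcurrencySketch {
  // Upper bound on hash relations being built at once on one executor:
  // limited by the task slots, not by the total number of tasks.
  def peakConcurrentBuilds(totalTasks: Int, slots: Int): Int =
    math.min(totalTasks, slots)

  // Number of scheduling "waves" needed to run all tasks (ceiling division).
  def waves(totalTasks: Int, slots: Int): Int =
    (totalTasks + slots - 1) / slots

  def main(args: Array[String]): Unit = {
    val slots = 8
    for (tasks <- Seq(100, 110)) {
      println(
        s"tasks=$tasks peak=${peakConcurrentBuilds(tasks, slots)} waves=${waves(tasks, slots)}")
    }
  }
}
```

In this model, going from 100 to 110 tasks adds one more wave of work but leaves the peak number of concurrent builds at 8, which is the quantity that matters for memory pressure.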




[GitHub] [spark] SparkQA commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826299092


   **[Test build #137918 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137918/testReport)** for PR 32328 at commit [`d081031`](https://github.com/apache/spark/commit/d081031b54189f4c7059318aacfa03102a78a319).




[GitHub] [spark] SparkQA commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-828076268


   **[Test build #138017 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138017/testReport)** for PR 32328 at commit [`cab5290`](https://github.com/apache/spark/commit/cab5290a89526a32f826f1d2fffff9132d6e2a8d).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826265715








[GitHub] [spark] SparkQA commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826635911


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42464/
   




[GitHub] [spark] maropu commented on a change in pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
maropu commented on a change in pull request #32328:
URL: https://github.com/apache/spark/pull/32328#discussion_r620000087



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledJoin.scala
##########
@@ -19,15 +19,23 @@ package org.apache.spark.sql.execution.joins
 
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, LeftExistence, LeftOuter, RightOuter}
-import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution, Partitioning, PartitioningCollection, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution, Partitioning, PartitioningCollection, UnknownPartitioning, UnspecifiedDistribution}
 
 /**
  * Holds common logic for join operators by shuffling two child relations
  * using the join keys.
  */
 trait ShuffledJoin extends BaseJoinExec {
+  def isSkewJoin: Boolean

Review comment:
       Could you also update the config description in SQLConf? https://github.com/apache/spark/blob/38ef4771d447f6135382ee2767b3f32b96cb1b0e/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L541-L548

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -157,98 +157,121 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
    * 4. Wrap the join right child with a special shuffle reader that reads partition0 3 times by
    *    3 tasks separately.
    */
-  def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
-    case smj @ SortMergeJoinExec(_, _, joinType, _,
-        s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
-        s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
-        if supportedJoinTypes.contains(joinType) =>
-      assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
-      val numPartitions = left.partitionsWithSizes.length
-      // We use the median size of the original shuffle partitions to detect skewed partitions.
-      val leftMedSize = medianSize(left.mapStats)
-      val rightMedSize = medianSize(right.mapStats)
-      logDebug(
-        s"""
-          |Optimizing skewed join.
-          |Left side partitions size info:
-          |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
-          |Right side partitions size info:
-          |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
-        """.stripMargin)
-      val canSplitLeft = canSplitLeftSide(joinType)
-      val canSplitRight = canSplitRightSide(joinType)
-      // We use the actual partition sizes (may be coalesced) to calculate target size, so that
-      // the final data distribution is even (coalesced partitions + split partitions).
-      val leftActualSizes = left.partitionsWithSizes.map(_._2)
-      val rightActualSizes = right.partitionsWithSizes.map(_._2)
-      val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
-      val rightTargetSize = targetSize(rightActualSizes, rightMedSize)
-
-      val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-      val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-      var numSkewedLeft = 0
-      var numSkewedRight = 0
-      for (partitionIndex <- 0 until numPartitions) {
-        val leftActualSize = leftActualSizes(partitionIndex)
-        val isLeftSkew = isSkewed(leftActualSize, leftMedSize) && canSplitLeft
-        val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
-        val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex
-
-        val rightActualSize = rightActualSizes(partitionIndex)
-        val isRightSkew = isSkewed(rightActualSize, rightMedSize) && canSplitRight
-        val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
-        val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex
-
-        // A skewed partition should never be coalesced, but skip it here just to be safe.
-        val leftParts = if (isLeftSkew && !isLeftCoalesced) {
-          val reducerId = leftPartSpec.startReducerIndex
-          val skewSpecs = createSkewPartitionSpecs(
-            left.mapStats.shuffleId, reducerId, leftTargetSize)
-          if (skewSpecs.isDefined) {
-            logDebug(s"Left side partition $partitionIndex " +
-              s"(${FileUtils.byteCountToDisplaySize(leftActualSize)}) is skewed, " +
-              s"split it into ${skewSpecs.get.length} parts.")
-            numSkewedLeft += 1
-          }
-          skewSpecs.getOrElse(Seq(leftPartSpec))
-        } else {
-          Seq(leftPartSpec)
+  private def getOptimizedChildren(

Review comment:
       nit: How about `getOptimizedChildren` -> `tryToOptimizedChildren`?

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -685,64 +691,82 @@ class AdaptiveQueryExecSuite
   }
 
   test("SPARK-29544: adaptive skew join with different join types") {
-    withSQLConf(
-      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
-      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
-      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
-      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
-      withTempView("skewData1", "skewData2") {
-        spark
-          .range(0, 1000, 1, 10)
-          .select(
-            when('id < 250, 249)
-              .when('id >= 750, 1000)
-              .otherwise('id).as("key1"),
-            'id as "value1")
-          .createOrReplaceTempView("skewData1")
-        spark
-          .range(0, 1000, 1, 10)
-          .select(
-            when('id < 250, 249)
-              .otherwise('id).as("key2"),
-            'id as "value2")
-          .createOrReplaceTempView("skewData2")
+    Seq("SHUFFLE_MERGE", "SHUFFLE_HASH").foreach { joinHint =>
+      val isSMJ = joinHint == "SHUFFLE_MERGE"
+      withSQLConf(
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+        SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+        SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+        SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+        SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+        withTempView("skewData1", "skewData2") {
+          spark
+            .range(0, 1000, 1, 10)
+            .select(
+              when('id < 250, 249)
+                .when('id >= 750, 1000)
+                .otherwise('id).as("key1"),
+              'id as "value1")
+            .createOrReplaceTempView("skewData1")
+          spark
+            .range(0, 1000, 1, 10)
+            .select(
+              when('id < 250, 249)
+                .otherwise('id).as("key2"),
+              'id as "value2")
+            .createOrReplaceTempView("skewData2")
 
-        def checkSkewJoin(
-            joins: Seq[SortMergeJoinExec],
-            leftSkewNum: Int,
-            rightSkewNum: Int): Unit = {
-          assert(joins.size == 1 && joins.head.isSkewJoin)
-          assert(joins.head.left.collect {
-            case r: CustomShuffleReaderExec => r
-          }.head.partitionSpecs.collect {
-            case p: PartialReducerPartitionSpec => p.reducerIndex
-          }.distinct.length == leftSkewNum)
-          assert(joins.head.right.collect {
-            case r: CustomShuffleReaderExec => r
-          }.head.partitionSpecs.collect {
-            case p: PartialReducerPartitionSpec => p.reducerIndex
-          }.distinct.length == rightSkewNum)
-        }
+          def checkSkewJoin(
+              joins: Seq[ShuffledJoin],
+              leftSkewNum: Int,
+              rightSkewNum: Int): Unit = {
+            assert(joins.size == 1 && joins.head.isSkewJoin)
+            assert(joins.head.left.collect {
+              case r: CustomShuffleReaderExec => r
+            }.head.partitionSpecs.collect {
+              case p: PartialReducerPartitionSpec => p.reducerIndex
+            }.distinct.length == leftSkewNum)
+            assert(joins.head.right.collect {
+              case r: CustomShuffleReaderExec => r
+            }.head.partitionSpecs.collect {
+              case p: PartialReducerPartitionSpec => p.reducerIndex
+            }.distinct.length == rightSkewNum)
+          }
 
-        // skewed inner join optimization
-        val (_, innerAdaptivePlan) = runAdaptiveAndVerifyResult(
-          "SELECT * FROM skewData1 join skewData2 ON key1 = key2")
-        val innerSmj = findTopLevelSortMergeJoin(innerAdaptivePlan)
-        checkSkewJoin(innerSmj, 2, 1)
-
-        // skewed left outer join optimization
-        val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult(
-          "SELECT * FROM skewData1 left outer join skewData2 ON key1 = key2")
-        val leftSmj = findTopLevelSortMergeJoin(leftAdaptivePlan)
-        checkSkewJoin(leftSmj, 2, 0)
-
-        // skewed right outer join optimization
-        val (_, rightAdaptivePlan) = runAdaptiveAndVerifyResult(
-          "SELECT * FROM skewData1 right outer join skewData2 ON key1 = key2")
-        val rightSmj = findTopLevelSortMergeJoin(rightAdaptivePlan)
-        checkSkewJoin(rightSmj, 0, 1)
+          // skewed inner join optimization
+          val (_, innerAdaptivePlan) = runAdaptiveAndVerifyResult(
+            s"SELECT /*+ $joinHint(skewData1) */ * FROM skewData1 " +
+              s"JOIN skewData2 ON key1 = key2")
+          val inner = if (isSMJ) {
+            findTopLevelSortMergeJoin(innerAdaptivePlan)
+          } else {
+            findTopLevelShuffledHashJoin(innerAdaptivePlan)
+          }
+          checkSkewJoin(inner, 2, 1)
+
+          // skewed left outer join optimization
+          val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult(
+            s"SELECT /*+ $joinHint(skewData2) */ * FROM skewData1 " +
+              s"LEFT OUTER JOIN skewData2 ON key1 = key2")
+          val leftSmj = if (isSMJ) {
+            findTopLevelSortMergeJoin(leftAdaptivePlan)
+          } else {
+            findTopLevelShuffledHashJoin(leftAdaptivePlan)
+          }
+          checkSkewJoin(leftSmj, 2, 0)
+
+          // skewed right outer join optimization
+          val (_, rightAdaptivePlan) = runAdaptiveAndVerifyResult(
+            s"SELECT /*+ $joinHint(skewData1) */ * FROM skewData1 " +
+              s"RIGHT OUTER JOIN skewData2 ON key1 = key2")

Review comment:
       ditto

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -685,64 +691,82 @@ class AdaptiveQueryExecSuite
   }
 
   test("SPARK-29544: adaptive skew join with different join types") {
-    withSQLConf(
-      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
-      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
-      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
-      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
-      withTempView("skewData1", "skewData2") {
-        spark
-          .range(0, 1000, 1, 10)
-          .select(
-            when('id < 250, 249)
-              .when('id >= 750, 1000)
-              .otherwise('id).as("key1"),
-            'id as "value1")
-          .createOrReplaceTempView("skewData1")
-        spark
-          .range(0, 1000, 1, 10)
-          .select(
-            when('id < 250, 249)
-              .otherwise('id).as("key2"),
-            'id as "value2")
-          .createOrReplaceTempView("skewData2")
+    Seq("SHUFFLE_MERGE", "SHUFFLE_HASH").foreach { joinHint =>
+      val isSMJ = joinHint == "SHUFFLE_MERGE"
+      withSQLConf(
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+        SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+        SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+        SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+        SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+        withTempView("skewData1", "skewData2") {
+          spark
+            .range(0, 1000, 1, 10)
+            .select(
+              when('id < 250, 249)
+                .when('id >= 750, 1000)
+                .otherwise('id).as("key1"),
+              'id as "value1")
+            .createOrReplaceTempView("skewData1")
+          spark
+            .range(0, 1000, 1, 10)
+            .select(
+              when('id < 250, 249)
+                .otherwise('id).as("key2"),
+              'id as "value2")
+            .createOrReplaceTempView("skewData2")
 
-        def checkSkewJoin(
-            joins: Seq[SortMergeJoinExec],
-            leftSkewNum: Int,
-            rightSkewNum: Int): Unit = {
-          assert(joins.size == 1 && joins.head.isSkewJoin)
-          assert(joins.head.left.collect {
-            case r: CustomShuffleReaderExec => r
-          }.head.partitionSpecs.collect {
-            case p: PartialReducerPartitionSpec => p.reducerIndex
-          }.distinct.length == leftSkewNum)
-          assert(joins.head.right.collect {
-            case r: CustomShuffleReaderExec => r
-          }.head.partitionSpecs.collect {
-            case p: PartialReducerPartitionSpec => p.reducerIndex
-          }.distinct.length == rightSkewNum)
-        }
+          def checkSkewJoin(
+              joins: Seq[ShuffledJoin],
+              leftSkewNum: Int,
+              rightSkewNum: Int): Unit = {
+            assert(joins.size == 1 && joins.head.isSkewJoin)
+            assert(joins.head.left.collect {
+              case r: CustomShuffleReaderExec => r
+            }.head.partitionSpecs.collect {
+              case p: PartialReducerPartitionSpec => p.reducerIndex
+            }.distinct.length == leftSkewNum)
+            assert(joins.head.right.collect {
+              case r: CustomShuffleReaderExec => r
+            }.head.partitionSpecs.collect {
+              case p: PartialReducerPartitionSpec => p.reducerIndex
+            }.distinct.length == rightSkewNum)
+          }
 
-        // skewed inner join optimization
-        val (_, innerAdaptivePlan) = runAdaptiveAndVerifyResult(
-          "SELECT * FROM skewData1 join skewData2 ON key1 = key2")
-        val innerSmj = findTopLevelSortMergeJoin(innerAdaptivePlan)
-        checkSkewJoin(innerSmj, 2, 1)
-
-        // skewed left outer join optimization
-        val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult(
-          "SELECT * FROM skewData1 left outer join skewData2 ON key1 = key2")
-        val leftSmj = findTopLevelSortMergeJoin(leftAdaptivePlan)
-        checkSkewJoin(leftSmj, 2, 0)
-
-        // skewed right outer join optimization
-        val (_, rightAdaptivePlan) = runAdaptiveAndVerifyResult(
-          "SELECT * FROM skewData1 right outer join skewData2 ON key1 = key2")
-        val rightSmj = findTopLevelSortMergeJoin(rightAdaptivePlan)
-        checkSkewJoin(rightSmj, 0, 1)
+          // skewed inner join optimization
+          val (_, innerAdaptivePlan) = runAdaptiveAndVerifyResult(
+            s"SELECT /*+ $joinHint(skewData1) */ * FROM skewData1 " +
+              s"JOIN skewData2 ON key1 = key2")

Review comment:
       nit: remove the `s` prefix; this string literal has no interpolation.
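
For context, a small sketch of why the prefix is redundant on that line: the `s` interpolator only matters when the literal contains a `$` expression. `joinHint` mirrors the test's variable; `InterpolatorSketch` and `query` are made-up names for illustration.

```scala
object InterpolatorSketch {
  def query(joinHint: String): String =
    // The first literal needs `s` because it interpolates $joinHint; the second does not.
    s"SELECT /*+ $joinHint(skewData1) */ * FROM skewData1 " +
      "JOIN skewData2 ON key1 = key2"

  def main(args: Array[String]): Unit = {
    // An `s`-prefixed literal without any `$` yields the same string as a plain literal.
    assert(s"JOIN skewData2 ON key1 = key2" == "JOIN skewData2 ON key1 = key2")
    println(query("SHUFFLE_HASH"))
  }
}
```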

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -157,98 +157,121 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
    * 4. Wrap the join right child with a special shuffle reader that reads partition0 3 times by
    *    3 tasks separately.
    */
-  def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
-    case smj @ SortMergeJoinExec(_, _, joinType, _,
-        s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
-        s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
-        if supportedJoinTypes.contains(joinType) =>
-      assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
-      val numPartitions = left.partitionsWithSizes.length
-      // We use the median size of the original shuffle partitions to detect skewed partitions.
-      val leftMedSize = medianSize(left.mapStats)
-      val rightMedSize = medianSize(right.mapStats)
-      logDebug(
-        s"""
-          |Optimizing skewed join.
-          |Left side partitions size info:
-          |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
-          |Right side partitions size info:
-          |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
-        """.stripMargin)
-      val canSplitLeft = canSplitLeftSide(joinType)
-      val canSplitRight = canSplitRightSide(joinType)
-      // We use the actual partition sizes (may be coalesced) to calculate target size, so that
-      // the final data distribution is even (coalesced partitions + split partitions).
-      val leftActualSizes = left.partitionsWithSizes.map(_._2)
-      val rightActualSizes = right.partitionsWithSizes.map(_._2)
-      val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
-      val rightTargetSize = targetSize(rightActualSizes, rightMedSize)
-
-      val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-      val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-      var numSkewedLeft = 0
-      var numSkewedRight = 0
-      for (partitionIndex <- 0 until numPartitions) {
-        val leftActualSize = leftActualSizes(partitionIndex)
-        val isLeftSkew = isSkewed(leftActualSize, leftMedSize) && canSplitLeft
-        val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
-        val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex
-
-        val rightActualSize = rightActualSizes(partitionIndex)
-        val isRightSkew = isSkewed(rightActualSize, rightMedSize) && canSplitRight
-        val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
-        val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex
-
-        // A skewed partition should never be coalesced, but skip it here just to be safe.
-        val leftParts = if (isLeftSkew && !isLeftCoalesced) {
-          val reducerId = leftPartSpec.startReducerIndex
-          val skewSpecs = createSkewPartitionSpecs(
-            left.mapStats.shuffleId, reducerId, leftTargetSize)
-          if (skewSpecs.isDefined) {
-            logDebug(s"Left side partition $partitionIndex " +
-              s"(${FileUtils.byteCountToDisplaySize(leftActualSize)}) is skewed, " +
-              s"split it into ${skewSpecs.get.length} parts.")
-            numSkewedLeft += 1
-          }
-          skewSpecs.getOrElse(Seq(leftPartSpec))
-        } else {
-          Seq(leftPartSpec)
+  private def getOptimizedChildren(
+      left: ShuffleStageInfo,
+      right: ShuffleStageInfo,
+      joinType: JoinType): Option[(SparkPlan, SparkPlan)] = {
+    assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
+    val numPartitions = left.partitionsWithSizes.length
+    // We use the median size of the original shuffle partitions to detect skewed partitions.
+    val leftMedSize = medianSize(left.mapStats)
+    val rightMedSize = medianSize(right.mapStats)
+    logDebug(
+      s"""
+         |Optimizing skewed join.
+         |Left side partitions size info:
+         |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
+         |Right side partitions size info:
+         |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
+      """.stripMargin)
+    val canSplitLeft = canSplitLeftSide(joinType)
+    val canSplitRight = canSplitRightSide(joinType)
+    // We use the actual partition sizes (may be coalesced) to calculate target size, so that
+    // the final data distribution is even (coalesced partitions + split partitions).
+    val leftActualSizes = left.partitionsWithSizes.map(_._2)
+    val rightActualSizes = right.partitionsWithSizes.map(_._2)
+    val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
+    val rightTargetSize = targetSize(rightActualSizes, rightMedSize)
+
+    val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
+    val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
+    var numSkewedLeft = 0
+    var numSkewedRight = 0
+    for (partitionIndex <- 0 until numPartitions) {
+      val leftActualSize = leftActualSizes(partitionIndex)
+      val isLeftSkew = isSkewed(leftActualSize, leftMedSize) && canSplitLeft
+      val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
+      val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex
+
+      val rightActualSize = rightActualSizes(partitionIndex)
+      val isRightSkew = isSkewed(rightActualSize, rightMedSize) && canSplitRight
+      val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
+      val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex
+
+      // A skewed partition should never be coalesced, but skip it here just to be safe.
+      val leftParts = if (isLeftSkew && !isLeftCoalesced) {
+        val reducerId = leftPartSpec.startReducerIndex
+        val skewSpecs = createSkewPartitionSpecs(
+          left.mapStats.shuffleId, reducerId, leftTargetSize)
+        if (skewSpecs.isDefined) {
+          logDebug(s"Left side partition $partitionIndex " +
+            s"(${FileUtils.byteCountToDisplaySize(leftActualSize)}) is skewed, " +
+            s"split it into ${skewSpecs.get.length} parts.")
+          numSkewedLeft += 1
         }
+        skewSpecs.getOrElse(Seq(leftPartSpec))
+      } else {
+        Seq(leftPartSpec)
+      }
 
-        // A skewed partition should never be coalesced, but skip it here just to be safe.
-        val rightParts = if (isRightSkew && !isRightCoalesced) {
-          val reducerId = rightPartSpec.startReducerIndex
-          val skewSpecs = createSkewPartitionSpecs(
-            right.mapStats.shuffleId, reducerId, rightTargetSize)
-          if (skewSpecs.isDefined) {
-            logDebug(s"Right side partition $partitionIndex " +
-              s"(${FileUtils.byteCountToDisplaySize(rightActualSize)}) is skewed, " +
-              s"split it into ${skewSpecs.get.length} parts.")
-            numSkewedRight += 1
-          }
-          skewSpecs.getOrElse(Seq(rightPartSpec))
-        } else {
-          Seq(rightPartSpec)
+      // A skewed partition should never be coalesced, but skip it here just to be safe.
+      val rightParts = if (isRightSkew && !isRightCoalesced) {
+        val reducerId = rightPartSpec.startReducerIndex
+        val skewSpecs = createSkewPartitionSpecs(
+          right.mapStats.shuffleId, reducerId, rightTargetSize)
+        if (skewSpecs.isDefined) {
+          logDebug(s"Right side partition $partitionIndex " +
+            s"(${FileUtils.byteCountToDisplaySize(rightActualSize)}) is skewed, " +
+            s"split it into ${skewSpecs.get.length} parts.")
+          numSkewedRight += 1
         }
+        skewSpecs.getOrElse(Seq(rightPartSpec))
+      } else {
+        Seq(rightPartSpec)
+      }
 
-        for {
-          leftSidePartition <- leftParts
-          rightSidePartition <- rightParts
-        } {
-          leftSidePartitions += leftSidePartition
-          rightSidePartitions += rightSidePartition
-        }
+      for {
+        leftSidePartition <- leftParts
+        rightSidePartition <- rightParts
+      } {
+        leftSidePartitions += leftSidePartition
+        rightSidePartitions += rightSidePartition
       }
+    }
+    logDebug(s"number of skewed partitions: left $numSkewedLeft, right $numSkewedRight")
+    if (numSkewedLeft > 0 || numSkewedRight > 0) {
+      Some((CustomShuffleReaderExec(left.shuffleStage, leftSidePartitions.toSeq),
+        CustomShuffleReaderExec(right.shuffleStage, rightSidePartitions.toSeq)))
+    } else {
+      None
+    }
+  }
 

Review comment:
       nit: This looks like an unnecessary change.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
##########
@@ -52,16 +51,6 @@ case class SortMergeJoinExec(
 
   override def stringArgs: Iterator[Any] = super.stringArgs.toSeq.dropRight(1).iterator
 
-  override def requiredChildDistribution: Seq[Distribution] = {

Review comment:
       Could you move `nodeName`, too?
   ```
     override def nodeName: String = {
       if (isSkewJoin) super.nodeName + "(skew=true)" else super.nodeName
     }
   ```

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -685,64 +691,82 @@ class AdaptiveQueryExecSuite
   }
 
   test("SPARK-29544: adaptive skew join with different join types") {
-    withSQLConf(
-      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
-      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
-      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
-      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
-      withTempView("skewData1", "skewData2") {
-        spark
-          .range(0, 1000, 1, 10)
-          .select(
-            when('id < 250, 249)
-              .when('id >= 750, 1000)
-              .otherwise('id).as("key1"),
-            'id as "value1")
-          .createOrReplaceTempView("skewData1")
-        spark
-          .range(0, 1000, 1, 10)
-          .select(
-            when('id < 250, 249)
-              .otherwise('id).as("key2"),
-            'id as "value2")
-          .createOrReplaceTempView("skewData2")
+    Seq("SHUFFLE_MERGE", "SHUFFLE_HASH").foreach { joinHint =>
+      val isSMJ = joinHint == "SHUFFLE_MERGE"
+      withSQLConf(
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+        SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+        SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+        SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+        SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+        withTempView("skewData1", "skewData2") {
+          spark
+            .range(0, 1000, 1, 10)
+            .select(
+              when('id < 250, 249)
+                .when('id >= 750, 1000)
+                .otherwise('id).as("key1"),
+              'id as "value1")
+            .createOrReplaceTempView("skewData1")
+          spark
+            .range(0, 1000, 1, 10)
+            .select(
+              when('id < 250, 249)
+                .otherwise('id).as("key2"),
+              'id as "value2")
+            .createOrReplaceTempView("skewData2")
 
-        def checkSkewJoin(
-            joins: Seq[SortMergeJoinExec],
-            leftSkewNum: Int,
-            rightSkewNum: Int): Unit = {
-          assert(joins.size == 1 && joins.head.isSkewJoin)
-          assert(joins.head.left.collect {
-            case r: CustomShuffleReaderExec => r
-          }.head.partitionSpecs.collect {
-            case p: PartialReducerPartitionSpec => p.reducerIndex
-          }.distinct.length == leftSkewNum)
-          assert(joins.head.right.collect {
-            case r: CustomShuffleReaderExec => r
-          }.head.partitionSpecs.collect {
-            case p: PartialReducerPartitionSpec => p.reducerIndex
-          }.distinct.length == rightSkewNum)
-        }
+          def checkSkewJoin(
+              joins: Seq[ShuffledJoin],
+              leftSkewNum: Int,
+              rightSkewNum: Int): Unit = {
+            assert(joins.size == 1 && joins.head.isSkewJoin)
+            assert(joins.head.left.collect {
+              case r: CustomShuffleReaderExec => r
+            }.head.partitionSpecs.collect {
+              case p: PartialReducerPartitionSpec => p.reducerIndex
+            }.distinct.length == leftSkewNum)
+            assert(joins.head.right.collect {
+              case r: CustomShuffleReaderExec => r
+            }.head.partitionSpecs.collect {
+              case p: PartialReducerPartitionSpec => p.reducerIndex
+            }.distinct.length == rightSkewNum)
+          }
 
-        // skewed inner join optimization
-        val (_, innerAdaptivePlan) = runAdaptiveAndVerifyResult(
-          "SELECT * FROM skewData1 join skewData2 ON key1 = key2")
-        val innerSmj = findTopLevelSortMergeJoin(innerAdaptivePlan)
-        checkSkewJoin(innerSmj, 2, 1)
-
-        // skewed left outer join optimization
-        val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult(
-          "SELECT * FROM skewData1 left outer join skewData2 ON key1 = key2")
-        val leftSmj = findTopLevelSortMergeJoin(leftAdaptivePlan)
-        checkSkewJoin(leftSmj, 2, 0)
-
-        // skewed right outer join optimization
-        val (_, rightAdaptivePlan) = runAdaptiveAndVerifyResult(
-          "SELECT * FROM skewData1 right outer join skewData2 ON key1 = key2")
-        val rightSmj = findTopLevelSortMergeJoin(rightAdaptivePlan)
-        checkSkewJoin(rightSmj, 0, 1)
+          // skewed inner join optimization
+          val (_, innerAdaptivePlan) = runAdaptiveAndVerifyResult(
+            s"SELECT /*+ $joinHint(skewData1) */ * FROM skewData1 " +
+              s"JOIN skewData2 ON key1 = key2")
+          val inner = if (isSMJ) {
+            findTopLevelSortMergeJoin(innerAdaptivePlan)
+          } else {
+            findTopLevelShuffledHashJoin(innerAdaptivePlan)
+          }
+          checkSkewJoin(inner, 2, 1)
+
+          // skewed left outer join optimization
+          val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult(
+            s"SELECT /*+ $joinHint(skewData2) */ * FROM skewData1 " +
+              s"LEFT OUTER JOIN skewData2 ON key1 = key2")

Review comment:
       ditto




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826709887


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42471/
   




[GitHub] [spark] ulysses-you commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-828322637


   Thank you all!




[GitHub] [spark] SparkQA removed a comment on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826260384








[GitHub] [spark] ulysses-you commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826293222








[GitHub] [spark] cloud-fan commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-827637229


   Skew join handling works by replicating the non-skew side, and that is its overhead. The overhead is larger in SHJ because we need to build the hash relation. But the overhead of the skew itself should be more significant than this.
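   
   A minimal sketch of that replication, assuming a simplified stand-in for Spark's `ShufflePartitionSpec` (the `SkewPairing` object and `Part` class are hypothetical names, not part of this PR). It mirrors the nested for-comprehension over `leftParts` and `rightParts` in `OptimizeSkewedJoin`: when one side of a partition is split, every split is paired with the other side's partition, so the non-skew partition is read once per split.
   
   ```scala
   object SkewPairing {
     // Hypothetical, simplified stand-in for Spark's ShufflePartitionSpec.
     final case class Part(reducerId: Int, split: Int)

     // Pair every left piece with every right piece, mirroring the nested
     // for-comprehension over leftParts and rightParts in the diff.
     def pairUp(leftParts: Seq[Part], rightParts: Seq[Part]): Seq[(Part, Part)] =
       for {
         l <- leftParts
         r <- rightParts
       } yield (l, r)

     // Left partition 0 is skewed and split into 3 pieces; right partition 0 is not split.
     val leftParts: Seq[Part] = Seq(Part(0, 0), Part(0, 1), Part(0, 2))
     val rightParts: Seq[Part] = Seq(Part(0, 0))
     val tasks: Seq[(Part, Part)] = pairUp(leftParts, rightParts)

     // Number of times the single right-side partition is read: once per left piece.
     val rightReads: Int = tasks.count(_._2 == rightParts.head)

     def main(args: Array[String]): Unit =
       println(s"tasks=${tasks.length}, rightReads=$rightReads")
   }
   ```
   
   In this sketch the three left splits produce three tasks, and each task reads the same right-side partition. For SHJ, that presumably means each of those tasks builds a hash relation when the replicated side is the build side.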
   
   




[GitHub] [spark] ulysses-you edited a comment on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
ulysses-you edited a comment on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-828077872


   @c21 Ah, thank you. I'm glad to have this discussion, and I had also missed the parallelism part :) @cloud-fan Thank you, I have addressed all the comments.




[GitHub] [spark] ulysses-you commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826585963


   @maropu Thanks for the review. I have addressed the comments.




[GitHub] [spark] SparkQA commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826259257








[GitHub] [spark] c21 commented on a change in pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #32328:
URL: https://github.com/apache/spark/pull/32328#discussion_r621850427



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -148,7 +148,7 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
   /*
    * This method aim to optimize the skewed join with the following steps:

Review comment:
       My hunch is that when the build side is large enough to potentially OOM, it should already be considered skewed. So after AQE skew handling, some potential build-side OOMs (inner join only) can be avoided.
   
   However, for queries with other join types, queries that have no shuffle before the join, and queries whose run-time hash map is significantly larger than the partition size, we should have a run-time fallback mechanism in the shuffled hash join itself. This PR and #32210 are both good to have and orthogonal to each other.
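   
   A rough sketch of why the two mechanisms cover different cases, assuming the skew check compares each partition against the median size times a factor and against an absolute threshold (the shape suggested by `isSkewed(size, medianSize)` in the diff). The factor of 5, the 256 MiB threshold, the simplified median, and the `SkewCheck` name are all assumptions for illustration, not values taken from this PR.
   
   ```scala
   object SkewCheck {
     // Assumed values for illustration only; Spark reads the real ones from SQLConf.
     val skewedPartitionFactor: Long = 5L
     val skewedPartitionThresholdBytes: Long = 256L * 1024 * 1024

     private val MiB: Long = 1024L * 1024

     // Simplified median: the middle element of the sorted sizes (at least 1).
     def medianSize(sizes: Seq[Long]): Long = {
       require(sizes.nonEmpty, "need at least one partition size")
       val sorted = sizes.sorted
       math.max(sorted(sizes.length / 2), 1L)
     }

     // A partition counts as skewed only if it is large both relative to the
     // median and in absolute terms.
     def isSkewed(size: Long, median: Long): Boolean =
       size > median * skewedPartitionFactor && size > skewedPartitionThresholdBytes

     // One 2 GiB partition among 64 MiB ones: only the large one is flagged.
     val lopsided: Seq[Long] = Seq(64 * MiB, 64 * MiB, 64 * MiB, 2048 * MiB)
     // Every partition is 2 GiB: none is flagged, because none exceeds the median.
     val uniformlyLarge: Seq[Long] = Seq.fill(4)(2048 * MiB)

     val lopsidedFlags: Seq[Boolean] = lopsided.map(isSkewed(_, medianSize(lopsided)))
     val uniformFlags: Seq[Boolean] =
       uniformlyLarge.map(isSkewed(_, medianSize(uniformlyLarge)))
   }
   ```
   
   Under these assumptions, a single oversized partition is caught by skew handling, while uniformly large partitions are not flagged at all, which is the kind of case a run-time fallback in the join itself would still need to cover.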






[GitHub] [spark] SparkQA commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-828097504








[GitHub] [spark] cloud-fan commented on a change in pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #32328:
URL: https://github.com/apache/spark/pull/32328#discussion_r621248692



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -157,98 +157,121 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
    * 4. Wrap the join right child with a special shuffle reader that reads partition0 3 times by
    *    3 tasks separately.
    */
-  def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
-    case smj @ SortMergeJoinExec(_, _, joinType, _,
-        s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
-        s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
-        if supportedJoinTypes.contains(joinType) =>
-      assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
-      val numPartitions = left.partitionsWithSizes.length
-      // We use the median size of the original shuffle partitions to detect skewed partitions.
-      val leftMedSize = medianSize(left.mapStats)
-      val rightMedSize = medianSize(right.mapStats)
-      logDebug(
-        s"""
-          |Optimizing skewed join.
-          |Left side partitions size info:
-          |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
-          |Right side partitions size info:
-          |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
-        """.stripMargin)
-      val canSplitLeft = canSplitLeftSide(joinType)
-      val canSplitRight = canSplitRightSide(joinType)
-      // We use the actual partition sizes (may be coalesced) to calculate target size, so that
-      // the final data distribution is even (coalesced partitions + split partitions).
-      val leftActualSizes = left.partitionsWithSizes.map(_._2)
-      val rightActualSizes = right.partitionsWithSizes.map(_._2)
-      val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
-      val rightTargetSize = targetSize(rightActualSizes, rightMedSize)
-
-      val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-      val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-      var numSkewedLeft = 0
-      var numSkewedRight = 0
-      for (partitionIndex <- 0 until numPartitions) {
-        val leftActualSize = leftActualSizes(partitionIndex)
-        val isLeftSkew = isSkewed(leftActualSize, leftMedSize) && canSplitLeft
-        val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
-        val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex
-
-        val rightActualSize = rightActualSizes(partitionIndex)
-        val isRightSkew = isSkewed(rightActualSize, rightMedSize) && canSplitRight
-        val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
-        val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex
-
-        // A skewed partition should never be coalesced, but skip it here just to be safe.
-        val leftParts = if (isLeftSkew && !isLeftCoalesced) {
-          val reducerId = leftPartSpec.startReducerIndex
-          val skewSpecs = createSkewPartitionSpecs(
-            left.mapStats.shuffleId, reducerId, leftTargetSize)
-          if (skewSpecs.isDefined) {
-            logDebug(s"Left side partition $partitionIndex " +
-              s"(${FileUtils.byteCountToDisplaySize(leftActualSize)}) is skewed, " +
-              s"split it into ${skewSpecs.get.length} parts.")
-            numSkewedLeft += 1
-          }
-          skewSpecs.getOrElse(Seq(leftPartSpec))
-        } else {
-          Seq(leftPartSpec)
+  private def tryToOptimizedChildren(
+      left: ShuffleStageInfo,
+      right: ShuffleStageInfo,
+      joinType: JoinType): Option[(SparkPlan, SparkPlan)] = {
+    assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
+    val numPartitions = left.partitionsWithSizes.length
+    // We use the median size of the original shuffle partitions to detect skewed partitions.
+    val leftMedSize = medianSize(left.mapStats)
+    val rightMedSize = medianSize(right.mapStats)
+    logDebug(
+      s"""
+         |Optimizing skewed join.
+         |Left side partitions size info:
+         |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
+         |Right side partitions size info:
+         |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
+      """.stripMargin)
+    val canSplitLeft = canSplitLeftSide(joinType)
+    val canSplitRight = canSplitRightSide(joinType)
+    // We use the actual partition sizes (may be coalesced) to calculate target size, so that
+    // the final data distribution is even (coalesced partitions + split partitions).
+    val leftActualSizes = left.partitionsWithSizes.map(_._2)
+    val rightActualSizes = right.partitionsWithSizes.map(_._2)
+    val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
+    val rightTargetSize = targetSize(rightActualSizes, rightMedSize)
+
+    val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
+    val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
+    var numSkewedLeft = 0
+    var numSkewedRight = 0
+    for (partitionIndex <- 0 until numPartitions) {
+      val leftActualSize = leftActualSizes(partitionIndex)
+      val isLeftSkew = isSkewed(leftActualSize, leftMedSize) && canSplitLeft
+      val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
+      val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex
+
+      val rightActualSize = rightActualSizes(partitionIndex)
+      val isRightSkew = isSkewed(rightActualSize, rightMedSize) && canSplitRight
+      val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
+      val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex
+
+      // A skewed partition should never be coalesced, but skip it here just to be safe.
+      val leftParts = if (isLeftSkew && !isLeftCoalesced) {
+        val reducerId = leftPartSpec.startReducerIndex
+        val skewSpecs = createSkewPartitionSpecs(
+          left.mapStats.shuffleId, reducerId, leftTargetSize)
+        if (skewSpecs.isDefined) {
+          logDebug(s"Left side partition $partitionIndex " +
+            s"(${FileUtils.byteCountToDisplaySize(leftActualSize)}) is skewed, " +
+            s"split it into ${skewSpecs.get.length} parts.")
+          numSkewedLeft += 1
         }
+        skewSpecs.getOrElse(Seq(leftPartSpec))
+      } else {
+        Seq(leftPartSpec)
+      }
 
-        // A skewed partition should never be coalesced, but skip it here just to be safe.
-        val rightParts = if (isRightSkew && !isRightCoalesced) {
-          val reducerId = rightPartSpec.startReducerIndex
-          val skewSpecs = createSkewPartitionSpecs(
-            right.mapStats.shuffleId, reducerId, rightTargetSize)
-          if (skewSpecs.isDefined) {
-            logDebug(s"Right side partition $partitionIndex " +
-              s"(${FileUtils.byteCountToDisplaySize(rightActualSize)}) is skewed, " +
-              s"split it into ${skewSpecs.get.length} parts.")
-            numSkewedRight += 1
-          }
-          skewSpecs.getOrElse(Seq(rightPartSpec))
-        } else {
-          Seq(rightPartSpec)
+      // A skewed partition should never be coalesced, but skip it here just to be safe.
+      val rightParts = if (isRightSkew && !isRightCoalesced) {
+        val reducerId = rightPartSpec.startReducerIndex
+        val skewSpecs = createSkewPartitionSpecs(
+          right.mapStats.shuffleId, reducerId, rightTargetSize)
+        if (skewSpecs.isDefined) {
+          logDebug(s"Right side partition $partitionIndex " +
+            s"(${FileUtils.byteCountToDisplaySize(rightActualSize)}) is skewed, " +
+            s"split it into ${skewSpecs.get.length} parts.")
+          numSkewedRight += 1
         }
+        skewSpecs.getOrElse(Seq(rightPartSpec))
+      } else {
+        Seq(rightPartSpec)
+      }
 
-        for {
-          leftSidePartition <- leftParts
-          rightSidePartition <- rightParts
-        } {
-          leftSidePartitions += leftSidePartition
-          rightSidePartitions += rightSidePartition
-        }
+      for {
+        leftSidePartition <- leftParts
+        rightSidePartition <- rightParts
+      } {
+        leftSidePartitions += leftSidePartition
+        rightSidePartitions += rightSidePartition
       }
+    }
+    logDebug(s"number of skewed partitions: left $numSkewedLeft, right $numSkewedRight")
+    if (numSkewedLeft > 0 || numSkewedRight > 0) {
+      Some((CustomShuffleReaderExec(left.shuffleStage, leftSidePartitions.toSeq),
+        CustomShuffleReaderExec(right.shuffleStage, rightSidePartitions.toSeq)))
+    } else {
+      None
+    }
+  }
 
-      logDebug(s"number of skewed partitions: left $numSkewedLeft, right $numSkewedRight")
-      if (numSkewedLeft > 0 || numSkewedRight > 0) {
-        val newLeft = CustomShuffleReaderExec(left.shuffleStage, leftSidePartitions.toSeq)
-        val newRight = CustomShuffleReaderExec(right.shuffleStage, rightSidePartitions.toSeq)
+  def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
+    case smj @ SortMergeJoinExec(_, _, joinType, _,
+        s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
+        s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
+        if supportedJoinTypes.contains(joinType) =>
+      val newChildren = tryToOptimizedChildren(left, right, joinType)
+      if (newChildren.isDefined) {
+        val (newLeft, newRight) = newChildren.get
         smj.copy(
           left = s1.copy(child = newLeft), right = s2.copy(child = newRight), isSkewJoin = true)
       } else {
         smj
       }
+
+    case shj @ ShuffledHashJoinExec(_, _, joinType, _, _,
+        ShuffleStage(left: ShuffleStageInfo),
+        ShuffleStage(right: ShuffleStageInfo), _)

Review comment:
       ditto






[GitHub] [spark] AmplabJenkins removed a comment on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826298867


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137906/
   




[GitHub] [spark] SparkQA commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826260384


   **[Test build #137907 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137907/testReport)** for PR 32328 at commit [`d081031`](https://github.com/apache/spark/commit/d081031b54189f4c7059318aacfa03102a78a319).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826635951


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42464/
   




[GitHub] [spark] maropu commented on a change in pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
maropu commented on a change in pull request #32328:
URL: https://github.com/apache/spark/pull/32328#discussion_r620047543



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -157,98 +157,121 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
    * 4. Wrap the join right child with a special shuffle reader that reads partition0 3 times by
    *    3 tasks separately.
    */
-  def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
-    case smj @ SortMergeJoinExec(_, _, joinType, _,
-        s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
-        s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
-        if supportedJoinTypes.contains(joinType) =>
-      assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
-      val numPartitions = left.partitionsWithSizes.length
-      // We use the median size of the original shuffle partitions to detect skewed partitions.
-      val leftMedSize = medianSize(left.mapStats)
-      val rightMedSize = medianSize(right.mapStats)
-      logDebug(
-        s"""
-          |Optimizing skewed join.
-          |Left side partitions size info:
-          |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
-          |Right side partitions size info:
-          |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
-        """.stripMargin)
-      val canSplitLeft = canSplitLeftSide(joinType)
-      val canSplitRight = canSplitRightSide(joinType)
-      // We use the actual partition sizes (may be coalesced) to calculate target size, so that
-      // the final data distribution is even (coalesced partitions + split partitions).
-      val leftActualSizes = left.partitionsWithSizes.map(_._2)
-      val rightActualSizes = right.partitionsWithSizes.map(_._2)
-      val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
-      val rightTargetSize = targetSize(rightActualSizes, rightMedSize)
-
-      val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-      val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-      var numSkewedLeft = 0
-      var numSkewedRight = 0
-      for (partitionIndex <- 0 until numPartitions) {
-        val leftActualSize = leftActualSizes(partitionIndex)
-        val isLeftSkew = isSkewed(leftActualSize, leftMedSize) && canSplitLeft
-        val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
-        val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex
-
-        val rightActualSize = rightActualSizes(partitionIndex)
-        val isRightSkew = isSkewed(rightActualSize, rightMedSize) && canSplitRight
-        val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
-        val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex
-
-        // A skewed partition should never be coalesced, but skip it here just to be safe.
-        val leftParts = if (isLeftSkew && !isLeftCoalesced) {
-          val reducerId = leftPartSpec.startReducerIndex
-          val skewSpecs = createSkewPartitionSpecs(
-            left.mapStats.shuffleId, reducerId, leftTargetSize)
-          if (skewSpecs.isDefined) {
-            logDebug(s"Left side partition $partitionIndex " +
-              s"(${FileUtils.byteCountToDisplaySize(leftActualSize)}) is skewed, " +
-              s"split it into ${skewSpecs.get.length} parts.")
-            numSkewedLeft += 1
-          }
-          skewSpecs.getOrElse(Seq(leftPartSpec))
-        } else {
-          Seq(leftPartSpec)
+  private def getOptimizedChildren(
+      left: ShuffleStageInfo,
+      right: ShuffleStageInfo,
+      joinType: JoinType): Option[(SparkPlan, SparkPlan)] = {
+    assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
+    val numPartitions = left.partitionsWithSizes.length
+    // We use the median size of the original shuffle partitions to detect skewed partitions.
+    val leftMedSize = medianSize(left.mapStats)
+    val rightMedSize = medianSize(right.mapStats)
+    logDebug(
+      s"""
+         |Optimizing skewed join.
+         |Left side partitions size info:
+         |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
+         |Right side partitions size info:
+         |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
+      """.stripMargin)
+    val canSplitLeft = canSplitLeftSide(joinType)
+    val canSplitRight = canSplitRightSide(joinType)
+    // We use the actual partition sizes (may be coalesced) to calculate target size, so that
+    // the final data distribution is even (coalesced partitions + split partitions).
+    val leftActualSizes = left.partitionsWithSizes.map(_._2)
+    val rightActualSizes = right.partitionsWithSizes.map(_._2)
+    val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
+    val rightTargetSize = targetSize(rightActualSizes, rightMedSize)
+
+    val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
+    val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
+    var numSkewedLeft = 0
+    var numSkewedRight = 0
+    for (partitionIndex <- 0 until numPartitions) {
+      val leftActualSize = leftActualSizes(partitionIndex)
+      val isLeftSkew = isSkewed(leftActualSize, leftMedSize) && canSplitLeft
+      val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
+      val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex
+
+      val rightActualSize = rightActualSizes(partitionIndex)
+      val isRightSkew = isSkewed(rightActualSize, rightMedSize) && canSplitRight
+      val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
+      val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex
+
+      // A skewed partition should never be coalesced, but skip it here just to be safe.
+      val leftParts = if (isLeftSkew && !isLeftCoalesced) {
+        val reducerId = leftPartSpec.startReducerIndex
+        val skewSpecs = createSkewPartitionSpecs(
+          left.mapStats.shuffleId, reducerId, leftTargetSize)
+        if (skewSpecs.isDefined) {
+          logDebug(s"Left side partition $partitionIndex " +
+            s"(${FileUtils.byteCountToDisplaySize(leftActualSize)}) is skewed, " +
+            s"split it into ${skewSpecs.get.length} parts.")
+          numSkewedLeft += 1
         }
+        skewSpecs.getOrElse(Seq(leftPartSpec))
+      } else {
+        Seq(leftPartSpec)
+      }
 
-        // A skewed partition should never be coalesced, but skip it here just to be safe.
-        val rightParts = if (isRightSkew && !isRightCoalesced) {
-          val reducerId = rightPartSpec.startReducerIndex
-          val skewSpecs = createSkewPartitionSpecs(
-            right.mapStats.shuffleId, reducerId, rightTargetSize)
-          if (skewSpecs.isDefined) {
-            logDebug(s"Right side partition $partitionIndex " +
-              s"(${FileUtils.byteCountToDisplaySize(rightActualSize)}) is skewed, " +
-              s"split it into ${skewSpecs.get.length} parts.")
-            numSkewedRight += 1
-          }
-          skewSpecs.getOrElse(Seq(rightPartSpec))
-        } else {
-          Seq(rightPartSpec)
+      // A skewed partition should never be coalesced, but skip it here just to be safe.
+      val rightParts = if (isRightSkew && !isRightCoalesced) {
+        val reducerId = rightPartSpec.startReducerIndex
+        val skewSpecs = createSkewPartitionSpecs(
+          right.mapStats.shuffleId, reducerId, rightTargetSize)
+        if (skewSpecs.isDefined) {
+          logDebug(s"Right side partition $partitionIndex " +
+            s"(${FileUtils.byteCountToDisplaySize(rightActualSize)}) is skewed, " +
+            s"split it into ${skewSpecs.get.length} parts.")
+          numSkewedRight += 1
         }
+        skewSpecs.getOrElse(Seq(rightPartSpec))
+      } else {
+        Seq(rightPartSpec)
+      }
 
-        for {
-          leftSidePartition <- leftParts
-          rightSidePartition <- rightParts
-        } {
-          leftSidePartitions += leftSidePartition
-          rightSidePartitions += rightSidePartition
-        }
+      for {
+        leftSidePartition <- leftParts
+        rightSidePartition <- rightParts
+      } {
+        leftSidePartitions += leftSidePartition
+        rightSidePartitions += rightSidePartition
       }
+    }
+    logDebug(s"number of skewed partitions: left $numSkewedLeft, right $numSkewedRight")
+    if (numSkewedLeft > 0 || numSkewedRight > 0) {
+      Some((CustomShuffleReaderExec(left.shuffleStage, leftSidePartitions.toSeq),
+        CustomShuffleReaderExec(right.shuffleStage, rightSidePartitions.toSeq)))
+    } else {
+      None
+    }
+  }
 

Review comment:
       Ah, okay. In the last commit, I found the unnecessary change removing the blank line.






[GitHub] [spark] AmplabJenkins commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-828097530


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42536/
   




[GitHub] [spark] maropu commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
maropu commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826505942


   Also, cc: @c21 




[GitHub] [spark] maropu closed pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
maropu closed pull request #32328:
URL: https://github.com/apache/spark/pull/32328


   




[GitHub] [spark] ulysses-you commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826293222


   retest this please




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826265715








[GitHub] [spark] zhengruifeng commented on a change in pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on a change in pull request #32328:
URL: https://github.com/apache/spark/pull/32328#discussion_r621764319



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -148,7 +148,7 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
   /*
    * This method aim to optimize the skewed join with the following steps:

Review comment:
       Some preliminary thoughts on this optimization:
   
   `OptimizeSkewedJoin` tries to make the sizes of shuffle partitions more even. 
   
   For bhj, should we change the *optimization goal of the build side* to making sure the build side fits in memory, so as to avoid OOM or a fallback to smj?
   
   Is it possible that the build side is skewed according to the current criteria, but the shuffle partitions all fit in memory?
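   
   To make the last question concrete, here is a minimal sketch of a median-based skew check of the kind this rule relies on. It assumes a partition counts as skewed only when it exceeds both a multiple of the median size and an absolute threshold. The object name `SkewCheckSketch`, the factor of 5, and the 256 MB threshold are illustrative assumptions, not values taken from this PR.
   
   ```scala
   object SkewCheckSketch {
     // Assumed values for illustration; the real rule reads them from the
     // spark.sql.adaptive.skewJoin.* settings.
     val SkewFactor: Long = 5L
     val SkewThresholdBytes: Long = 256L * 1024 * 1024
   
     // Median of the partition sizes, floored at 1 byte so the median is never zero.
     def medianSize(sizes: Seq[Long]): Long = {
       require(sizes.nonEmpty, "need at least one partition size")
       val sorted = sizes.sorted
       val n = sorted.length
       val median =
         if (n % 2 == 0) (sorted(n / 2) + sorted(n / 2 - 1)) / 2 else sorted(n / 2)
       math.max(median, 1L)
     }
   
     // Relative check plus a fixed floor; nothing here looks at available memory.
     def isSkewed(size: Long, median: Long): Boolean =
       size > median * SkewFactor && size > SkewThresholdBytes
   
     def main(args: Array[String]): Unit = {
       val mb = 1024L * 1024
       val sizes = Seq(10 * mb, 12 * mb, 11 * mb, 300 * mb, 9 * mb)
       val median = medianSize(sizes)
       sizes.zipWithIndex.foreach { case (size, i) =>
         println(s"partition $i: ${size / mb} MB, skewed = ${isSkewed(size, median)}")
       }
     }
   }
   ```
   
   With these sizes only the 300 MB partition is flagged. Because the check compares a partition against its siblings and a fixed floor, a flagged build-side partition could still fit in memory, and an unflagged one could still be too large for a hash map.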
   
   






[GitHub] [spark] SparkQA removed a comment on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-828076268


   **[Test build #138017 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138017/testReport)** for PR 32328 at commit [`cab5290`](https://github.com/apache/spark/commit/cab5290a89526a32f826f1d2fffff9132d6e2a8d).




[GitHub] [spark] ulysses-you commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826626797


   @c21 thanks for the input.
   
   Yes, we cannot optimize skew on the build side for every join type in AQE. But at least we can currently handle skew in inner-like joins on both the stream side and the build side.
   
   > If that's true, it sounds to me that it may potentially introduce more OOM on build side, as tasks are sharing executor's off-heap memory to build hash maps
   
   Yes, it's a side effect of `OptimizeSkewedJoin`, and smj's advantage is that it can spill. IMO, if users explicitly choose the shuffled hash join, they know its benefits and drawbacks. On the other hand, it is easy to increase the memory but hard to make a skewed join fast.
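   
   The memory concern quoted above follows from how the rule pairs split partitions: when a skewed left partition is split into three pieces, the matching right partition is read three times, once per task. With a shuffled hash join, if that right partition is the build side, each of those tasks would build its own hash map over the same data. A minimal sketch of the pairing, where `Spec` and `pair` are hypothetical stand-ins for the partition spec type and the nested `for` loop in the diff:
   
   ```scala
   object SkewPairingSketch {
     // Hypothetical stand-in for a shuffle partition spec; only a label is kept.
     final case class Spec(label: String)
   
     // Pair every left piece with every right piece, then separate the two sides
     // again so that position i on the left joins with position i on the right.
     def pair(leftParts: Seq[Spec], rightParts: Seq[Spec]): (Seq[Spec], Seq[Spec]) = {
       val pairs = for (l <- leftParts; r <- rightParts) yield (l, r)
       pairs.unzip
     }
   
     def main(args: Array[String]): Unit = {
       val left = Seq(Spec("L0-a"), Spec("L0-b"), Spec("L0-c")) // skewed side, split in 3
       val right = Seq(Spec("R0"))                              // other side, not split
       val (ls, rs) = pair(left, right)
       println(ls.map(_.label).mkString(", "))
       println(rs.map(_.label).mkString(", "))
     }
   }
   ```
   
   Here the right-hand sequence contains `R0` three times, which is the repeated read of the unsplit side.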




[GitHub] [spark] AmplabJenkins commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826298867


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137906/
   




[GitHub] [spark] SparkQA commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826292015


   **[Test build #137906 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137906/testReport)** for PR 32328 at commit [`08a779a`](https://github.com/apache/spark/commit/08a779a24763e4b46ee8e7baf4f68f590c9e1a70).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826306414


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42439/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826714952


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42471/
   




[GitHub] [spark] SparkQA commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826585501


   **[Test build #137942 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137942/testReport)** for PR 32328 at commit [`d42a732`](https://github.com/apache/spark/commit/d42a73287556fc7c8875a29d6cc1ee21325828d5).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826877781


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137950/
   




[GitHub] [spark] SparkQA commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826802299


   **[Test build #137942 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137942/testReport)** for PR 32328 at commit [`d42a732`](https://github.com/apache/spark/commit/d42a73287556fc7c8875a29d6cc1ee21325828d5).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA removed a comment on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826669098


   **[Test build #137950 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137950/testReport)** for PR 32328 at commit [`ea209e3`](https://github.com/apache/spark/commit/ea209e356bc1e12d75055ba5508d136ea28bc22c).




[GitHub] [spark] SparkQA removed a comment on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826259257


   **[Test build #137906 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137906/testReport)** for PR 32328 at commit [`08a779a`](https://github.com/apache/spark/commit/08a779a24763e4b46ee8e7baf4f68f590c9e1a70).




[GitHub] [spark] c21 commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-827072227


   > Yes, it's a side effect for OptimizeSkewedJoin, but smj's advantage is it could spill. IMO, if user specify the shuffled hash join to do execution that means they know the benefit and issue of it. And in the other hand, we can easily increase the memory but hard to make skew join fast. So this optimization can be the extra choice for user.
   
   I agree this adds an extra choice for users given the current state of things. But in the long term, we would like to work towards enabling shuffled hash join by default (i.e. `spark.sql.join.preferSortMergeJoin`=false). This seems to me to add more [risk](https://github.com/apache/spark/pull/32328#issuecomment-826601325) to that long-term direction. So I think we should be more cautious with it and have more discussion. cc @cloud-fan.
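   
   For reference, a session that prefers shuffled hash join while keeping the adaptive skew-join rule on might be configured roughly as below. This is a sketch under assumptions: the object name, application name, and `local[*]` master are hypothetical, and it only sets the flags without running a join.
   
   ```scala
   import org.apache.spark.sql.SparkSession
   
   object PreferShuffledHashJoinSketch {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder()
         .appName("prefer-shj-sketch") // hypothetical application name
         .master("local[*]")           // assumption: local run for illustration only
         .config("spark.sql.adaptive.enabled", "true")
         .config("spark.sql.adaptive.skewJoin.enabled", "true")
         // The setting referred to above; false lets the planner consider shuffled hash join.
         .config("spark.sql.join.preferSortMergeJoin", "false")
         .getOrCreate()
   
       // Read the setting back to confirm what the session will use.
       println(spark.conf.get("spark.sql.join.preferSortMergeJoin"))
       spark.stop()
     }
   }
   ```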
   
   
   
   




[GitHub] [spark] cloud-fan commented on a change in pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #32328:
URL: https://github.com/apache/spark/pull/32328#discussion_r621248493



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -157,98 +157,121 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
    * 4. Wrap the join right child with a special shuffle reader that reads partition0 3 times by
    *    3 tasks separately.
    */
-  def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
-    case smj @ SortMergeJoinExec(_, _, joinType, _,
-        s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
-        s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
-        if supportedJoinTypes.contains(joinType) =>
-      assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
-      val numPartitions = left.partitionsWithSizes.length
-      // We use the median size of the original shuffle partitions to detect skewed partitions.
-      val leftMedSize = medianSize(left.mapStats)
-      val rightMedSize = medianSize(right.mapStats)
-      logDebug(
-        s"""
-          |Optimizing skewed join.
-          |Left side partitions size info:
-          |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
-          |Right side partitions size info:
-          |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
-        """.stripMargin)
-      val canSplitLeft = canSplitLeftSide(joinType)
-      val canSplitRight = canSplitRightSide(joinType)
-      // We use the actual partition sizes (may be coalesced) to calculate target size, so that
-      // the final data distribution is even (coalesced partitions + split partitions).
-      val leftActualSizes = left.partitionsWithSizes.map(_._2)
-      val rightActualSizes = right.partitionsWithSizes.map(_._2)
-      val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
-      val rightTargetSize = targetSize(rightActualSizes, rightMedSize)
-
-      val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-      val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-      var numSkewedLeft = 0
-      var numSkewedRight = 0
-      for (partitionIndex <- 0 until numPartitions) {
-        val leftActualSize = leftActualSizes(partitionIndex)
-        val isLeftSkew = isSkewed(leftActualSize, leftMedSize) && canSplitLeft
-        val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
-        val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex
-
-        val rightActualSize = rightActualSizes(partitionIndex)
-        val isRightSkew = isSkewed(rightActualSize, rightMedSize) && canSplitRight
-        val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
-        val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex
-
-        // A skewed partition should never be coalesced, but skip it here just to be safe.
-        val leftParts = if (isLeftSkew && !isLeftCoalesced) {
-          val reducerId = leftPartSpec.startReducerIndex
-          val skewSpecs = createSkewPartitionSpecs(
-            left.mapStats.shuffleId, reducerId, leftTargetSize)
-          if (skewSpecs.isDefined) {
-            logDebug(s"Left side partition $partitionIndex " +
-              s"(${FileUtils.byteCountToDisplaySize(leftActualSize)}) is skewed, " +
-              s"split it into ${skewSpecs.get.length} parts.")
-            numSkewedLeft += 1
-          }
-          skewSpecs.getOrElse(Seq(leftPartSpec))
-        } else {
-          Seq(leftPartSpec)
+  private def tryToOptimizedChildren(
+      left: ShuffleStageInfo,
+      right: ShuffleStageInfo,
+      joinType: JoinType): Option[(SparkPlan, SparkPlan)] = {
+    assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
+    val numPartitions = left.partitionsWithSizes.length
+    // We use the median size of the original shuffle partitions to detect skewed partitions.
+    val leftMedSize = medianSize(left.mapStats)
+    val rightMedSize = medianSize(right.mapStats)
+    logDebug(
+      s"""
+         |Optimizing skewed join.
+         |Left side partitions size info:
+         |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
+         |Right side partitions size info:
+         |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
+      """.stripMargin)
+    val canSplitLeft = canSplitLeftSide(joinType)
+    val canSplitRight = canSplitRightSide(joinType)
+    // We use the actual partition sizes (may be coalesced) to calculate target size, so that
+    // the final data distribution is even (coalesced partitions + split partitions).
+    val leftActualSizes = left.partitionsWithSizes.map(_._2)
+    val rightActualSizes = right.partitionsWithSizes.map(_._2)
+    val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
+    val rightTargetSize = targetSize(rightActualSizes, rightMedSize)
+
+    val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
+    val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
+    var numSkewedLeft = 0
+    var numSkewedRight = 0
+    for (partitionIndex <- 0 until numPartitions) {
+      val leftActualSize = leftActualSizes(partitionIndex)
+      val isLeftSkew = isSkewed(leftActualSize, leftMedSize) && canSplitLeft
+      val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
+      val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex
+
+      val rightActualSize = rightActualSizes(partitionIndex)
+      val isRightSkew = isSkewed(rightActualSize, rightMedSize) && canSplitRight
+      val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
+      val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex
+
+      // A skewed partition should never be coalesced, but skip it here just to be safe.
+      val leftParts = if (isLeftSkew && !isLeftCoalesced) {
+        val reducerId = leftPartSpec.startReducerIndex
+        val skewSpecs = createSkewPartitionSpecs(
+          left.mapStats.shuffleId, reducerId, leftTargetSize)
+        if (skewSpecs.isDefined) {
+          logDebug(s"Left side partition $partitionIndex " +
+            s"(${FileUtils.byteCountToDisplaySize(leftActualSize)}) is skewed, " +
+            s"split it into ${skewSpecs.get.length} parts.")
+          numSkewedLeft += 1
         }
+        skewSpecs.getOrElse(Seq(leftPartSpec))
+      } else {
+        Seq(leftPartSpec)
+      }
 
-        // A skewed partition should never be coalesced, but skip it here just to be safe.
-        val rightParts = if (isRightSkew && !isRightCoalesced) {
-          val reducerId = rightPartSpec.startReducerIndex
-          val skewSpecs = createSkewPartitionSpecs(
-            right.mapStats.shuffleId, reducerId, rightTargetSize)
-          if (skewSpecs.isDefined) {
-            logDebug(s"Right side partition $partitionIndex " +
-              s"(${FileUtils.byteCountToDisplaySize(rightActualSize)}) is skewed, " +
-              s"split it into ${skewSpecs.get.length} parts.")
-            numSkewedRight += 1
-          }
-          skewSpecs.getOrElse(Seq(rightPartSpec))
-        } else {
-          Seq(rightPartSpec)
+      // A skewed partition should never be coalesced, but skip it here just to be safe.
+      val rightParts = if (isRightSkew && !isRightCoalesced) {
+        val reducerId = rightPartSpec.startReducerIndex
+        val skewSpecs = createSkewPartitionSpecs(
+          right.mapStats.shuffleId, reducerId, rightTargetSize)
+        if (skewSpecs.isDefined) {
+          logDebug(s"Right side partition $partitionIndex " +
+            s"(${FileUtils.byteCountToDisplaySize(rightActualSize)}) is skewed, " +
+            s"split it into ${skewSpecs.get.length} parts.")
+          numSkewedRight += 1
         }
+        skewSpecs.getOrElse(Seq(rightPartSpec))
+      } else {
+        Seq(rightPartSpec)
+      }
 
-        for {
-          leftSidePartition <- leftParts
-          rightSidePartition <- rightParts
-        } {
-          leftSidePartitions += leftSidePartition
-          rightSidePartitions += rightSidePartition
-        }
+      for {
+        leftSidePartition <- leftParts
+        rightSidePartition <- rightParts
+      } {
+        leftSidePartitions += leftSidePartition
+        rightSidePartitions += rightSidePartition
       }
+    }
+    logDebug(s"number of skewed partitions: left $numSkewedLeft, right $numSkewedRight")
+    if (numSkewedLeft > 0 || numSkewedRight > 0) {
+      Some((CustomShuffleReaderExec(left.shuffleStage, leftSidePartitions.toSeq),
+        CustomShuffleReaderExec(right.shuffleStage, rightSidePartitions.toSeq)))
+    } else {
+      None
+    }
+  }
 
-      logDebug(s"number of skewed partitions: left $numSkewedLeft, right $numSkewedRight")
-      if (numSkewedLeft > 0 || numSkewedRight > 0) {
-        val newLeft = CustomShuffleReaderExec(left.shuffleStage, leftSidePartitions.toSeq)
-        val newRight = CustomShuffleReaderExec(right.shuffleStage, rightSidePartitions.toSeq)
+  def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
+    case smj @ SortMergeJoinExec(_, _, joinType, _,
+        s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
+        s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)

Review comment:
       nit: let's make sure we don't optimize the same skew join repeatedly. The pattern match should also check that `isSkewJoin` is false.
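
For illustration, a minimal sketch of such a guard. The types `Plan`, `Leaf` and `Join` are hypothetical stand-ins, not Spark's real `SparkPlan` classes, and the rewrite is reduced to flipping the flag; the point is only that the match does not fire a second time once `isSkewJoin` is true.

```scala
object SkewGuardSketch {
  // Hypothetical stand-ins for physical plan nodes; not Spark's real classes.
  sealed trait Plan
  case class Leaf(name: String) extends Plan
  case class Join(left: Plan, right: Plan, isSkewJoin: Boolean = false) extends Plan

  // Counts how many times the rewrite actually fired.
  var rewrites = 0

  def optimizeSkewJoin(plan: Plan): Plan = plan match {
    // The guard skips joins that were already marked as skew-optimized.
    case j: Join if !j.isSkewJoin =>
      rewrites += 1
      j.copy(isSkewJoin = true)
    case other => other
  }
}
```

Applying `optimizeSkewJoin` to its own output returns the plan unchanged, so running the rule more than once is harmless in this toy model.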






[GitHub] [spark] cloud-fan edited a comment on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
cloud-fan edited a comment on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-827637229


   Skew join handling replicates the non-skewed side, and that overhead is larger in SHJ because each task has to build a hash relation. But the cost of the skew itself should be more significant than this.
   
   I don't think this PR makes OOM much more likely to happen. It does add more tasks that need to build a hash relation, but the parallelism on the executor JVM is not increased. E.g. we may still run 8 tasks at the same time on one executor to build hash relations.
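
To make the replication overhead concrete, here is a small sketch that mirrors the nested `for` over `leftParts` and `rightParts` in the diff. The object name `SkewReplicationSketch`, the helper `pairUp`, and the string labels are hypothetical and stand in for `ShufflePartitionSpec` values; this is an illustration under those assumptions, not the PR's code.

```scala
object SkewReplicationSketch {
  // Pairs every left split with every right split, as the nested
  // for-comprehension in the diff does, and returns the two sides
  // in task order.
  def pairUp[A, B](leftParts: Seq[A], rightParts: Seq[B]): (Seq[A], Seq[B]) = {
    val pairs = for (l <- leftParts; r <- rightParts) yield (l, r)
    pairs.unzip
  }
}
```

With a left partition split into three parts and an unsplit right partition, `pairUp(Seq("L0-a", "L0-b", "L0-c"), Seq("R0"))` lists `R0` three times on the right side: one read of the non-skewed partition per task, and with SHJ one hash relation build per task as well.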




[GitHub] [spark] maropu commented on a change in pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
maropu commented on a change in pull request #32328:
URL: https://github.com/apache/spark/pull/32328#discussion_r620000087



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledJoin.scala
##########
@@ -19,15 +19,23 @@ package org.apache.spark.sql.execution.joins
 
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, LeftExistence, LeftOuter, RightOuter}
-import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution, Partitioning, PartitioningCollection, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution, Partitioning, PartitioningCollection, UnknownPartitioning, UnspecifiedDistribution}
 
 /**
  * Holds common logic for join operators by shuffling two child relations
  * using the join keys.
  */
 trait ShuffledJoin extends BaseJoinExec {
+  def isSkewJoin: Boolean

Review comment:
       Could you also update the description in SQLConf? https://github.com/apache/spark/blob/38ef4771d447f6135382ee2767b3f32b96cb1b0e/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L541-L548

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -157,98 +157,121 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
    * 4. Wrap the join right child with a special shuffle reader that reads partition0 3 times by
    *    3 tasks separately.
    */
-  def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
-    case smj @ SortMergeJoinExec(_, _, joinType, _,
-        s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
-        s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
-        if supportedJoinTypes.contains(joinType) =>
-      assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
-      val numPartitions = left.partitionsWithSizes.length
-      // We use the median size of the original shuffle partitions to detect skewed partitions.
-      val leftMedSize = medianSize(left.mapStats)
-      val rightMedSize = medianSize(right.mapStats)
-      logDebug(
-        s"""
-          |Optimizing skewed join.
-          |Left side partitions size info:
-          |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
-          |Right side partitions size info:
-          |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
-        """.stripMargin)
-      val canSplitLeft = canSplitLeftSide(joinType)
-      val canSplitRight = canSplitRightSide(joinType)
-      // We use the actual partition sizes (may be coalesced) to calculate target size, so that
-      // the final data distribution is even (coalesced partitions + split partitions).
-      val leftActualSizes = left.partitionsWithSizes.map(_._2)
-      val rightActualSizes = right.partitionsWithSizes.map(_._2)
-      val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
-      val rightTargetSize = targetSize(rightActualSizes, rightMedSize)
-
-      val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-      val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-      var numSkewedLeft = 0
-      var numSkewedRight = 0
-      for (partitionIndex <- 0 until numPartitions) {
-        val leftActualSize = leftActualSizes(partitionIndex)
-        val isLeftSkew = isSkewed(leftActualSize, leftMedSize) && canSplitLeft
-        val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
-        val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex
-
-        val rightActualSize = rightActualSizes(partitionIndex)
-        val isRightSkew = isSkewed(rightActualSize, rightMedSize) && canSplitRight
-        val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
-        val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex
-
-        // A skewed partition should never be coalesced, but skip it here just to be safe.
-        val leftParts = if (isLeftSkew && !isLeftCoalesced) {
-          val reducerId = leftPartSpec.startReducerIndex
-          val skewSpecs = createSkewPartitionSpecs(
-            left.mapStats.shuffleId, reducerId, leftTargetSize)
-          if (skewSpecs.isDefined) {
-            logDebug(s"Left side partition $partitionIndex " +
-              s"(${FileUtils.byteCountToDisplaySize(leftActualSize)}) is skewed, " +
-              s"split it into ${skewSpecs.get.length} parts.")
-            numSkewedLeft += 1
-          }
-          skewSpecs.getOrElse(Seq(leftPartSpec))
-        } else {
-          Seq(leftPartSpec)
+  private def getOptimizedChildren(

Review comment:
       nit: How about `getOptimizedChildren` -> `tryToOptimizedChildren`?

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -685,64 +691,82 @@ class AdaptiveQueryExecSuite
   }
 
   test("SPARK-29544: adaptive skew join with different join types") {
-    withSQLConf(
-      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
-      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
-      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
-      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
-      withTempView("skewData1", "skewData2") {
-        spark
-          .range(0, 1000, 1, 10)
-          .select(
-            when('id < 250, 249)
-              .when('id >= 750, 1000)
-              .otherwise('id).as("key1"),
-            'id as "value1")
-          .createOrReplaceTempView("skewData1")
-        spark
-          .range(0, 1000, 1, 10)
-          .select(
-            when('id < 250, 249)
-              .otherwise('id).as("key2"),
-            'id as "value2")
-          .createOrReplaceTempView("skewData2")
+    Seq("SHUFFLE_MERGE", "SHUFFLE_HASH").foreach { joinHint =>
+      val isSMJ = joinHint == "SHUFFLE_MERGE"
+      withSQLConf(
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+        SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+        SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+        SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+        SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+        withTempView("skewData1", "skewData2") {
+          spark
+            .range(0, 1000, 1, 10)
+            .select(
+              when('id < 250, 249)
+                .when('id >= 750, 1000)
+                .otherwise('id).as("key1"),
+              'id as "value1")
+            .createOrReplaceTempView("skewData1")
+          spark
+            .range(0, 1000, 1, 10)
+            .select(
+              when('id < 250, 249)
+                .otherwise('id).as("key2"),
+              'id as "value2")
+            .createOrReplaceTempView("skewData2")
 
-        def checkSkewJoin(
-            joins: Seq[SortMergeJoinExec],
-            leftSkewNum: Int,
-            rightSkewNum: Int): Unit = {
-          assert(joins.size == 1 && joins.head.isSkewJoin)
-          assert(joins.head.left.collect {
-            case r: CustomShuffleReaderExec => r
-          }.head.partitionSpecs.collect {
-            case p: PartialReducerPartitionSpec => p.reducerIndex
-          }.distinct.length == leftSkewNum)
-          assert(joins.head.right.collect {
-            case r: CustomShuffleReaderExec => r
-          }.head.partitionSpecs.collect {
-            case p: PartialReducerPartitionSpec => p.reducerIndex
-          }.distinct.length == rightSkewNum)
-        }
+          def checkSkewJoin(
+              joins: Seq[ShuffledJoin],
+              leftSkewNum: Int,
+              rightSkewNum: Int): Unit = {
+            assert(joins.size == 1 && joins.head.isSkewJoin)
+            assert(joins.head.left.collect {
+              case r: CustomShuffleReaderExec => r
+            }.head.partitionSpecs.collect {
+              case p: PartialReducerPartitionSpec => p.reducerIndex
+            }.distinct.length == leftSkewNum)
+            assert(joins.head.right.collect {
+              case r: CustomShuffleReaderExec => r
+            }.head.partitionSpecs.collect {
+              case p: PartialReducerPartitionSpec => p.reducerIndex
+            }.distinct.length == rightSkewNum)
+          }
 
-        // skewed inner join optimization
-        val (_, innerAdaptivePlan) = runAdaptiveAndVerifyResult(
-          "SELECT * FROM skewData1 join skewData2 ON key1 = key2")
-        val innerSmj = findTopLevelSortMergeJoin(innerAdaptivePlan)
-        checkSkewJoin(innerSmj, 2, 1)
-
-        // skewed left outer join optimization
-        val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult(
-          "SELECT * FROM skewData1 left outer join skewData2 ON key1 = key2")
-        val leftSmj = findTopLevelSortMergeJoin(leftAdaptivePlan)
-        checkSkewJoin(leftSmj, 2, 0)
-
-        // skewed right outer join optimization
-        val (_, rightAdaptivePlan) = runAdaptiveAndVerifyResult(
-          "SELECT * FROM skewData1 right outer join skewData2 ON key1 = key2")
-        val rightSmj = findTopLevelSortMergeJoin(rightAdaptivePlan)
-        checkSkewJoin(rightSmj, 0, 1)
+          // skewed inner join optimization
+          val (_, innerAdaptivePlan) = runAdaptiveAndVerifyResult(
+            s"SELECT /*+ $joinHint(skewData1) */ * FROM skewData1 " +
+              s"JOIN skewData2 ON key1 = key2")
+          val inner = if (isSMJ) {
+            findTopLevelSortMergeJoin(innerAdaptivePlan)
+          } else {
+            findTopLevelShuffledHashJoin(innerAdaptivePlan)
+          }
+          checkSkewJoin(inner, 2, 1)
+
+          // skewed left outer join optimization
+          val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult(
+            s"SELECT /*+ $joinHint(skewData2) */ * FROM skewData1 " +
+              s"LEFT OUTER JOIN skewData2 ON key1 = key2")
+          val leftSmj = if (isSMJ) {
+            findTopLevelSortMergeJoin(leftAdaptivePlan)
+          } else {
+            findTopLevelShuffledHashJoin(leftAdaptivePlan)
+          }
+          checkSkewJoin(leftSmj, 2, 0)
+
+          // skewed right outer join optimization
+          val (_, rightAdaptivePlan) = runAdaptiveAndVerifyResult(
+            s"SELECT /*+ $joinHint(skewData1) */ * FROM skewData1 " +
+              s"RIGHT OUTER JOIN skewData2 ON key1 = key2")

Review comment:
       ditto

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -685,64 +691,82 @@ class AdaptiveQueryExecSuite
   }
 
   test("SPARK-29544: adaptive skew join with different join types") {
-    withSQLConf(
-      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
-      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
-      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
-      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
-      withTempView("skewData1", "skewData2") {
-        spark
-          .range(0, 1000, 1, 10)
-          .select(
-            when('id < 250, 249)
-              .when('id >= 750, 1000)
-              .otherwise('id).as("key1"),
-            'id as "value1")
-          .createOrReplaceTempView("skewData1")
-        spark
-          .range(0, 1000, 1, 10)
-          .select(
-            when('id < 250, 249)
-              .otherwise('id).as("key2"),
-            'id as "value2")
-          .createOrReplaceTempView("skewData2")
+    Seq("SHUFFLE_MERGE", "SHUFFLE_HASH").foreach { joinHint =>
+      val isSMJ = joinHint == "SHUFFLE_MERGE"
+      withSQLConf(
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+        SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+        SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+        SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+        SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+        withTempView("skewData1", "skewData2") {
+          spark
+            .range(0, 1000, 1, 10)
+            .select(
+              when('id < 250, 249)
+                .when('id >= 750, 1000)
+                .otherwise('id).as("key1"),
+              'id as "value1")
+            .createOrReplaceTempView("skewData1")
+          spark
+            .range(0, 1000, 1, 10)
+            .select(
+              when('id < 250, 249)
+                .otherwise('id).as("key2"),
+              'id as "value2")
+            .createOrReplaceTempView("skewData2")
 
-        def checkSkewJoin(
-            joins: Seq[SortMergeJoinExec],
-            leftSkewNum: Int,
-            rightSkewNum: Int): Unit = {
-          assert(joins.size == 1 && joins.head.isSkewJoin)
-          assert(joins.head.left.collect {
-            case r: CustomShuffleReaderExec => r
-          }.head.partitionSpecs.collect {
-            case p: PartialReducerPartitionSpec => p.reducerIndex
-          }.distinct.length == leftSkewNum)
-          assert(joins.head.right.collect {
-            case r: CustomShuffleReaderExec => r
-          }.head.partitionSpecs.collect {
-            case p: PartialReducerPartitionSpec => p.reducerIndex
-          }.distinct.length == rightSkewNum)
-        }
+          def checkSkewJoin(
+              joins: Seq[ShuffledJoin],
+              leftSkewNum: Int,
+              rightSkewNum: Int): Unit = {
+            assert(joins.size == 1 && joins.head.isSkewJoin)
+            assert(joins.head.left.collect {
+              case r: CustomShuffleReaderExec => r
+            }.head.partitionSpecs.collect {
+              case p: PartialReducerPartitionSpec => p.reducerIndex
+            }.distinct.length == leftSkewNum)
+            assert(joins.head.right.collect {
+              case r: CustomShuffleReaderExec => r
+            }.head.partitionSpecs.collect {
+              case p: PartialReducerPartitionSpec => p.reducerIndex
+            }.distinct.length == rightSkewNum)
+          }
 
-        // skewed inner join optimization
-        val (_, innerAdaptivePlan) = runAdaptiveAndVerifyResult(
-          "SELECT * FROM skewData1 join skewData2 ON key1 = key2")
-        val innerSmj = findTopLevelSortMergeJoin(innerAdaptivePlan)
-        checkSkewJoin(innerSmj, 2, 1)
-
-        // skewed left outer join optimization
-        val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult(
-          "SELECT * FROM skewData1 left outer join skewData2 ON key1 = key2")
-        val leftSmj = findTopLevelSortMergeJoin(leftAdaptivePlan)
-        checkSkewJoin(leftSmj, 2, 0)
-
-        // skewed right outer join optimization
-        val (_, rightAdaptivePlan) = runAdaptiveAndVerifyResult(
-          "SELECT * FROM skewData1 right outer join skewData2 ON key1 = key2")
-        val rightSmj = findTopLevelSortMergeJoin(rightAdaptivePlan)
-        checkSkewJoin(rightSmj, 0, 1)
+          // skewed inner join optimization
+          val (_, innerAdaptivePlan) = runAdaptiveAndVerifyResult(
+            s"SELECT /*+ $joinHint(skewData1) */ * FROM skewData1 " +
+              s"JOIN skewData2 ON key1 = key2")

Review comment:
       nit: remove the `s` prefix; this string has no interpolation.
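
A small sketch of the query string with the interpolator kept only where a variable is referenced. The wrapper object `InterpolatorSketch` is hypothetical; the hint value is one of the two used by the test.

```scala
object InterpolatorSketch {
  val joinHint = "SHUFFLE_HASH"

  // Only the first literal references a variable, so only it needs the
  // `s` prefix; the second is a plain string.
  val query: String =
    s"SELECT /*+ $joinHint(skewData1) */ * FROM skewData1 " +
      "JOIN skewData2 ON key1 = key2"
}
```

The resulting string is the same with or without the second `s`; dropping it just avoids a needless interpolator call.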

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -157,98 +157,121 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
    * 4. Wrap the join right child with a special shuffle reader that reads partition0 3 times by
    *    3 tasks separately.
    */
-  def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
-    case smj @ SortMergeJoinExec(_, _, joinType, _,
-        s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
-        s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
-        if supportedJoinTypes.contains(joinType) =>
-      assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
-      val numPartitions = left.partitionsWithSizes.length
-      // We use the median size of the original shuffle partitions to detect skewed partitions.
-      val leftMedSize = medianSize(left.mapStats)
-      val rightMedSize = medianSize(right.mapStats)
-      logDebug(
-        s"""
-          |Optimizing skewed join.
-          |Left side partitions size info:
-          |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
-          |Right side partitions size info:
-          |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
-        """.stripMargin)
-      val canSplitLeft = canSplitLeftSide(joinType)
-      val canSplitRight = canSplitRightSide(joinType)
-      // We use the actual partition sizes (may be coalesced) to calculate target size, so that
-      // the final data distribution is even (coalesced partitions + split partitions).
-      val leftActualSizes = left.partitionsWithSizes.map(_._2)
-      val rightActualSizes = right.partitionsWithSizes.map(_._2)
-      val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
-      val rightTargetSize = targetSize(rightActualSizes, rightMedSize)
-
-      val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-      val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-      var numSkewedLeft = 0
-      var numSkewedRight = 0
-      for (partitionIndex <- 0 until numPartitions) {
-        val leftActualSize = leftActualSizes(partitionIndex)
-        val isLeftSkew = isSkewed(leftActualSize, leftMedSize) && canSplitLeft
-        val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
-        val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex
-
-        val rightActualSize = rightActualSizes(partitionIndex)
-        val isRightSkew = isSkewed(rightActualSize, rightMedSize) && canSplitRight
-        val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
-        val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex
-
-        // A skewed partition should never be coalesced, but skip it here just to be safe.
-        val leftParts = if (isLeftSkew && !isLeftCoalesced) {
-          val reducerId = leftPartSpec.startReducerIndex
-          val skewSpecs = createSkewPartitionSpecs(
-            left.mapStats.shuffleId, reducerId, leftTargetSize)
-          if (skewSpecs.isDefined) {
-            logDebug(s"Left side partition $partitionIndex " +
-              s"(${FileUtils.byteCountToDisplaySize(leftActualSize)}) is skewed, " +
-              s"split it into ${skewSpecs.get.length} parts.")
-            numSkewedLeft += 1
-          }
-          skewSpecs.getOrElse(Seq(leftPartSpec))
-        } else {
-          Seq(leftPartSpec)
+  private def getOptimizedChildren(
+      left: ShuffleStageInfo,
+      right: ShuffleStageInfo,
+      joinType: JoinType): Option[(SparkPlan, SparkPlan)] = {
+    assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
+    val numPartitions = left.partitionsWithSizes.length
+    // We use the median size of the original shuffle partitions to detect skewed partitions.
+    val leftMedSize = medianSize(left.mapStats)
+    val rightMedSize = medianSize(right.mapStats)
+    logDebug(
+      s"""
+         |Optimizing skewed join.
+         |Left side partitions size info:
+         |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
+         |Right side partitions size info:
+         |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
+      """.stripMargin)
+    val canSplitLeft = canSplitLeftSide(joinType)
+    val canSplitRight = canSplitRightSide(joinType)
+    // We use the actual partition sizes (may be coalesced) to calculate target size, so that
+    // the final data distribution is even (coalesced partitions + split partitions).
+    val leftActualSizes = left.partitionsWithSizes.map(_._2)
+    val rightActualSizes = right.partitionsWithSizes.map(_._2)
+    val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
+    val rightTargetSize = targetSize(rightActualSizes, rightMedSize)
+
+    val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
+    val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
+    var numSkewedLeft = 0
+    var numSkewedRight = 0
+    for (partitionIndex <- 0 until numPartitions) {
+      val leftActualSize = leftActualSizes(partitionIndex)
+      val isLeftSkew = isSkewed(leftActualSize, leftMedSize) && canSplitLeft
+      val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
+      val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex
+
+      val rightActualSize = rightActualSizes(partitionIndex)
+      val isRightSkew = isSkewed(rightActualSize, rightMedSize) && canSplitRight
+      val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
+      val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex
+
+      // A skewed partition should never be coalesced, but skip it here just to be safe.
+      val leftParts = if (isLeftSkew && !isLeftCoalesced) {
+        val reducerId = leftPartSpec.startReducerIndex
+        val skewSpecs = createSkewPartitionSpecs(
+          left.mapStats.shuffleId, reducerId, leftTargetSize)
+        if (skewSpecs.isDefined) {
+          logDebug(s"Left side partition $partitionIndex " +
+            s"(${FileUtils.byteCountToDisplaySize(leftActualSize)}) is skewed, " +
+            s"split it into ${skewSpecs.get.length} parts.")
+          numSkewedLeft += 1
         }
+        skewSpecs.getOrElse(Seq(leftPartSpec))
+      } else {
+        Seq(leftPartSpec)
+      }
 
-        // A skewed partition should never be coalesced, but skip it here just to be safe.
-        val rightParts = if (isRightSkew && !isRightCoalesced) {
-          val reducerId = rightPartSpec.startReducerIndex
-          val skewSpecs = createSkewPartitionSpecs(
-            right.mapStats.shuffleId, reducerId, rightTargetSize)
-          if (skewSpecs.isDefined) {
-            logDebug(s"Right side partition $partitionIndex " +
-              s"(${FileUtils.byteCountToDisplaySize(rightActualSize)}) is skewed, " +
-              s"split it into ${skewSpecs.get.length} parts.")
-            numSkewedRight += 1
-          }
-          skewSpecs.getOrElse(Seq(rightPartSpec))
-        } else {
-          Seq(rightPartSpec)
+      // A skewed partition should never be coalesced, but skip it here just to be safe.
+      val rightParts = if (isRightSkew && !isRightCoalesced) {
+        val reducerId = rightPartSpec.startReducerIndex
+        val skewSpecs = createSkewPartitionSpecs(
+          right.mapStats.shuffleId, reducerId, rightTargetSize)
+        if (skewSpecs.isDefined) {
+          logDebug(s"Right side partition $partitionIndex " +
+            s"(${FileUtils.byteCountToDisplaySize(rightActualSize)}) is skewed, " +
+            s"split it into ${skewSpecs.get.length} parts.")
+          numSkewedRight += 1
         }
+        skewSpecs.getOrElse(Seq(rightPartSpec))
+      } else {
+        Seq(rightPartSpec)
+      }
 
-        for {
-          leftSidePartition <- leftParts
-          rightSidePartition <- rightParts
-        } {
-          leftSidePartitions += leftSidePartition
-          rightSidePartitions += rightSidePartition
-        }
+      for {
+        leftSidePartition <- leftParts
+        rightSidePartition <- rightParts
+      } {
+        leftSidePartitions += leftSidePartition
+        rightSidePartitions += rightSidePartition
       }
+    }
+    logDebug(s"number of skewed partitions: left $numSkewedLeft, right $numSkewedRight")
+    if (numSkewedLeft > 0 || numSkewedRight > 0) {
+      Some((CustomShuffleReaderExec(left.shuffleStage, leftSidePartitions.toSeq),
+        CustomShuffleReaderExec(right.shuffleStage, rightSidePartitions.toSeq)))
+    } else {
+      None
+    }
+  }
 

Review comment:
       nit: This looks like an unnecessary change.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
##########
@@ -52,16 +51,6 @@ case class SortMergeJoinExec(
 
   override def stringArgs: Iterator[Any] = super.stringArgs.toSeq.dropRight(1).iterator
 
-  override def requiredChildDistribution: Seq[Distribution] = {

Review comment:
       Could you move `nodeName`, too?
   ```
     override def nodeName: String = {
       if (isSkewJoin) super.nodeName + "(skew=true)" else super.nodeName
     }
   ```
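
One way to picture the move, as a sketch only: the override lives in the shared trait, so both join operators pick it up. `BaseNode`, `ShuffledJoinLike`, `SortMergeJoinLike` and `ShuffledHashJoinLike` are hypothetical stand-ins for Spark's operators, and `productPrefix` is used here merely as a simple default name.

```scala
object NodeNameSketch {
  // Hypothetical base node; Spark's real operators extend SparkPlan.
  trait BaseNode extends Product {
    def nodeName: String = productPrefix
  }

  // The skew suffix is defined once, in the trait shared by both joins.
  trait ShuffledJoinLike extends BaseNode {
    def isSkewJoin: Boolean
    override def nodeName: String =
      if (isSkewJoin) super.nodeName + "(skew=true)" else super.nodeName
  }

  case class SortMergeJoinLike(isSkewJoin: Boolean) extends ShuffledJoinLike
  case class ShuffledHashJoinLike(isSkewJoin: Boolean) extends ShuffledJoinLike
}
```

Neither case class defines `nodeName` itself, yet both report the skew suffix when the flag is set.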

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -685,64 +691,82 @@ class AdaptiveQueryExecSuite
   }
 
   test("SPARK-29544: adaptive skew join with different join types") {
-    withSQLConf(
-      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
-      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
-      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
-      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
-      withTempView("skewData1", "skewData2") {
-        spark
-          .range(0, 1000, 1, 10)
-          .select(
-            when('id < 250, 249)
-              .when('id >= 750, 1000)
-              .otherwise('id).as("key1"),
-            'id as "value1")
-          .createOrReplaceTempView("skewData1")
-        spark
-          .range(0, 1000, 1, 10)
-          .select(
-            when('id < 250, 249)
-              .otherwise('id).as("key2"),
-            'id as "value2")
-          .createOrReplaceTempView("skewData2")
+    Seq("SHUFFLE_MERGE", "SHUFFLE_HASH").foreach { joinHint =>
+      val isSMJ = joinHint == "SHUFFLE_MERGE"
+      withSQLConf(
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+        SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+        SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+        SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+        SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+        withTempView("skewData1", "skewData2") {
+          spark
+            .range(0, 1000, 1, 10)
+            .select(
+              when('id < 250, 249)
+                .when('id >= 750, 1000)
+                .otherwise('id).as("key1"),
+              'id as "value1")
+            .createOrReplaceTempView("skewData1")
+          spark
+            .range(0, 1000, 1, 10)
+            .select(
+              when('id < 250, 249)
+                .otherwise('id).as("key2"),
+              'id as "value2")
+            .createOrReplaceTempView("skewData2")
 
-        def checkSkewJoin(
-            joins: Seq[SortMergeJoinExec],
-            leftSkewNum: Int,
-            rightSkewNum: Int): Unit = {
-          assert(joins.size == 1 && joins.head.isSkewJoin)
-          assert(joins.head.left.collect {
-            case r: CustomShuffleReaderExec => r
-          }.head.partitionSpecs.collect {
-            case p: PartialReducerPartitionSpec => p.reducerIndex
-          }.distinct.length == leftSkewNum)
-          assert(joins.head.right.collect {
-            case r: CustomShuffleReaderExec => r
-          }.head.partitionSpecs.collect {
-            case p: PartialReducerPartitionSpec => p.reducerIndex
-          }.distinct.length == rightSkewNum)
-        }
+          def checkSkewJoin(
+              joins: Seq[ShuffledJoin],
+              leftSkewNum: Int,
+              rightSkewNum: Int): Unit = {
+            assert(joins.size == 1 && joins.head.isSkewJoin)
+            assert(joins.head.left.collect {
+              case r: CustomShuffleReaderExec => r
+            }.head.partitionSpecs.collect {
+              case p: PartialReducerPartitionSpec => p.reducerIndex
+            }.distinct.length == leftSkewNum)
+            assert(joins.head.right.collect {
+              case r: CustomShuffleReaderExec => r
+            }.head.partitionSpecs.collect {
+              case p: PartialReducerPartitionSpec => p.reducerIndex
+            }.distinct.length == rightSkewNum)
+          }
 
-        // skewed inner join optimization
-        val (_, innerAdaptivePlan) = runAdaptiveAndVerifyResult(
-          "SELECT * FROM skewData1 join skewData2 ON key1 = key2")
-        val innerSmj = findTopLevelSortMergeJoin(innerAdaptivePlan)
-        checkSkewJoin(innerSmj, 2, 1)
-
-        // skewed left outer join optimization
-        val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult(
-          "SELECT * FROM skewData1 left outer join skewData2 ON key1 = key2")
-        val leftSmj = findTopLevelSortMergeJoin(leftAdaptivePlan)
-        checkSkewJoin(leftSmj, 2, 0)
-
-        // skewed right outer join optimization
-        val (_, rightAdaptivePlan) = runAdaptiveAndVerifyResult(
-          "SELECT * FROM skewData1 right outer join skewData2 ON key1 = key2")
-        val rightSmj = findTopLevelSortMergeJoin(rightAdaptivePlan)
-        checkSkewJoin(rightSmj, 0, 1)
+          // skewed inner join optimization
+          val (_, innerAdaptivePlan) = runAdaptiveAndVerifyResult(
+            s"SELECT /*+ $joinHint(skewData1) */ * FROM skewData1 " +
+              s"JOIN skewData2 ON key1 = key2")
+          val inner = if (isSMJ) {
+            findTopLevelSortMergeJoin(innerAdaptivePlan)
+          } else {
+            findTopLevelShuffledHashJoin(innerAdaptivePlan)
+          }
+          checkSkewJoin(inner, 2, 1)
+
+          // skewed left outer join optimization
+          val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult(
+            s"SELECT /*+ $joinHint(skewData2) */ * FROM skewData1 " +
+              s"LEFT OUTER JOIN skewData2 ON key1 = key2")

Review comment:
       ditto




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ulysses-you commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826440413


   Do you have time to take a look? Thank you. @maropu @cloud-fan @JkSelf @maryannxue 




[GitHub] [spark] SparkQA commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826631848


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42464/
   




[GitHub] [spark] ulysses-you commented on a change in pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on a change in pull request #32328:
URL: https://github.com/apache/spark/pull/32328#discussion_r621825338



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -148,7 +148,7 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
   /*
    * This method aim to optimize the skewed join with the following steps:

Review comment:
       In the current implementation, we already support optimizing a skewed build side for inner joins. For other join types, we can't optimize it because of the join semantics.
   
   IMO, it's better to handle OOM in shj itself instead of falling back to smj, which might make things more complicated. Actually, we might change smj to shj at `reOptimize` if we disable `preferSortMerge`, so it would be confusing to change the join strategy again.
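
To illustrate why the join type limits which side can be split: splitting one side means the matching partition on the other side is read once per split, which is only safe when the duplicated side cannot emit unmatched (null-padded) rows. A minimal sketch under that reasoning, using a hypothetical simplified `JoinType` ADT rather than Spark's own classes; it is consistent with the expectations in the test diff earlier in this thread (inner: both sides, left outer: left only, right outer: right only):

```scala
object SplitSideSketch {
  // Hypothetical, simplified join-type ADT for illustration only.
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  // The left side may be split when duplicating the right side cannot
  // produce extra unmatched right rows.
  def canSplitLeft(joinType: JoinType): Boolean = joinType match {
    case Inner | LeftOuter => true
    case _ => false
  }

  // Symmetric rule for the right side.
  def canSplitRight(joinType: JoinType): Boolean = joinType match {
    case Inner | RightOuter => true
    case _ => false
  }
}
```

Under these rules, only an inner join allows splitting whichever side happens to be the build side; for a left outer join the right side cannot be split, and for a full outer join neither side can.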






[GitHub] [spark] SparkQA commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826713623


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42471/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826803910


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137942/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-828097530


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42536/
   




[GitHub] [spark] SparkQA removed a comment on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826585501


   **[Test build #137942 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137942/testReport)** for PR 32328 at commit [`d42a732`](https://github.com/apache/spark/commit/d42a73287556fc7c8875a29d6cc1ee21325828d5).




[GitHub] [spark] SparkQA commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826872755


   **[Test build #137950 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137950/testReport)** for PR 32328 at commit [`ea209e3`](https://github.com/apache/spark/commit/ea209e356bc1e12d75055ba5508d136ea28bc22c).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] maropu commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
maropu commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826505942


   Also, cc: @c21 




[GitHub] [spark] AmplabJenkins commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826635951


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42464/
   




[GitHub] [spark] zhengruifeng commented on a change in pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on a change in pull request #32328:
URL: https://github.com/apache/spark/pull/32328#discussion_r620046259



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##########
@@ -231,11 +231,12 @@ object EnsureRequirements extends Rule[SparkPlan] {
    */
   private def reorderJoinPredicates(plan: SparkPlan): SparkPlan = {
     plan match {
-      case ShuffledHashJoinExec(leftKeys, rightKeys, joinType, buildSide, condition, left, right) =>
+      case ShuffledHashJoinExec(
+        leftKeys, rightKeys, joinType, buildSide, condition, left, right, isSkew) =>
         val (reorderedLeftKeys, reorderedRightKeys) =
           reorderJoinKeys(leftKeys, rightKeys, left.outputPartitioning, right.outputPartitioning)
         ShuffledHashJoinExec(reorderedLeftKeys, reorderedRightKeys, joinType, buildSide, condition,
-          left, right)
+          left, right, isSkew)
 
       case SortMergeJoinExec(leftKeys, rightKeys, joinType, condition, left, right, isPartial) =>

Review comment:
       totally nit: what about renaming `isPartial` to `isSkew` here?






[GitHub] [spark] SparkQA commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826265547








[GitHub] [spark] SparkQA removed a comment on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826260384


   **[Test build #137907 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137907/testReport)** for PR 32328 at commit [`d081031`](https://github.com/apache/spark/commit/d081031b54189f4c7059318aacfa03102a78a319).




[GitHub] [spark] cloud-fan commented on a change in pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #32328:
URL: https://github.com/apache/spark/pull/32328#discussion_r621235456



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -157,98 +157,121 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
    * 4. Wrap the join right child with a special shuffle reader that reads partition0 3 times by
    *    3 tasks separately.
    */
-  def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
-    case smj @ SortMergeJoinExec(_, _, joinType, _,
-        s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
-        s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
-        if supportedJoinTypes.contains(joinType) =>
-      assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
-      val numPartitions = left.partitionsWithSizes.length
-      // We use the median size of the original shuffle partitions to detect skewed partitions.
-      val leftMedSize = medianSize(left.mapStats)
-      val rightMedSize = medianSize(right.mapStats)
-      logDebug(
-        s"""
-          |Optimizing skewed join.
-          |Left side partitions size info:
-          |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
-          |Right side partitions size info:
-          |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
-        """.stripMargin)
-      val canSplitLeft = canSplitLeftSide(joinType)
-      val canSplitRight = canSplitRightSide(joinType)
-      // We use the actual partition sizes (may be coalesced) to calculate target size, so that
-      // the final data distribution is even (coalesced partitions + split partitions).
-      val leftActualSizes = left.partitionsWithSizes.map(_._2)
-      val rightActualSizes = right.partitionsWithSizes.map(_._2)
-      val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
-      val rightTargetSize = targetSize(rightActualSizes, rightMedSize)
-
-      val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-      val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-      var numSkewedLeft = 0
-      var numSkewedRight = 0
-      for (partitionIndex <- 0 until numPartitions) {
-        val leftActualSize = leftActualSizes(partitionIndex)
-        val isLeftSkew = isSkewed(leftActualSize, leftMedSize) && canSplitLeft
-        val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
-        val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex
-
-        val rightActualSize = rightActualSizes(partitionIndex)
-        val isRightSkew = isSkewed(rightActualSize, rightMedSize) && canSplitRight
-        val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
-        val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex
-
-        // A skewed partition should never be coalesced, but skip it here just to be safe.
-        val leftParts = if (isLeftSkew && !isLeftCoalesced) {
-          val reducerId = leftPartSpec.startReducerIndex
-          val skewSpecs = createSkewPartitionSpecs(
-            left.mapStats.shuffleId, reducerId, leftTargetSize)
-          if (skewSpecs.isDefined) {
-            logDebug(s"Left side partition $partitionIndex " +
-              s"(${FileUtils.byteCountToDisplaySize(leftActualSize)}) is skewed, " +
-              s"split it into ${skewSpecs.get.length} parts.")
-            numSkewedLeft += 1
-          }
-          skewSpecs.getOrElse(Seq(leftPartSpec))
-        } else {
-          Seq(leftPartSpec)
+  private def tryToOptimizedChildren(

Review comment:
       `tryOptimizeJoinChildren`?
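
The hunk above detects skew against the median partition size. A rough sketch of such a check, assuming a partition counts as skewed when it exceeds both a multiple of the median and an absolute byte threshold; the `factor` parameter, the upper-median choice for even counts, and all names here are assumptions of this sketch rather than Spark's exact code:

```scala
object SkewDetectSketch {
  // Median of the partition sizes; for an even count this picks the upper
  // middle element (a simplification assumed for this sketch).
  def median(sizes: Seq[Long]): Long = {
    require(sizes.nonEmpty, "need at least one partition size")
    val sorted = sizes.sorted
    sorted(sorted.length / 2)
  }

  // A partition is treated as skewed when it is larger than both
  // factor * median and the absolute threshold in bytes.
  def isSkewed(size: Long, medianSize: Long, factor: Int, thresholdBytes: Long): Boolean =
    size > medianSize * factor && size > thresholdBytes

  // Indices of the partitions that the rule above flags as skewed.
  def skewedIndices(sizes: Seq[Long], factor: Int, thresholdBytes: Long): Seq[Int] = {
    val med = median(sizes)
    sizes.zipWithIndex.collect {
      case (size, index) if isSkewed(size, med, factor, thresholdBytes) => index
    }
  }
}
```

Requiring both conditions keeps uniformly small partitions from being flagged just because one of them is several times the median.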






[GitHub] [spark] AmplabJenkins commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826307358


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42439/
   




[GitHub] [spark] maropu commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
maropu commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-828238814


   Thanks! Merged to master.




[GitHub] [spark] ulysses-you edited a comment on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
ulysses-you edited a comment on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826626797


   @c21  thanks for the input.
   
   Yes, we cannot optimize skew on the build side for all join types in AQE. But at least, we can currently handle skewed inner-like joins on both the stream side and the build side.
   
   > If that's true, it sounds to me that it may potentially introduce more OOM on build side, as tasks are sharing executor's off-heap memory to build hash maps
   
   Yes, it's a side effect of `OptimizeSkewedJoin`, whereas smj's advantage is that it can spill. IMO, if users specify a shuffled hash join for execution, that means they know its benefits and issues. On the other hand, we can easily increase the memory, but it is hard to make a skewed join fast. So this optimization can be an extra choice for users.




[GitHub] [spark] AmplabJenkins commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826265715








[GitHub] [spark] c21 commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-827818671


   > I don't think this PR makes OOM much more likely to happen. It does add more tasks that need to build a hash relation, but the parallelism on the executor JVM is not increased.
   
   @cloud-fan - yes, you are right. I failed to consider the parallelism part. Then I think this PR is good to go with no risk. Sorry for chiming in, @ulysses-you.




[GitHub] [spark] SparkQA commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826263493


   Kubernetes integration test unable to build dist.
   
   exiting with code: 1
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42430/
   




[GitHub] [spark] ulysses-you commented on a change in pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on a change in pull request #32328:
URL: https://github.com/apache/spark/pull/32328#discussion_r620044607



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -157,98 +157,121 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
    * 4. Wrap the join right child with a special shuffle reader that reads partition0 3 times by
    *    3 tasks separately.
    */
-  def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
-    case smj @ SortMergeJoinExec(_, _, joinType, _,
-        s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
-        s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
-        if supportedJoinTypes.contains(joinType) =>
-      assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
-      val numPartitions = left.partitionsWithSizes.length
-      // We use the median size of the original shuffle partitions to detect skewed partitions.
-      val leftMedSize = medianSize(left.mapStats)
-      val rightMedSize = medianSize(right.mapStats)
-      logDebug(
-        s"""
-          |Optimizing skewed join.
-          |Left side partitions size info:
-          |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
-          |Right side partitions size info:
-          |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
-        """.stripMargin)
-      val canSplitLeft = canSplitLeftSide(joinType)
-      val canSplitRight = canSplitRightSide(joinType)
-      // We use the actual partition sizes (may be coalesced) to calculate target size, so that
-      // the final data distribution is even (coalesced partitions + split partitions).
-      val leftActualSizes = left.partitionsWithSizes.map(_._2)
-      val rightActualSizes = right.partitionsWithSizes.map(_._2)
-      val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
-      val rightTargetSize = targetSize(rightActualSizes, rightMedSize)
-
-      val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-      val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
-      var numSkewedLeft = 0
-      var numSkewedRight = 0
-      for (partitionIndex <- 0 until numPartitions) {
-        val leftActualSize = leftActualSizes(partitionIndex)
-        val isLeftSkew = isSkewed(leftActualSize, leftMedSize) && canSplitLeft
-        val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
-        val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex
-
-        val rightActualSize = rightActualSizes(partitionIndex)
-        val isRightSkew = isSkewed(rightActualSize, rightMedSize) && canSplitRight
-        val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
-        val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex
-
-        // A skewed partition should never be coalesced, but skip it here just to be safe.
-        val leftParts = if (isLeftSkew && !isLeftCoalesced) {
-          val reducerId = leftPartSpec.startReducerIndex
-          val skewSpecs = createSkewPartitionSpecs(
-            left.mapStats.shuffleId, reducerId, leftTargetSize)
-          if (skewSpecs.isDefined) {
-            logDebug(s"Left side partition $partitionIndex " +
-              s"(${FileUtils.byteCountToDisplaySize(leftActualSize)}) is skewed, " +
-              s"split it into ${skewSpecs.get.length} parts.")
-            numSkewedLeft += 1
-          }
-          skewSpecs.getOrElse(Seq(leftPartSpec))
-        } else {
-          Seq(leftPartSpec)
+  private def getOptimizedChildren(
+      left: ShuffleStageInfo,
+      right: ShuffleStageInfo,
+      joinType: JoinType): Option[(SparkPlan, SparkPlan)] = {
+    assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
+    val numPartitions = left.partitionsWithSizes.length
+    // We use the median size of the original shuffle partitions to detect skewed partitions.
+    val leftMedSize = medianSize(left.mapStats)
+    val rightMedSize = medianSize(right.mapStats)
+    logDebug(
+      s"""
+         |Optimizing skewed join.
+         |Left side partitions size info:
+         |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
+         |Right side partitions size info:
+         |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
+      """.stripMargin)
+    val canSplitLeft = canSplitLeftSide(joinType)
+    val canSplitRight = canSplitRightSide(joinType)
+    // We use the actual partition sizes (may be coalesced) to calculate target size, so that
+    // the final data distribution is even (coalesced partitions + split partitions).
+    val leftActualSizes = left.partitionsWithSizes.map(_._2)
+    val rightActualSizes = right.partitionsWithSizes.map(_._2)
+    val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
+    val rightTargetSize = targetSize(rightActualSizes, rightMedSize)
+
+    val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
+    val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
+    var numSkewedLeft = 0
+    var numSkewedRight = 0
+    for (partitionIndex <- 0 until numPartitions) {
+      val leftActualSize = leftActualSizes(partitionIndex)
+      val isLeftSkew = isSkewed(leftActualSize, leftMedSize) && canSplitLeft
+      val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
+      val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex
+
+      val rightActualSize = rightActualSizes(partitionIndex)
+      val isRightSkew = isSkewed(rightActualSize, rightMedSize) && canSplitRight
+      val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
+      val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex
+
+      // A skewed partition should never be coalesced, but skip it here just to be safe.
+      val leftParts = if (isLeftSkew && !isLeftCoalesced) {
+        val reducerId = leftPartSpec.startReducerIndex
+        val skewSpecs = createSkewPartitionSpecs(
+          left.mapStats.shuffleId, reducerId, leftTargetSize)
+        if (skewSpecs.isDefined) {
+          logDebug(s"Left side partition $partitionIndex " +
+            s"(${FileUtils.byteCountToDisplaySize(leftActualSize)}) is skewed, " +
+            s"split it into ${skewSpecs.get.length} parts.")
+          numSkewedLeft += 1
         }
+        skewSpecs.getOrElse(Seq(leftPartSpec))
+      } else {
+        Seq(leftPartSpec)
+      }
 
-        // A skewed partition should never be coalesced, but skip it here just to be safe.
-        val rightParts = if (isRightSkew && !isRightCoalesced) {
-          val reducerId = rightPartSpec.startReducerIndex
-          val skewSpecs = createSkewPartitionSpecs(
-            right.mapStats.shuffleId, reducerId, rightTargetSize)
-          if (skewSpecs.isDefined) {
-            logDebug(s"Right side partition $partitionIndex " +
-              s"(${FileUtils.byteCountToDisplaySize(rightActualSize)}) is skewed, " +
-              s"split it into ${skewSpecs.get.length} parts.")
-            numSkewedRight += 1
-          }
-          skewSpecs.getOrElse(Seq(rightPartSpec))
-        } else {
-          Seq(rightPartSpec)
+      // A skewed partition should never be coalesced, but skip it here just to be safe.
+      val rightParts = if (isRightSkew && !isRightCoalesced) {
+        val reducerId = rightPartSpec.startReducerIndex
+        val skewSpecs = createSkewPartitionSpecs(
+          right.mapStats.shuffleId, reducerId, rightTargetSize)
+        if (skewSpecs.isDefined) {
+          logDebug(s"Right side partition $partitionIndex " +
+            s"(${FileUtils.byteCountToDisplaySize(rightActualSize)}) is skewed, " +
+            s"split it into ${skewSpecs.get.length} parts.")
+          numSkewedRight += 1
         }
+        skewSpecs.getOrElse(Seq(rightPartSpec))
+      } else {
+        Seq(rightPartSpec)
+      }
 
-        for {
-          leftSidePartition <- leftParts
-          rightSidePartition <- rightParts
-        } {
-          leftSidePartitions += leftSidePartition
-          rightSidePartitions += rightSidePartition
-        }
+      for {
+        leftSidePartition <- leftParts
+        rightSidePartition <- rightParts
+      } {
+        leftSidePartitions += leftSidePartition
+        rightSidePartitions += rightSidePartition
       }
+    }
+    logDebug(s"number of skewed partitions: left $numSkewedLeft, right $numSkewedRight")
+    if (numSkewedLeft > 0 || numSkewedRight > 0) {
+      Some((CustomShuffleReaderExec(left.shuffleStage, leftSidePartitions.toSeq),
+        CustomShuffleReaderExec(right.shuffleStage, rightSidePartitions.toSeq)))
+    } else {
+      None
+    }
+  }
 

Review comment:
       Do you mean the `None`? I'm not sure I see your point.
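
For reference, the pairing and the `None` fallback in `getOptimizedChildren` can be sketched in isolation as follows. This is a simplified illustration, not Spark code: `Spec` and `pairUp` are hypothetical stand-ins, and "a partition was split" is approximated here as "that partition has more than one spec", which is an assumption about how `createSkewPartitionSpecs` behaves.

```scala
object SkewPairingSketch {
  // Hypothetical stand-in for ShufflePartitionSpec: a reducer id plus a split index.
  final case class Spec(reducerId: Int, split: Int)

  // leftSplits(i) / rightSplits(i) hold the specs for partition i on each side.
  // Every left spec of a partition is paired with every right spec of the same
  // partition, mirroring the nested for-comprehension in the diff.
  def pairUp(
      leftSplits: Seq[Seq[Spec]],
      rightSplits: Seq[Seq[Spec]]): Option[(Seq[Spec], Seq[Spec])] = {
    require(leftSplits.length == rightSplits.length, "both sides need the same partition count")
    val pairs = for {
      (ls, rs) <- leftSplits.zip(rightSplits)
      l <- ls
      r <- rs
    } yield (l, r)
    // Assumption: a partition counts as split when it has more than one spec.
    val anySplit = leftSplits.exists(_.length > 1) || rightSplits.exists(_.length > 1)
    // With no split on either side there is nothing to optimize, hence None.
    if (anySplit) Some((pairs.map(_._1), pairs.map(_._2))) else None
  }
}
```

When the left side of a partition is split in two and the right side is left whole, the right spec appears twice in the output so that both sides keep the same length.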




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826299092


   **[Test build #137918 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137918/testReport)** for PR 32328 at commit [`d081031`](https://github.com/apache/spark/commit/d081031b54189f4c7059318aacfa03102a78a319).




[GitHub] [spark] SparkQA commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-826259257


   **[Test build #137906 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137906/testReport)** for PR 32328 at commit [`08a779a`](https://github.com/apache/spark/commit/08a779a24763e4b46ee8e7baf4f68f590c9e1a70).




[GitHub] [spark] zhengruifeng commented on a change in pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on a change in pull request #32328:
URL: https://github.com/apache/spark/pull/32328#discussion_r621841712



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -148,7 +148,7 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
   /*
    * This method aim to optimize the skewed join with the following steps:

Review comment:
       Sorry, my comment was misleading. The `fallback to smj` there refers to https://github.com/apache/spark/pull/32210
   
   My thought is that we may want to take potential OOM into account here.
   Suppose build side A is inner joined with stream side B. For a build partition A_0:
   
   - if A_0 is skewed but smaller than an OOM threshold, we may not need to split it;
   - if A_0 is not skewed but larger than that threshold, we may still need to split it.
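
The two cases above could be sketched roughly as below. This is only an illustration of the suggestion, not code from the PR: `BuildSideSplitSketch`, `splitBySkew`, `splitByOomThreshold`, and the threshold value are all hypothetical names and numbers, and no such configuration is claimed to exist in Spark.

```scala
object BuildSideSplitSketch {
  // Skew-only decision: split a partition only when it is flagged as skewed.
  def splitBySkew(isSkewed: Boolean): Boolean = isSkewed

  // Suggested alternative for the build side of a shuffled hash join:
  // split whenever the partition exceeds a (hypothetical) OOM threshold,
  // regardless of whether it was flagged as skewed.
  def splitByOomThreshold(sizeInBytes: Long, oomThresholdBytes: Long): Boolean =
    sizeInBytes > oomThresholdBytes
}
```

With a hypothetical threshold of 1 GiB, a skewed 512 MiB build partition would be left alone, while a non-skewed 2 GiB build partition would still be split.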
   
   






[GitHub] [spark] AmplabJenkins commented on pull request #32328: [SPARK-35214][SQL] OptimizeSkewedJoin support ShuffledHashJoinExec

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32328:
URL: https://github.com/apache/spark/pull/32328#issuecomment-828183693


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138017/
   

