Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/07/07 12:04:22 UTC

[GitHub] [spark] LantaoJin commented on a change in pull request #29021: [WIP][SPARK-32201][SQL] More general skew join pattern matching

LantaoJin commented on a change in pull request #29021:
URL: https://github.com/apache/spark/pull/29021#discussion_r450666963



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -730,6 +713,67 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  private def checkSkewJoin(
+      joins: Seq[SortMergeJoinExec],
+      leftSkewNum: Int,
+      rightSkewNum: Int): Unit = {
+    assert(joins.size == 1 && joins.head.isSkewJoin)
+    assert(joins.head.left.collect {
+      case r: CustomShuffleReaderExec => r
+    }.head.partitionSpecs.collect {
+      case p: PartialReducerPartitionSpec => p.reducerIndex
+    }.distinct.length == leftSkewNum)
+    assert(joins.head.right.collect {
+      case r: CustomShuffleReaderExec => r
+    }.head.partitionSpecs.collect {
+      case p: PartialReducerPartitionSpec => p.reducerIndex
+    }.distinct.length == rightSkewNum)
+  }
+
+  test("SPARK-32201: handle general skew join pattern") {

Review comment:
       It's hard to build a test case like the one in the PR description, so I'm giving a simpler one here. The rule is the same. In this test case, we build an SMJ like:
   ```
   SMJ
        Sort
          CustomShuffleReader(coalesced)
            Shuffle
        Sort
          HashAggregate
            CustomShuffleReader(coalesced)
              Shuffle
   ```
   This already breaks the current matching pattern. The plan will be optimized to
   ```
   SMJ
        Sort
          CustomShuffleReader(skewed)
            Shuffle
        Sort
          HashAggregate
            CustomShuffleReader(coalesced)
              Shuffle
   ```
   We don't split a shuffle side that contains an AggregateExec.
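   
   As a rough illustration of that rule, here is a minimal sketch on a toy plan tree. It is not the code in this PR: `Node`, `Shuffle`, `Reader`, `Aggregate`, `Sort`, and `canSplit` are hypothetical stand-ins for the real operators, and the real rule checks more than this.
   ```scala
   // Toy plan nodes, NOT Spark's real classes; every name here is hypothetical.
   sealed trait Node { def children: Seq[Node] }
   case class Shuffle() extends Node { val children: Seq[Node] = Nil }
   case class Reader(child: Node) extends Node { val children: Seq[Node] = Seq(child) }
   case class Aggregate(child: Node) extends Node { val children: Seq[Node] = Seq(child) }
   case class Sort(child: Node) extends Node { val children: Seq[Node] = Seq(child) }

   object SkewSketch {
     // A join side may be split only if a shuffle reader is reachable
     // through unary operators without passing through an Aggregate.
     def canSplit(node: Node): Boolean = node match {
       case _: Reader    => true
       case _: Aggregate => false
       case other        => other.children.size == 1 && canSplit(other.children.head)
     }
   }
   ```
   For the SMJ above, the left side `Sort(Reader(Shuffle()))` would be splittable under this sketch, while the right side `Sort(Aggregate(Reader(Shuffle())))` would not.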
   
   However, this test case currently fails because of the condition below:
   https://github.com/apache/spark/blob/5d296ed39e3dd79ddb10c68657e773adba40a5e0/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala#L274-L281
   In this test case, the HashAggregate introduces an additional exchange. That problem is unrelated to this PR.
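   
   To make the failure mode concrete, here is a minimal sketch of that kind of guard on a toy plan tree. It assumes the linked condition discards the optimized plan whenever re-checking requirements would add an exchange. `Plan`, `Leaf`, `Unary`, `Exchange`, `ShuffleGuard`, and the `ensure` parameter are hypothetical names, not Spark's API.
   ```scala
   // Toy model, NOT Spark's classes; all names here are hypothetical.
   sealed trait Plan { def children: Seq[Plan] }
   case class Leaf(name: String) extends Plan { val children: Seq[Plan] = Nil }
   case class Unary(name: String, child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
   case class Exchange(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }

   object ShuffleGuard {
     // Count Exchange nodes anywhere in the tree.
     def countExchanges(p: Plan): Int =
       (p match { case _: Exchange => 1; case _ => 0 }) + p.children.map(countExchanges).sum

     // Keep the optimized plan only if re-checking requirements adds no exchange;
     // otherwise fall back to the original plan.
     def choose(original: Plan, optimized: Plan, ensure: Plan => Plan): Plan =
       if (countExchanges(ensure(optimized)) > countExchanges(optimized)) original
       else optimized
   }
   ```
   Under this sketch, any `ensure` step that inserts an exchange (as the HashAggregate does here) makes `choose` return the original plan, so the skew optimization is dropped.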
   
   The plan before the patch is:
   ```
   AdaptiveSparkPlan(isFinalPlan=true)
   +- *(5) SortMergeJoin [key1#183L], [key2#189L], Inner, false
      :- *(3) Sort [key1#183L ASC NULLS FIRST], false, 0
      :  +- CustomShuffleReader coalesced
      :     +- ShuffleQueryStage 0
      :        +- Exchange hashpartitioning(key1#183L, 100), true
      :           +- *(1) Project [CASE WHEN (id#181L < 250) THEN 249 WHEN (id#181L >= 750) THEN 1000 ELSE id#181L END AS key1#183L, id#181L AS value1#184L]
      :              +- *(1) Range (0, 1000, step=1, splits=10)
      +- *(4) Sort [key2#189L ASC NULLS FIRST], false, 0
         +- *(4) HashAggregate(keys=[key2#189L], functions=[sum(value2#190L)], output=[key2#189L, sum2#193L])
            +- CustomShuffleReader coalesced
               +- ShuffleQueryStage 1
                  +- Exchange hashpartitioning(key2#189L, 100), true
                     +- *(2) HashAggregate(keys=[key2#189L], functions=[partial_sum(value2#190L)], output=[key2#189L, sum#200L])
                        +- *(2) Project [CASE WHEN (id#187L < 250) THEN 249 ELSE id#187L END AS key2#189L, id#187L AS value2#190L]
                           +- *(2) Range (0, 1000, step=1, splits=10)
   ```
   The plan after the patch is:
   ```
   AdaptiveSparkPlan(isFinalPlan=true)
   +- *(5) SortMergeJoin(skew=true) [key1#183L], [key2#189L], Inner, true
      :- *(3) Sort [key1#183L ASC NULLS FIRST], false, 0
      :  +- CustomShuffleReader coalesced and skewed
      :     +- ShuffleQueryStage 0
      :        +- Exchange hashpartitioning(key1#183L, 100), true
      :           +- *(1) Project [CASE WHEN (id#181L < 250) THEN 249 WHEN (id#181L >= 750) THEN 1000 ELSE id#181L END AS key1#183L, id#181L AS value1#184L]
      :              +- *(1) Range (0, 1000, step=1, splits=10)
      +- *(4) Sort [key2#189L ASC NULLS FIRST], false, 0
         +- *(4) HashAggregate(keys=[key2#189L], functions=[sum(value2#190L)], output=[key2#189L, sum2#193L])
            +- CustomShuffleReader coalesced
               +- ShuffleQueryStage 1
                  +- Exchange hashpartitioning(key2#189L, 100), true
                     +- *(2) HashAggregate(keys=[key2#189L], functions=[partial_sum(value2#190L)], output=[key2#189L, sum#200L])
                        +- *(2) Project [CASE WHEN (id#187L < 250) THEN 249 ELSE id#187L END AS key2#189L, id#187L AS value2#190L]
                           +- *(2) Range (0, 1000, step=1, splits=10)
   ```
   
   But ensureRequirements then changes it to:
   ```
   SortMergeJoin(skew=true) [key1#220L], [key2#226L], Inner
   :- Sort [key1#220L ASC NULLS FIRST], false, 0
   :  +- CustomShuffleReader coalesced and skewed
   :     +- ShuffleQueryStage 0
   :        +- Exchange hashpartitioning(key1#220L, 100), true, [id=#116]
   :           +- *(1) Project [CASE WHEN (id#218L < 250) THEN 249 WHEN (id#218L >= 750) THEN 1000 ELSE id#218L END AS key1#220L, id#218L AS value1#221L]
   :              +- *(1) Range (0, 1000, step=1, splits=10)
   +- Sort [key2#226L ASC NULLS FIRST], false, 0
      +- HashAggregate(keys=[key2#226L], functions=[sum(value2#227L)], output=[key2#226L, sum2#230L])
         +- Exchange hashpartitioning(key2#226L, 100), true, [id=#172]
            +- CustomShuffleReader coalesced
               +- ShuffleQueryStage 1
                  +- Exchange hashpartitioning(key2#226L, 100), true, [id=#127]
                     +- *(2) HashAggregate(keys=[key2#226L], functions=[partial_sum(value2#227L)], output=[key2#226L, sum#237L])
                        +- *(2) Project [CASE WHEN (id#224L < 250) THEN 249 ELSE id#224L END AS key2#226L, id#224L AS value2#227L]
                           +- *(2) Range (0, 1000, step=1, splits=10)
   ```



