Posted to commits@spark.apache.org by we...@apache.org on 2020/05/30 08:12:22 UTC

[spark] branch branch-3.0 updated: [SPARK-31864][SQL] Adjust AQE skew join trigger condition

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 51a68d9  [SPARK-31864][SQL] Adjust AQE skew join trigger condition
51a68d9 is described below

commit 51a68d9eac389a82f3137e5d6e60bfb130d172d1
Author: Maryann Xue <ma...@gmail.com>
AuthorDate: Sat May 30 07:47:29 2020 +0000

    [SPARK-31864][SQL] Adjust AQE skew join trigger condition
    
    This PR makes a minor change to how AQE decides whether a partition is skewed: the partition size is now compared to the median size of the coalesced partitions instead of the median size of the raw partitions before coalescing.
    
    This change is in line with the target size criteria used for splitting skewed join partitions, and it also copes with extra empty partitions caused by over-partitioning. This PR also improves the skew join tests in the AQE test suite. (A simplified sketch of the adjusted trigger condition follows the diff below.)
    
    No user-facing change.
    
    Updated unit tests.
    
    Closes #28669 from maryannxue/spark-31864.
    
    Authored-by: Maryann Xue <ma...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit b9737c3c228f465d332e41f1ea0cece2a5f7667e)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  2 +-
 .../execution/adaptive/OptimizeSkewedJoin.scala    | 31 +++++------
 .../adaptive/AdaptiveQueryExecSuite.scala          | 65 +++++++++-------------
 3 files changed, 42 insertions(+), 56 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d0b55ce..b36c01d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -494,7 +494,7 @@ object SQLConf {
       .version("3.0.0")
       .intConf
       .checkValue(_ > 0, "The skew factor must be positive.")
-      .createWithDefault(10)
+      .createWithDefault(5)
 
   val SKEW_JOIN_SKEWED_PARTITION_THRESHOLD =
     buildConf("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index d94999a..91ae0b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
 
 import org.apache.commons.io.FileUtils
 
-import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
+import org.apache.spark.{MapOutputTrackerMaster, SparkEnv}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
@@ -70,9 +70,9 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
       size > conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD)
   }
 
-  private def medianSize(stats: MapOutputStatistics): Long = {
-    val numPartitions = stats.bytesByPartitionId.length
-    val bytes = stats.bytesByPartitionId.sorted
+  private def medianSize(sizes: Seq[Long]): Long = {
+    val numPartitions = sizes.length
+    val bytes = sizes.sorted
     numPartitions match {
       case _ if (numPartitions % 2 == 0) =>
         math.max((bytes(numPartitions / 2) + bytes(numPartitions / 2 - 1)) / 2, 1)
@@ -160,16 +160,16 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
         if supportedJoinTypes.contains(joinType) =>
       assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
       val numPartitions = left.partitionsWithSizes.length
-      // We use the median size of the original shuffle partitions to detect skewed partitions.
-      val leftMedSize = medianSize(left.mapStats)
-      val rightMedSize = medianSize(right.mapStats)
+      // Use the median size of the actual (coalesced) partition sizes to detect skewed partitions.
+      val leftMedSize = medianSize(left.partitionsWithSizes.map(_._2))
+      val rightMedSize = medianSize(right.partitionsWithSizes.map(_._2))
       logDebug(
         s"""
           |Optimizing skewed join.
           |Left side partitions size info:
-          |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)}
+          |${getSizeInfo(leftMedSize, left.partitionsWithSizes.map(_._2))}
           |Right side partitions size info:
-          |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)}
+          |${getSizeInfo(rightMedSize, right.partitionsWithSizes.map(_._2))}
         """.stripMargin)
       val canSplitLeft = canSplitLeftSide(joinType)
       val canSplitRight = canSplitRightSide(joinType)
@@ -287,17 +287,15 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
 private object ShuffleStage {
   def unapply(plan: SparkPlan): Option[ShuffleStageInfo] = plan match {
     case s: ShuffleQueryStageExec if s.mapStats.isDefined =>
-      val mapStats = s.mapStats.get
-      val sizes = mapStats.bytesByPartitionId
+      val sizes = s.mapStats.get.bytesByPartitionId
       val partitions = sizes.zipWithIndex.map {
         case (size, i) => CoalescedPartitionSpec(i, i + 1) -> size
       }
-      Some(ShuffleStageInfo(s, mapStats, partitions))
+      Some(ShuffleStageInfo(s, partitions))
 
     case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs, _)
-      if s.mapStats.isDefined =>
-      val mapStats = s.mapStats.get
-      val sizes = mapStats.bytesByPartitionId
+      if s.mapStats.isDefined && partitionSpecs.nonEmpty =>
+      val sizes = s.mapStats.get.bytesByPartitionId
       val partitions = partitionSpecs.map {
         case spec @ CoalescedPartitionSpec(start, end) =>
           var sum = 0L
@@ -310,7 +308,7 @@ private object ShuffleStage {
         case other => throw new IllegalArgumentException(
           s"Expect CoalescedPartitionSpec but got $other")
       }
-      Some(ShuffleStageInfo(s, mapStats, partitions))
+      Some(ShuffleStageInfo(s, partitions))
 
     case _ => None
   }
@@ -318,7 +316,6 @@ private object ShuffleStage {
 
 private case class ShuffleStageInfo(
     shuffleStage: ShuffleQueryStageExec,
-    mapStats: MapOutputStatistics,
     partitionsWithSizes: Seq[(CoalescedPartitionSpec, Long)])
 
 private class SkewDesc {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 37e8e13..fed7cdc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -25,7 +25,7 @@ import org.apache.log4j.Level
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
 import org.apache.spark.sql.{QueryTest, Row, SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
-import org.apache.spark.sql.execution.{ReusedSubqueryExec, ShuffledRowRDD, SparkPlan}
+import org.apache.spark.sql.execution.{PartialReducerPartitionSpec, ReusedSubqueryExec, ShuffledRowRDD, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.OptimizeLocalShuffleReader.LOCAL_SHUFFLE_READER_DESCRIPTION
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec}
@@ -616,76 +616,65 @@ class AdaptiveQueryExecSuite
     }
   }
 
-  // TODO: we need a way to customize data distribution after shuffle, to improve test coverage
-  //       of this case.
   test("SPARK-29544: adaptive skew join with different join types") {
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "2000",
-      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "2000") {
+      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "100",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
       withTempView("skewData1", "skewData2") {
         spark
           .range(0, 1000, 1, 10)
-          .selectExpr("id % 2 as key1", "id as value1")
+          .select(
+            when('id < 250, 249)
+              .when('id >= 750, 1000)
+              .otherwise('id).as("key1"),
+            'id as "value1")
           .createOrReplaceTempView("skewData1")
         spark
           .range(0, 1000, 1, 10)
-          .selectExpr("id % 1 as key2", "id as value2")
+          .select(
+            when('id < 250, 249)
+              .otherwise('id).as("key2"),
+            'id as "value2")
           .createOrReplaceTempView("skewData2")
 
-        def checkSkewJoin(joins: Seq[SortMergeJoinExec], expectedNumPartitions: Int): Unit = {
+        def checkSkewJoin(
+            joins: Seq[SortMergeJoinExec],
+            leftSkewNum: Int,
+            rightSkewNum: Int): Unit = {
           assert(joins.size == 1 && joins.head.isSkewJoin)
           assert(joins.head.left.collect {
             case r: CustomShuffleReaderExec => r
-          }.head.partitionSpecs.length == expectedNumPartitions)
+          }.head.partitionSpecs.collect {
+            case p: PartialReducerPartitionSpec => p.reducerIndex
+          }.distinct.length == leftSkewNum)
           assert(joins.head.right.collect {
             case r: CustomShuffleReaderExec => r
-          }.head.partitionSpecs.length == expectedNumPartitions)
+          }.head.partitionSpecs.collect {
+            case p: PartialReducerPartitionSpec => p.reducerIndex
+          }.distinct.length == rightSkewNum)
         }
 
         // skewed inner join optimization
         val (_, innerAdaptivePlan) = runAdaptiveAndVerifyResult(
           "SELECT * FROM skewData1 join skewData2 ON key1 = key2")
-        // left stats: [3496, 0, 0, 0, 4014]
-        // right stats:[6292, 0, 0, 0, 0]
-        // Partition 0: both left and right sides are skewed, left side is divided
-        //              into 2 splits and right side is divided into 4 splits, so
-        //              2 x 4 sub-partitions.
-        // Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
-        // Partition 4: only left side is skewed, and divide into 2 splits, so
-        //              2 sub-partitions.
-        // So total (8 + 1 + 3) partitions.
         val innerSmj = findTopLevelSortMergeJoin(innerAdaptivePlan)
-        checkSkewJoin(innerSmj, 8 + 1 + 2)
+        checkSkewJoin(innerSmj, 2, 1)
 
         // skewed left outer join optimization
         val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult(
           "SELECT * FROM skewData1 left outer join skewData2 ON key1 = key2")
-        // left stats: [3496, 0, 0, 0, 4014]
-        // right stats:[6292, 0, 0, 0, 0]
-        // Partition 0: both left and right sides are skewed, but left join can't split right side,
-        //              so only left side is divided into 2 splits, and thus 2 sub-partitions.
-        // Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
-        // Partition 4: only left side is skewed, and divide into 2 splits, so
-        //              2 sub-partitions.
-        // So total (2 + 1 + 2) partitions.
         val leftSmj = findTopLevelSortMergeJoin(leftAdaptivePlan)
-        checkSkewJoin(leftSmj, 2 + 1 + 2)
+        checkSkewJoin(leftSmj, 2, 0)
 
         // skewed right outer join optimization
         val (_, rightAdaptivePlan) = runAdaptiveAndVerifyResult(
           "SELECT * FROM skewData1 right outer join skewData2 ON key1 = key2")
-        // left stats: [3496, 0, 0, 0, 4014]
-        // right stats:[6292, 0, 0, 0, 0]
-        // Partition 0: both left and right sides are skewed, but right join can't split left side,
-        //              so only right side is divided into 4 splits, and thus 4 sub-partitions.
-        // Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
-        // Partition 4: only left side is skewed, but right join can't split left side, so just
-        //              1 partition.
-        // So total (4 + 1 + 1) partitions.
         val rightSmj = findTopLevelSortMergeJoin(rightAdaptivePlan)
-        checkSkewJoin(rightSmj, 4 + 1 + 1)
+        checkSkewJoin(rightSmj, 0, 1)
       }
     }
   }
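
For readers skimming the thread, here is a minimal, self-contained sketch of the adjusted
trigger condition. It is not the actual OptimizeSkewedJoin code: skewFactor and
skewThresholdBytes stand in for spark.sql.adaptive.skewJoin.skewedPartitionFactor and
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes, and the sizes are made up.
The point is that the median is now taken over the coalesced partition sizes, and a
partition is flagged as skewed only if it exceeds both the factored median and the
absolute byte threshold.

    // Simplified sketch, not Spark's internal API.
    object SkewDetectionSketch {

      // Median over the coalesced partition sizes (not the raw pre-coalesce sizes),
      // mirroring the medianSize change in this commit.
      def medianSize(sizes: Seq[Long]): Long = {
        val sorted = sizes.sorted
        val n = sorted.length
        if (n % 2 == 0) math.max((sorted(n / 2) + sorted(n / 2 - 1)) / 2, 1)
        else math.max(sorted(n / 2), 1)
      }

      // A partition counts as skewed only when it is both much larger than the
      // median and larger than an absolute byte threshold.
      def isSkewed(size: Long, median: Long, skewFactor: Int, skewThresholdBytes: Long): Boolean =
        size > median * skewFactor && size > skewThresholdBytes

      def main(args: Array[String]): Unit = {
        // Hypothetical coalesced partition sizes in bytes; one partition dominates.
        val coalescedSizes = Seq(800L, 900L, 950L, 1000L, 60000L)
        val median = medianSize(coalescedSizes)  // 950
        coalescedSizes.foreach { s =>
          val skewed = isSkewed(s, median, skewFactor = 5, skewThresholdBytes = 10000L)
          println(s"size=$s skewed=$skewed")
        }
      }
    }

With the new default skew factor of 5 and a 10000-byte threshold, only the 60000-byte
partition is flagged. Because the median is computed after coalescing, the empty
partitions produced by over-partitioning can no longer pull the median toward zero and
falsely flag ordinary partitions as skewed.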

