You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/09/10 07:41:36 UTC

[GitHub] [spark] maropu commented on a change in pull request #29692: [SPARK-32830][SQL] Optimize Skewed BroadcastNestedLoopJoin with AE

maropu commented on a change in pull request #29692:
URL: https://github.com/apache/spark/pull/29692#discussion_r486121989



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -248,6 +249,74 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
       } else {
         smj
       }
+
+    case bnl @ BroadcastNestedLoopJoinExec(leftChild, rightChild, buildSide, joinType, _, _) =>
+      def resolveBroadcastNLJoinSkew(
+          stream: ShuffleStageInfo,
+          joinType: JoinType,
+          buildSide: BuildSide): SparkPlan = {
+        val streamMedSize = medianSize(stream.mapStats)
+        val numPartitions = stream.partitionsWithSizes.length
+        logDebug(
+          s"""
+             |Optimizing skewed join.
+             |Build Side:
+             |${buildSide}
+             |Stream side partitions size info:
+             |${getSizeInfo(streamMedSize, stream.mapStats.bytesByPartitionId)}
+        """.stripMargin)
+        val canSplitStream = canSplitLeftSide(joinType)

Review comment:
       Is this really correct? How about the case: `BuildLeft` and right-outer?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -248,6 +249,74 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
       } else {
         smj
       }
+
+    case bnl @ BroadcastNestedLoopJoinExec(leftChild, rightChild, buildSide, joinType, _, _) =>
+      def resolveBroadcastNLJoinSkew(
+          stream: ShuffleStageInfo,
+          joinType: JoinType,
+          buildSide: BuildSide): SparkPlan = {
+        val streamMedSize = medianSize(stream.mapStats)
+        val numPartitions = stream.partitionsWithSizes.length
+        logDebug(
+          s"""
+             |Optimizing skewed join.
+             |Build Side:
+             |${buildSide}
+             |Stream side partitions size info:
+             |${getSizeInfo(streamMedSize, stream.mapStats.bytesByPartitionId)}

Review comment:
       nit: How about this update?
   ```
   Optimizing skewed join.
   ${if (buildSide == BuildLeft) "Left" else "Right"} stream side partitions size info:
   ${getSizeInfo(streamMedSize, stream.mapStats.bytesByPartitionId)}
   ```

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -248,6 +249,74 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
       } else {
         smj
       }
+
+    case bnl @ BroadcastNestedLoopJoinExec(leftChild, rightChild, buildSide, joinType, _, _) =>
+      def resolveBroadcastNLJoinSkew(
+          stream: ShuffleStageInfo,
+          joinType: JoinType,
+          buildSide: BuildSide): SparkPlan = {
+        val streamMedSize = medianSize(stream.mapStats)
+        val numPartitions = stream.partitionsWithSizes.length
+        logDebug(
+          s"""
+             |Optimizing skewed join.
+             |Build Side:
+             |${buildSide}
+             |Stream side partitions size info:
+             |${getSizeInfo(streamMedSize, stream.mapStats.bytesByPartitionId)}
+        """.stripMargin)
+        val canSplitStream = canSplitLeftSide(joinType)
+        val streamActualSizes = stream.partitionsWithSizes.map(_._2)
+        val streamTargetSize = targetSize(streamActualSizes, streamMedSize)
+        val streamSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
+        var numSkewedStream = 0
+        for (partitionIndex <- 0 until numPartitions) {
+          val streamActualSize = streamActualSizes(partitionIndex)
+          val isStreamSkew = isSkewed(streamActualSize, streamMedSize) && canSplitStream
+          val streamPartSpec = stream.partitionsWithSizes(partitionIndex)._1
+          val isStreamCoalesced =
+            streamPartSpec.startReducerIndex + 1 < streamPartSpec.endReducerIndex
+          // A skewed partition should never be coalesced, but skip it here just to be safe.
+          val streamParts = if (isStreamSkew && !isStreamCoalesced) {
+            val reducerId = streamPartSpec.startReducerIndex
+            val skewSpecs = createSkewPartitionSpecs(
+              stream.mapStats.shuffleId, reducerId, streamTargetSize)
+            if (skewSpecs.isDefined) {
+              logDebug(s"Stream side partition $partitionIndex " +
+                s"(${FileUtils.byteCountToDisplaySize(streamActualSize)}) is skewed, " +
+                s"split it into ${skewSpecs.get.length} parts.")
+              numSkewedStream += 1
+            }
+            skewSpecs.getOrElse(Seq(streamPartSpec))
+          } else {
+            Seq(streamPartSpec)
+          }
+
+          for {
+            streamSidePartition <- streamParts
+          } {
+            streamSidePartitions += streamSidePartition
+          }
+        }
+
+        logDebug(s"number of skewed partitions: left $numSkewedStream")
+        if (numSkewedStream > 0) {
+          val newStream = CustomShuffleReaderExec(stream.shuffleStage, streamSidePartitions.toSeq)
+          buildSide match {
+            case BuildRight => bnl.copy(left = newStream, right = bnl.right, isSkewJoin = true)
+            case BuildLeft => bnl.copy(left = bnl.left, right = newStream, isSkewJoin = true)
+          }
+        } else {
+          bnl
+        }
+      }
+
+      (leftChild, rightChild, buildSide) match {
+        case (ShuffleStage(left: ShuffleStageInfo), _, BuildRight) =>
+          resolveBroadcastNLJoinSkew(left, joinType, buildSide)
+        case (_, ShuffleStage(right: ShuffleStageInfo), BuildLeft) =>
+          resolveBroadcastNLJoinSkew(right, joinType, buildSide)

Review comment:
       It looks like non-matching cases can happen here. How about writing it like this instead?
   ```
       case bnl @ BroadcastNestedLoopJoinExec(
           ShuffleStage(left: ShuffleStageInfo), _, BuildRight, joinType, _, _) =>
         resolveBroadcastNLJoinSkew(left, joinType, BuildRight)
       case bnl @ BroadcastNestedLoopJoinExec(
           _, ShuffleStage(right: ShuffleStageInfo), BuildLeft, joinType, _, _) =>
         resolveBroadcastNLJoinSkew(right, joinType, BuildLeft)
   ```

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -248,6 +249,74 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
       } else {
         smj
       }
+
+    case bnl @ BroadcastNestedLoopJoinExec(leftChild, rightChild, buildSide, joinType, _, _) =>
+      def resolveBroadcastNLJoinSkew(

Review comment:
       This is just a suggestion: could we share code between the smj and bnl cases? Most parts look duplicated.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -248,6 +249,74 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
       } else {
         smj
       }
+
+    case bnl @ BroadcastNestedLoopJoinExec(leftChild, rightChild, buildSide, joinType, _, _) =>
+      def resolveBroadcastNLJoinSkew(
+          stream: ShuffleStageInfo,
+          joinType: JoinType,
+          buildSide: BuildSide): SparkPlan = {
+        val streamMedSize = medianSize(stream.mapStats)
+        val numPartitions = stream.partitionsWithSizes.length
+        logDebug(
+          s"""
+             |Optimizing skewed join.
+             |Build Side:
+             |${buildSide}
+             |Stream side partitions size info:
+             |${getSizeInfo(streamMedSize, stream.mapStats.bytesByPartitionId)}
+        """.stripMargin)
+        val canSplitStream = canSplitLeftSide(joinType)
+        val streamActualSizes = stream.partitionsWithSizes.map(_._2)
+        val streamTargetSize = targetSize(streamActualSizes, streamMedSize)
+        val streamSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
+        var numSkewedStream = 0
+        for (partitionIndex <- 0 until numPartitions) {
+          val streamActualSize = streamActualSizes(partitionIndex)
+          val isStreamSkew = isSkewed(streamActualSize, streamMedSize) && canSplitStream

Review comment:
       If `canSplitStream` is false, do we still need to run this for loop?

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -739,6 +745,67 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-32830: Optimize Skewed BroadcastNestedLoopJoin") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "1k",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "5k",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR.key -> "3") {
+      withTempView("skewData1", "skewData2") {
+        spark
+          .range(0, 10000, 1, 50)
+          .select(
+            when('id > 1000, 100)
+              .when('id <= 1000, 'id % 10)
+              .otherwise('id).as("key1"),
+            'id as "value1", ('id + "str").as("str"))
+          .createOrReplaceTempView("skewData1")
+        spark
+          .range(5000, 10000, 1, 10)
+          .select(('id % 10).as("key2"),
+            'id as "value2", ('id + "str").as("str"))
+          .createOrReplaceTempView("skewData2")
+
+        def checkSkewJoin(
+            joins: Seq[BroadcastNestedLoopJoinExec],
+            leftSkewNum: Int,
+            rightSkewNum: Int): Unit = {
+          assert(joins.size == 1 && joins.head.isSkewJoin)
+          if (leftSkewNum > 0) {
+            assert(joins.head.left.collect {
+              case r: CustomShuffleReaderExec => r
+            }.head.partitionSpecs.collect {
+              case p: PartialReducerPartitionSpec => p.reducerIndex
+            }.distinct.length == leftSkewNum)
+          }
+          if (rightSkewNum > 0) {
+            assert(joins.head.right.collect {
+              case r: CustomShuffleReaderExec => r
+            }.head.partitionSpecs.collect {
+              case p: PartialReducerPartitionSpec => p.reducerIndex
+            }.distinct.length == rightSkewNum)
+          }
+        }
+
+        val (_, adaptivePlan1) = runAdaptiveAndVerifyResult(
+          """
+             |SELECT * FROM (SELECT /*+ REPARTITION(key1) */ * FROM skewData1) skewData1
+             |JOIN skewData2 ON key1 < key2

Review comment:
       I think we need to add more test cases for the other join types, e.g., left-outer and right-outer.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org