You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/03/29 17:14:55 UTC

[GitHub] [spark] cloud-fan commented on a change in pull request #31653: [SPARK-33832][SQL] v2. move OptimizeSkewedJoin to query stage preparation

cloud-fan commented on a change in pull request #31653:
URL: https://github.com/apache/spark/pull/31653#discussion_r603442853



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -89,16 +89,20 @@ case class AdaptiveSparkPlanExec(
     EnsureRequirements,
     RemoveRedundantSorts,
     DisableUnnecessaryBucketedScan
-  ) ++ context.session.sessionState.queryStagePrepRules
+  ) ++ context.session.sessionState.queryStagePrepRules // can be set when creating SparkSession
+
+  private def queryStagePreparationRules2: Seq[Rule[SparkPlan]] = Seq(

Review comment:
       This is not needed if we add `SkewJoinAwareCost`

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
##########
@@ -35,11 +37,17 @@ case class CoalesceShufflePartitions(session: SparkSession) extends CustomShuffl
     if (!conf.coalesceShufflePartitionsEnabled) {
       return plan
     }
-    if (!plan.collectLeaves().forall(_.isInstanceOf[QueryStageExec])
-        || plan.find(_.isInstanceOf[CustomShuffleReaderExec]).isDefined) {
+    /* This is running before new QueryStageExec creation so either all leaves are
+     QueryStageExec nodes or all leaves are CustomShuffleReaderExec if OptimizeSkewJoin
+     mitigated something in the new stage. */

Review comment:
       I agree with this comment, but then where shall we coalesce the partitions?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
##########
@@ -251,48 +253,129 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
       }
   }
 
+  /**
+   * A potential stage is from Exchange down.  Actual [[QueryStageExec]] nodes are created
+   * by [[AdaptiveSparkPlanExec.newQueryStage]] bounded by previously created [[QueryStageExec]]
+   * nodes below.
+   * Todo: need better way to identify which join the log msgs below refer to.  Tags?
+   */
+  private def handlePotentialQueryStage(plan: SparkPlan): SparkPlan = {
+    val shuffleStages = collectShuffleStages(plan)
+    val s = ExplainUtils.getAQELogPrefix(shuffleStages)
+
+    if (shuffleStages.length != 2 && !conf.adaptiveForceIfShuffle) {
+      /* Consider Case II.  Shuffle above SMJ1.  We should see 3 SQSE nodes but
+       with adaptiveForceIfShuffle() we should be able to add a new shuffle
+       above SMJ2 to enable skew mitigation of SMJ2.  W/o ability to add a new
+       shuffle skew mitigation is still possible in some cases - to be handled later.
+
+       Add a test for this.
+       See test("skew in deeply nested join - test ShuffleAddedException") and
+       add a similar test with just 2 joins */
+      logInfo(s"OptimizeSkewedJoin: rule is not applied since" +
+        s" shuffleStages.length=${shuffleStages.length} != 2 and " +
+        s"${SQLConf.ADAPTIVE_FORCE_IF_SHUFFLE.key}=false; $s")
+      return plan
+    }
+    val numShufflesBefore = plan.collect {
+      case e: ShuffleExchangeExec => e
+    }.length
+    val mitigatedPlan = optimizeSkewJoin(plan)
+    if (mitigatedPlan eq plan) {
+      return plan
+    }
+    val executedPlan = ensureRequirements.apply(mitigatedPlan)
+    val numNewShuffles = executedPlan.collect {
+      case e: ShuffleExchangeExec => e
+    }.length - numShufflesBefore
+    if(numNewShuffles > 0) {
+      if (conf.adaptiveForceIfShuffle) {
+        logInfo(s"OptimizeSkewedJoin: rule is applied. " +
+          s"$numNewShuffles additional shuffles will be introduced; $s")
+        executedPlan // make sure to return plan with new shuffles
+      } else {
+        logInfo(s"OptimizeSkewedJoin: rule is not applied due" +
+          s" to $numNewShuffles additional shuffles will be introduced; $s")
+        plan
+      }
+    } else {
+      executedPlan
+    }
+  }
+
+  def collectShuffleStages(plan: SparkPlan): Seq[ShuffleQueryStageExec] = plan match {
+    case stage: ShuffleQueryStageExec => Seq(stage)
+    case _ => plan.children.flatMap(collectShuffleStages)
+  }
+  /**
+   * Now this runs as part of queryStagePreparationRules() which means it runs over the whole plan
+   * which may have any number of ExchangeExec nodes, i.e. multiple "query stages"

Review comment:
       Currently this rule is very limited: it only applies to SMJ with 2 shuffle query stages as the direct children.
   
   I don't think moving this rule to `queryStagePreparationRules` makes a big difference. We would still need to match an SMJ with 2 shuffle query stages as its direct children, plus an additional check that the query stages are materialized, which is guaranteed for query stage optimization rules.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
##########
@@ -87,8 +107,12 @@ case class CustomShuffleReaderExec private(
     Iterator(desc)
   }
 
-  def hasCoalescedPartition: Boolean =
+  def hasCoalescedPartition: Boolean = {
+    // shouldn't this check that at least some index ranges are > 1?
+    // otherwise it's just reading original shuffle results
+    // that is how OptimizeSkewedJoin.optimizeSkewJoin defines it

Review comment:
       Good point. We can fix it separately.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org