You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/02/06 16:29:22 UTC

[GitHub] [spark] gatorsmile commented on a change in pull request #27452: [SPARK-30719][SQL] do not log warning if AQE is intentionally skipped and add a config to force apply

gatorsmile commented on a change in pull request #27452: [SPARK-30719][SQL] do not log warning if AQE is intentionally skipped and add a config to force apply
URL: https://github.com/apache/spark/pull/27452#discussion_r375942145
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
 ##########
 @@ -40,49 +40,60 @@ case class InsertAdaptiveSparkPlan(
 
   private val conf = adaptiveExecutionContext.session.sessionState.conf
 
-  def containShuffle(plan: SparkPlan): Boolean = {
-    plan.find {
-      case _: Exchange => true
-      case s: SparkPlan => !s.requiredChildDistribution.forall(_ == UnspecifiedDistribution)
-    }.isDefined
-  }
-
-  def containSubQuery(plan: SparkPlan): Boolean = {
-    plan.find(_.expressions.exists(_.find {
-      case _: SubqueryExpression => true
-      case _ => false
-    }.isDefined)).isDefined
-  }
-
   override def apply(plan: SparkPlan): SparkPlan = applyInternal(plan, false)
 
   private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan = plan match {
+    case _ if !conf.adaptiveExecutionEnabled => plan
     case _: ExecutedCommandExec => plan
-    case _ if conf.adaptiveExecutionEnabled && supportAdaptive(plan)
-      && (isSubquery || containShuffle(plan) || containSubQuery(plan)) =>
-      try {
-        // Plan sub-queries recursively and pass in the shared stage cache for exchange reuse. Fall
-        // back to non-adaptive mode if adaptive execution is supported in any of the sub-queries.
-        val subqueryMap = buildSubqueryMap(plan)
-        val planSubqueriesRule = PlanAdaptiveSubqueries(subqueryMap)
-        val preprocessingRules = Seq(
-          planSubqueriesRule)
-        // Run pre-processing rules.
-        val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preprocessingRules)
-        logDebug(s"Adaptive execution enabled for plan: $plan")
-        AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, preprocessingRules, isSubquery)
-      } catch {
-        case SubqueryAdaptiveNotSupportedException(subquery) =>
-          logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
-            s"but is not supported for sub-query: $subquery.")
-          plan
-      }
-    case _ =>
-      if (conf.adaptiveExecutionEnabled) {
+    case _ if shouldApplyAQE(plan, isSubquery) =>
+      if (supportAdaptive(plan)) {
+        try {
+          // Plan sub-queries recursively and pass in the shared stage cache for exchange reuse.
+          // Fall back to non-AQE mode if AQE is not supported in any of the sub-queries.
+          val subqueryMap = buildSubqueryMap(plan)
+          val planSubqueriesRule = PlanAdaptiveSubqueries(subqueryMap)
+          val preprocessingRules = Seq(
+            planSubqueriesRule)
+          // Run pre-processing rules.
+          val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preprocessingRules)
+          logDebug(s"Adaptive execution enabled for plan: $plan")
+          AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, preprocessingRules, isSubquery)
+        } catch {
+          case SubqueryAdaptiveNotSupportedException(subquery) =>
+            logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
+              s"but is not supported for sub-query: $subquery.")
+            plan
+        }
+      } else {
         logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
           s"but is not supported for query: $plan.")
+        plan
       }
-      plan
+
+    case _ => plan
+  }
+
+  // AQE is only useful when the query has exchanges or sub-queries. This method returns true if
+  // one of the following conditions is satisfied:
+  //   - The config ADAPTIVE_EXECUTION_FORCE_APPLY is true.
+  //   - The input query is from a sub-query. When this happens, it means we've already decided to
+  //     apply AQE for the main query and we must continue to do it.
+  //   - The query contains exchanges.
+  //   - The query may need to add exchanges. It's an overkill to run `EnsureRequirements` here, so
+  //     we just check `SparkPlan.requiredChildDistribution` and see if it's possible that the
 +  //     query needs to add exchanges later.
+  //   - The query contains sub-query.
 
 Review comment:
   This is much clearer than the original code.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org