You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/05/06 12:55:11 UTC

[spark] branch branch-3.0 updated: [SPARK-31650][SQL] Fix wrong UI in case of AdaptiveSparkPlanExec has unmanaged subqueries

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c1b5a4f  [SPARK-31650][SQL] Fix wrong UI in case of AdaptiveSparkPlanExec has unmanaged subqueries
c1b5a4f is described below

commit c1b5a4f1877d057973cb0667cdbb7c27550033b8
Author: yi.wu <yi...@databricks.com>
AuthorDate: Wed May 6 12:52:53 2020 +0000

    [SPARK-31650][SQL] Fix wrong UI in case of AdaptiveSparkPlanExec has unmanaged subqueries
    
    ### What changes were proposed in this pull request?
    
    Make the non-subquery `AdaptiveSparkPlanExec` update UI again after execute/executeCollect/executeTake/executeTail if the `AdaptiveSparkPlanExec` has subqueries which do not belong to any query stages.
    
    ### Why are the changes needed?
    
    If there're subqueries do not belong to any query stages of the main query, the main query could get final physical plan and update UI before those subqueries finished. As a result, the UI can not reflect the change from the subqueries, e.g. new nodes generated from subqueries.
    
    Before:
    
    <img width="335" alt="before_aqe_ui" src="https://user-images.githubusercontent.com/16397174/81149758-671a9480-8fb1-11ea-84c4-9a4520e2b08e.png">
    
    After:
    <img width="546" alt="after_aqe_ui" src="https://user-images.githubusercontent.com/16397174/81149752-63870d80-8fb1-11ea-9852-f41e11afe216.png">
    
    ### Does this PR introduce _any_ user-facing change?
    
    No(AQE feature hasn't been released).
    
    ### How was this patch tested?
    
    Tested manually.
    
    Closes #28460 from Ngone51/fix_aqe_ui.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit b16ea8e1ab58bd24c50d31ce0dfc6c79c87fa3b2)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala | 41 ++++++++++++++++------
 .../adaptive/AdaptiveQueryExecSuite.scala          |  7 ++--
 2 files changed, 36 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index f00dce2..cd6936b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -138,6 +138,13 @@ case class AdaptiveSparkPlanExec(
     executedPlan.resetMetrics()
   }
 
+  private def getExecutionId: Option[Long] = {
+    // If the `QueryExecution` does not match the current execution ID, it means the execution ID
+    // belongs to another (parent) query, and we should not call update UI in this query.
+    Option(context.session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY))
+      .map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe)
+  }
+
   private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
     if (isFinalPlan) return currentPhysicalPlan
 
@@ -145,11 +152,7 @@ case class AdaptiveSparkPlanExec(
     // `plan.queryExecution.rdd`, we need to set active session here as new plan nodes can be
     // created in the middle of the execution.
     context.session.withActive {
-      // If the `QueryExecution` does not match the current execution ID, it means the execution ID
-      // belongs to another (parent) query, and we should not call update UI in this query.
-      val executionId =
-        Option(context.session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY))
-          .map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe)
+      val executionId = getExecutionId
       var currentLogicalPlan = currentPhysicalPlan.logicalLink.get
       var result = createQueryStages(currentPhysicalPlan)
       val events = new LinkedBlockingQueue[StageMaterializationEvent]()
@@ -230,25 +233,43 @@ case class AdaptiveSparkPlanExec(
       currentPhysicalPlan = applyPhysicalRules(result.newPlan, queryStageOptimizerRules)
       isFinalPlan = true
       executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
-      logOnLevel(s"Final plan: $currentPhysicalPlan")
       currentPhysicalPlan
     }
   }
 
+  // Use a lazy val to avoid this being called more than once.
+  @transient private lazy val finalPlanUpdate: Unit = {
+    // Subqueries that don't belong to any query stage of the main query will execute after the
+    // last UI update in `getFinalPhysicalPlan`, so we need to update UI here again to make sure
+    // the newly generated nodes of those subqueries are updated.
+    if (!isSubquery && currentPhysicalPlan.find(_.subqueries.nonEmpty).isDefined) {
+      getExecutionId.foreach(onUpdatePlan(_, Seq.empty))
+    }
+    logOnLevel(s"Final plan: $currentPhysicalPlan")
+  }
+
   override def executeCollect(): Array[InternalRow] = {
-    getFinalPhysicalPlan().executeCollect()
+    val rdd = getFinalPhysicalPlan().executeCollect()
+    finalPlanUpdate
+    rdd
   }
 
   override def executeTake(n: Int): Array[InternalRow] = {
-    getFinalPhysicalPlan().executeTake(n)
+    val rdd = getFinalPhysicalPlan().executeTake(n)
+    finalPlanUpdate
+    rdd
   }
 
   override def executeTail(n: Int): Array[InternalRow] = {
-    getFinalPhysicalPlan().executeTail(n)
+    val rdd = getFinalPhysicalPlan().executeTail(n)
+    finalPlanUpdate
+    rdd
   }
 
   override def doExecute(): RDD[InternalRow] = {
-    getFinalPhysicalPlan().execute()
+    val rdd = getFinalPhysicalPlan().execute()
+    finalPlanUpdate
+    rdd
   }
 
   protected override def stringArgs: Iterator[Any] = Iterator(s"isFinalPlan=$isFinalPlan")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index c6caffa..f30d1e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -72,12 +72,15 @@ class AdaptiveQueryExecSuite
     }
     val planAfter = dfAdaptive.queryExecution.executedPlan
     assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true"))
+    val adaptivePlan = planAfter.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
 
     spark.sparkContext.listenerBus.waitUntilEmpty()
-    assert(finalPlanCnt == 1)
+    // AQE will post `SparkListenerSQLAdaptiveExecutionUpdate` twice in case of subqueries that
+    // exist out of query stages.
+    val expectedFinalPlanCnt = adaptivePlan.find(_.subqueries.nonEmpty).map(_ => 2).getOrElse(1)
+    assert(finalPlanCnt == expectedFinalPlanCnt)
     spark.sparkContext.removeSparkListener(listener)
 
-    val adaptivePlan = planAfter.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
     val exchanges = adaptivePlan.collect {
       case e: Exchange => e
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org