You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/04/18 07:09:48 UTC

[spark] branch master updated: [SPARK-31473][SQL] AQE should set active session during execution

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6198f38  [SPARK-31473][SQL] AQE should set active session during execution
6198f38 is described below

commit 6198f384054e7f86521891ceeb1a231f449a16a8
Author: Maryann Xue <ma...@gmail.com>
AuthorDate: Sat Apr 18 00:08:36 2020 -0700

    [SPARK-31473][SQL] AQE should set active session during execution
    
    ### What changes were proposed in this pull request?
    
    AQE creates new SparkPlan nodes during execution. This PR makes sure that the active session is set correctly during this process and that AQE execution is not disrupted by external session changes.
    
    ### Why are the changes needed?
    
    To prevent potential errors. Without this change, the physical plans generated by AQE would have the wrong SparkSession or even a null SparkSession, which could lead to an NPE.
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added UT.
    
    Closes #28247 from maryannxue/aqe-activesession.
    
    Authored-by: Maryann Xue <ma...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../sql/execution/adaptive/AdaptiveSparkPlanExec.scala      |  9 +++++++--
 .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala     | 13 ++++++++++++-
 2 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 217817e..3ac4ea5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -139,7 +139,12 @@ case class AdaptiveSparkPlanExec(
   }
 
   private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
-    if (!isFinalPlan) {
+    if (isFinalPlan) return currentPhysicalPlan
+
+    // In case of this adaptive plan being executed out of `withActive` scoped functions, e.g.,
+    // `plan.queryExecution.rdd`, we need to set active session here as new plan nodes can be
+    // created in the middle of the execution.
+    context.session.withActive {
       // Subqueries do not have their own execution IDs and therefore rely on the main query to
       // update UI.
       val executionId = Option(context.session.sparkContext.getLocalProperty(
@@ -225,8 +230,8 @@ case class AdaptiveSparkPlanExec(
       isFinalPlan = true
       executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
       logOnLevel(s"Final plan: $currentPhysicalPlan")
+      currentPhysicalPlan
     }
-    currentPhysicalPlan
   }
 
   override def executeCollect(): Array[InternalRow] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 35dec44..694be98 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -23,12 +23,13 @@ import java.net.URI
 import org.apache.log4j.Level
 
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{QueryTest, Row, SparkSession}
 import org.apache.spark.sql.execution.{ReusedSubqueryExec, ShuffledRowRDD, SparkPlan}
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight, SortMergeJoinExec}
 import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StructType}
@@ -907,4 +908,14 @@ class AdaptiveQueryExecSuite
       assert(plan.asInstanceOf[DataWritingCommandExec].child.isInstanceOf[AdaptiveSparkPlanExec])
     }
   }
+
+  test("AQE should set active session during execution") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      val df = spark.range(10).select(sum('id))
+      assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
+      SparkSession.setActiveSession(null)
+      checkAnswer(df, Seq(Row(45)))
+      SparkSession.setActiveSession(spark) // recover the active session.
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org