You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/02/21 15:05:16 UTC

[spark] branch branch-3.0 updated: [SPARK-30906][SQL] Turning off AQE in CacheManager is not thread-safe

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 36c59a4  [SPARK-30906][SQL] Turning off AQE in CacheManager is not thread-safe
36c59a4 is described below

commit 36c59a473719fa7bb6a61048b50b717ff8f43c5f
Author: maryannxue <ma...@apache.org>
AuthorDate: Fri Feb 21 22:49:20 2020 +0800

    [SPARK-30906][SQL] Turning off AQE in CacheManager is not thread-safe
    
    ### What changes were proposed in this pull request?
    This PR aims to fix the thread-safety issue in turning off AQE for CacheManager by cloning the current session and changing the AQE conf on the cloned session.
    This PR also adds a utility function for cloning the session with AQE disabled conf value, which can be shared by another caller.
    
    ### Why are the changes needed?
    To fix the potential thread-unsafe problem.
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Manually tested CachedTableSuite with AQE settings enabled.
    
    Closes #27659 from maryannxue/spark-30906.
    
    Authored-by: maryannxue <ma...@apache.org>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 6058ce97b989606ffedddf32778ce38f322cfb9e)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/sql/execution/CacheManager.scala  | 28 +++++++++++-----------
 .../spark/sql/execution/QueryExecution.scala       | 27 +++++++++++++--------
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |  1 -
 3 files changed, 31 insertions(+), 25 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 413bd7b..ad33ce5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -79,20 +79,16 @@ class CacheManager extends Logging {
     if (lookupCachedData(planToCache).nonEmpty) {
       logWarning("Asked to cache already cached data.")
     } else {
-      val sparkSession = query.sparkSession
-      val qe = sparkSession.sessionState.executePlan(planToCache)
-      val originalValue = sparkSession.sessionState.conf.getConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED)
-      val inMemoryRelation = try {
-        // Avoiding changing the output partitioning, here disable AQE.
-        sparkSession.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, false)
+      // Turn off AQE so that the outputPartitioning of the underlying plan can be leveraged.
+      val sessionWithAqeOff = QueryExecution.getOrCloneSessionWithAqeOff(query.sparkSession)
+      val inMemoryRelation = sessionWithAqeOff.withActive {
+        val qe = sessionWithAqeOff.sessionState.executePlan(planToCache)
         InMemoryRelation(
-          sparkSession.sessionState.conf.useCompression,
-          sparkSession.sessionState.conf.columnBatchSize, storageLevel,
+          sessionWithAqeOff.sessionState.conf.useCompression,
+          sessionWithAqeOff.sessionState.conf.columnBatchSize, storageLevel,
           qe.executedPlan,
           tableName,
           optimizedPlan = qe.optimizedPlan)
-      } finally {
-        sparkSession.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, originalValue)
       }
 
       this.synchronized {
@@ -194,10 +190,14 @@ class CacheManager extends Logging {
     }
     needToRecache.map { cd =>
       cd.cachedRepresentation.cacheBuilder.clearCache()
-      val qe = spark.sessionState.executePlan(cd.plan)
-      val newCache = InMemoryRelation(
-        cacheBuilder = cd.cachedRepresentation.cacheBuilder.copy(cachedPlan = qe.executedPlan),
-        optimizedPlan = qe.optimizedPlan)
+      // Turn off AQE so that the outputPartitioning of the underlying plan can be leveraged.
+      val sessionWithAqeOff = QueryExecution.getOrCloneSessionWithAqeOff(spark)
+      val newCache = sessionWithAqeOff.withActive {
+        val qe = sessionWithAqeOff.sessionState.executePlan(cd.plan)
+        InMemoryRelation(
+          cacheBuilder = cd.cachedRepresentation.cacheBuilder.copy(cachedPlan = qe.executedPlan),
+          optimizedPlan = qe.optimizedPlan)
+      }
       val recomputedPlan = cd.copy(cachedRepresentation = newCache)
       this.synchronized {
         if (lookupCachedData(recomputedPlan.plan).nonEmpty) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 4571e69..20bc289 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -276,14 +276,7 @@ object QueryExecution {
    */
   private[execution] def preparations(sparkSession: SparkSession): Seq[Rule[SparkPlan]] = {
 
-    val sparkSessionWithAdaptiveExecutionOff =
-    if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
-      val session = sparkSession.cloneSession()
-      session.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, false)
-      session
-    } else {
-      sparkSession
-    }
+    val sparkSessionWithAqeOff = getOrCloneSessionWithAqeOff(sparkSession)
 
     Seq(
       // `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
@@ -291,8 +284,8 @@ object QueryExecution {
       InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession)),
       // If the following rules apply, it means the main query is not AQE-ed, so we make sure the
       // subqueries are not AQE-ed either.
-      PlanDynamicPruningFilters(sparkSessionWithAdaptiveExecutionOff),
-      PlanSubqueries(sparkSessionWithAdaptiveExecutionOff),
+      PlanDynamicPruningFilters(sparkSessionWithAqeOff),
+      PlanSubqueries(sparkSessionWithAqeOff),
       EnsureRequirements(sparkSession.sessionState.conf),
       ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
         sparkSession.sessionState.columnarRules),
@@ -341,4 +334,18 @@ object QueryExecution {
     val sparkPlan = createSparkPlan(spark, spark.sessionState.planner, plan.clone())
     prepareExecutedPlan(spark, sparkPlan)
   }
+
+  /**
+   * Returns a cloned [[SparkSession]] with adaptive execution disabled, or the original
+   * [[SparkSession]] if its adaptive execution is already disabled.
+   */
+  def getOrCloneSessionWithAqeOff[T](session: SparkSession): SparkSession = {
+    if (!session.sessionState.conf.adaptiveExecutionEnabled) {
+      session
+    } else {
+      val newSession = session.cloneSession()
+      newSession.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, false)
+      newSession
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 3f20b59..4036424 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -467,7 +467,6 @@ case class AdaptiveSparkPlanExec(
   private def reOptimize(logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan) = {
     logicalPlan.invalidateStatsCache()
     val optimized = optimizer.execute(logicalPlan)
-    SparkSession.setActiveSession(context.session)
     val sparkPlan = context.session.sessionState.planner.plan(ReturnAnswer(optimized)).next()
     val newPlan = applyPhysicalRules(sparkPlan, preprocessingRules ++ queryStagePreparationRules)
     (newPlan, optimized)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org