You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/02/21 15:05:16 UTC
[spark] branch branch-3.0 updated: [SPARK-30906][SQL] Turning off
AQE in CacheManager is not thread-safe
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 36c59a4 [SPARK-30906][SQL] Turning off AQE in CacheManager is not thread-safe
36c59a4 is described below
commit 36c59a473719fa7bb6a61048b50b717ff8f43c5f
Author: maryannxue <ma...@apache.org>
AuthorDate: Fri Feb 21 22:49:20 2020 +0800
[SPARK-30906][SQL] Turning off AQE in CacheManager is not thread-safe
### What changes were proposed in this pull request?
This PR aims to fix the thread-safety issue in turning off AQE for CacheManager by cloning the current session and changing the AQE conf on the cloned session.
This PR also adds a utility function that clones the session with AQE disabled, so that other callers can reuse it.
### Why are the changes needed?
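To fix a potential thread-safety problem. The previous code temporarily set the AQE conf to false on the shared session and restored it afterwards, which could race with other threads using the same session.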
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Manually tested CachedTableSuite with AQE settings enabled.
Closes #27659 from maryannxue/spark-30906.
Authored-by: maryannxue <ma...@apache.org>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 6058ce97b989606ffedddf32778ce38f322cfb9e)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../apache/spark/sql/execution/CacheManager.scala | 28 +++++++++++-----------
.../spark/sql/execution/QueryExecution.scala | 27 +++++++++++++--------
.../execution/adaptive/AdaptiveSparkPlanExec.scala | 1 -
3 files changed, 31 insertions(+), 25 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 413bd7b..ad33ce5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -79,20 +79,16 @@ class CacheManager extends Logging {
if (lookupCachedData(planToCache).nonEmpty) {
logWarning("Asked to cache already cached data.")
} else {
- val sparkSession = query.sparkSession
- val qe = sparkSession.sessionState.executePlan(planToCache)
- val originalValue = sparkSession.sessionState.conf.getConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED)
- val inMemoryRelation = try {
- // Avoiding changing the output partitioning, here disable AQE.
- sparkSession.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, false)
+ // Turn off AQE so that the outputPartitioning of the underlying plan can be leveraged.
+ val sessionWithAqeOff = QueryExecution.getOrCloneSessionWithAqeOff(query.sparkSession)
+ val inMemoryRelation = sessionWithAqeOff.withActive {
+ val qe = sessionWithAqeOff.sessionState.executePlan(planToCache)
InMemoryRelation(
- sparkSession.sessionState.conf.useCompression,
- sparkSession.sessionState.conf.columnBatchSize, storageLevel,
+ sessionWithAqeOff.sessionState.conf.useCompression,
+ sessionWithAqeOff.sessionState.conf.columnBatchSize, storageLevel,
qe.executedPlan,
tableName,
optimizedPlan = qe.optimizedPlan)
- } finally {
- sparkSession.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, originalValue)
}
this.synchronized {
@@ -194,10 +190,14 @@ class CacheManager extends Logging {
}
needToRecache.map { cd =>
cd.cachedRepresentation.cacheBuilder.clearCache()
- val qe = spark.sessionState.executePlan(cd.plan)
- val newCache = InMemoryRelation(
- cacheBuilder = cd.cachedRepresentation.cacheBuilder.copy(cachedPlan = qe.executedPlan),
- optimizedPlan = qe.optimizedPlan)
+ // Turn off AQE so that the outputPartitioning of the underlying plan can be leveraged.
+ val sessionWithAqeOff = QueryExecution.getOrCloneSessionWithAqeOff(spark)
+ val newCache = sessionWithAqeOff.withActive {
+ val qe = sessionWithAqeOff.sessionState.executePlan(cd.plan)
+ InMemoryRelation(
+ cacheBuilder = cd.cachedRepresentation.cacheBuilder.copy(cachedPlan = qe.executedPlan),
+ optimizedPlan = qe.optimizedPlan)
+ }
val recomputedPlan = cd.copy(cachedRepresentation = newCache)
this.synchronized {
if (lookupCachedData(recomputedPlan.plan).nonEmpty) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 4571e69..20bc289 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -276,14 +276,7 @@ object QueryExecution {
*/
private[execution] def preparations(sparkSession: SparkSession): Seq[Rule[SparkPlan]] = {
- val sparkSessionWithAdaptiveExecutionOff =
- if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
- val session = sparkSession.cloneSession()
- session.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, false)
- session
- } else {
- sparkSession
- }
+ val sparkSessionWithAqeOff = getOrCloneSessionWithAqeOff(sparkSession)
Seq(
// `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
@@ -291,8 +284,8 @@ object QueryExecution {
InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession)),
// If the following rules apply, it means the main query is not AQE-ed, so we make sure the
// subqueries are not AQE-ed either.
- PlanDynamicPruningFilters(sparkSessionWithAdaptiveExecutionOff),
- PlanSubqueries(sparkSessionWithAdaptiveExecutionOff),
+ PlanDynamicPruningFilters(sparkSessionWithAqeOff),
+ PlanSubqueries(sparkSessionWithAqeOff),
EnsureRequirements(sparkSession.sessionState.conf),
ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
sparkSession.sessionState.columnarRules),
@@ -341,4 +334,18 @@ object QueryExecution {
val sparkPlan = createSparkPlan(spark, spark.sessionState.planner, plan.clone())
prepareExecutedPlan(spark, sparkPlan)
}
+
+ /**
+ * Returns a cloned [[SparkSession]] with AQE disabled (leaving `session`'s conf untouched),
+ * or `session` itself if AQE is already off. NOTE(review): type parameter `T` is unused.
+ */
+ def getOrCloneSessionWithAqeOff[T](session: SparkSession): SparkSession = {
+ if (!session.sessionState.conf.adaptiveExecutionEnabled) {
+ session
+ } else {
+ val newSession = session.cloneSession()
+ newSession.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, false)
+ newSession
+ }
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 3f20b59..4036424 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -467,7 +467,6 @@ case class AdaptiveSparkPlanExec(
private def reOptimize(logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan) = {
logicalPlan.invalidateStatsCache()
val optimized = optimizer.execute(logicalPlan)
- SparkSession.setActiveSession(context.session)
val sparkPlan = context.session.sessionState.planner.plan(ReturnAnswer(optimized)).next()
val newPlan = applyPhysicalRules(sparkPlan, preprocessingRules ++ queryStagePreparationRules)
(newPlan, optimized)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org