You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/01/02 01:19:26 UTC
spark git commit: [SPARK-22932][SQL] Refactor AnalysisContext
Repository: spark
Updated Branches:
refs/heads/master e734a4b9c -> e0c090f22
[SPARK-22932][SQL] Refactor AnalysisContext
## What changes were proposed in this pull request?
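Add a `reset` function to `AnalysisContext` and call it before and after each top-level `Analyzer.execute`, so that the thread-local state is per-query. Nested analysis (CTE substitution, view resolution, subquery resolution, and aggregate resolution) now goes through a new private `executeSameContext`, which keeps the current context instead of resetting it.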
## How was this patch tested?
Covered by the existing test cases; this commit changes only `Analyzer.scala`.
Author: gatorsmile <ga...@gmail.com>
Closes #20127 from gatorsmile/refactorAnalysisContext.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0c090f2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0c090f2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0c090f2
Branch: refs/heads/master
Commit: e0c090f227e9b64e595b47d4d1f96f8a2fff5bf7
Parents: e734a4b
Author: gatorsmile <ga...@gmail.com>
Authored: Tue Jan 2 09:19:18 2018 +0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Tue Jan 2 09:19:18 2018 +0800
----------------------------------------------------------------------
.../spark/sql/catalyst/analysis/Analyzer.scala | 25 ++++++++++++++++----
1 file changed, 20 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e0c090f2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 6d294d4..35b3511 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -52,6 +52,7 @@ object SimpleAnalyzer extends Analyzer(
/**
* Provides a way to keep state during the analysis, this enables us to decouple the concerns
* of analysis environment from the catalog.
+ * The state that is kept here is per-query.
*
* Note this is thread local.
*
@@ -70,6 +71,8 @@ object AnalysisContext {
}
def get: AnalysisContext = value.get()
+ def reset(): Unit = value.remove()
+
private def set(context: AnalysisContext): Unit = value.set(context)
def withAnalysisContext[A](database: Option[String])(f: => A): A = {
@@ -95,6 +98,17 @@ class Analyzer(
this(catalog, conf, conf.optimizerMaxIterations)
}
+ override def execute(plan: LogicalPlan): LogicalPlan = {
+ AnalysisContext.reset()
+ try {
+ executeSameContext(plan)
+ } finally {
+ AnalysisContext.reset()
+ }
+ }
+
+ private def executeSameContext(plan: LogicalPlan): LogicalPlan = super.execute(plan)
+
def resolver: Resolver = conf.resolver
protected val fixedPoint = FixedPoint(maxIterations)
@@ -176,7 +190,7 @@ class Analyzer(
case With(child, relations) =>
substituteCTE(child, relations.foldLeft(Seq.empty[(String, LogicalPlan)]) {
case (resolved, (name, relation)) =>
- resolved :+ name -> execute(substituteCTE(relation, resolved))
+ resolved :+ name -> executeSameContext(substituteCTE(relation, resolved))
})
case other => other
}
@@ -600,7 +614,7 @@ class Analyzer(
"avoid errors. Increase the value of spark.sql.view.maxNestedViewDepth to work " +
"aroud this.")
}
- execute(child)
+ executeSameContext(child)
}
view.copy(child = newChild)
case p @ SubqueryAlias(_, view: View) =>
@@ -1269,7 +1283,7 @@ class Analyzer(
do {
// Try to resolve the subquery plan using the regular analyzer.
previous = current
- current = execute(current)
+ current = executeSameContext(current)
// Use the outer references to resolve the subquery plan if it isn't resolved yet.
val i = plans.iterator
@@ -1392,7 +1406,7 @@ class Analyzer(
grouping,
Alias(cond, "havingCondition")() :: Nil,
child)
- val resolvedOperator = execute(aggregatedCondition)
+ val resolvedOperator = executeSameContext(aggregatedCondition)
def resolvedAggregateFilter =
resolvedOperator
.asInstanceOf[Aggregate]
@@ -1450,7 +1464,8 @@ class Analyzer(
val aliasedOrdering =
unresolvedSortOrders.map(o => Alias(o.child, "aggOrder")())
val aggregatedOrdering = aggregate.copy(aggregateExpressions = aliasedOrdering)
- val resolvedAggregate: Aggregate = execute(aggregatedOrdering).asInstanceOf[Aggregate]
+ val resolvedAggregate: Aggregate =
+ executeSameContext(aggregatedOrdering).asInstanceOf[Aggregate]
val resolvedAliasedOrdering: Seq[Alias] =
resolvedAggregate.aggregateExpressions.asInstanceOf[Seq[Alias]]
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org