You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/01/02 01:19:26 UTC

spark git commit: [SPARK-22932][SQL] Refactor AnalysisContext

Repository: spark
Updated Branches:
  refs/heads/master e734a4b9c -> e0c090f22


[SPARK-22932][SQL] Refactor AnalysisContext

## What changes were proposed in this pull request?
Add a `reset` function to ensure the state in `AnalysisContext` is per-query.

## How was this patch tested?
Covered by the existing test cases.

Author: gatorsmile <ga...@gmail.com>

Closes #20127 from gatorsmile/refactorAnalysisContext.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0c090f2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0c090f2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0c090f2

Branch: refs/heads/master
Commit: e0c090f227e9b64e595b47d4d1f96f8a2fff5bf7
Parents: e734a4b
Author: gatorsmile <ga...@gmail.com>
Authored: Tue Jan 2 09:19:18 2018 +0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Tue Jan 2 09:19:18 2018 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 25 ++++++++++++++++----
 1 file changed, 20 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e0c090f2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 6d294d4..35b3511 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -52,6 +52,7 @@ object SimpleAnalyzer extends Analyzer(
 /**
  * Provides a way to keep state during the analysis, this enables us to decouple the concerns
  * of analysis environment from the catalog.
+ * The state that is kept here is per-query.
  *
  * Note this is thread local.
  *
@@ -70,6 +71,8 @@ object AnalysisContext {
   }
 
   def get: AnalysisContext = value.get()
+  def reset(): Unit = value.remove()
+
   private def set(context: AnalysisContext): Unit = value.set(context)
 
   def withAnalysisContext[A](database: Option[String])(f: => A): A = {
@@ -95,6 +98,17 @@ class Analyzer(
     this(catalog, conf, conf.optimizerMaxIterations)
   }
 
+  override def execute(plan: LogicalPlan): LogicalPlan = {
+    AnalysisContext.reset()
+    try {
+      executeSameContext(plan)
+    } finally {
+      AnalysisContext.reset()
+    }
+  }
+
+  private def executeSameContext(plan: LogicalPlan): LogicalPlan = super.execute(plan)
+
   def resolver: Resolver = conf.resolver
 
   protected val fixedPoint = FixedPoint(maxIterations)
@@ -176,7 +190,7 @@ class Analyzer(
       case With(child, relations) =>
         substituteCTE(child, relations.foldLeft(Seq.empty[(String, LogicalPlan)]) {
           case (resolved, (name, relation)) =>
-            resolved :+ name -> execute(substituteCTE(relation, resolved))
+            resolved :+ name -> executeSameContext(substituteCTE(relation, resolved))
         })
       case other => other
     }
@@ -600,7 +614,7 @@ class Analyzer(
               "avoid errors. Increase the value of spark.sql.view.maxNestedViewDepth to work " +
               "aroud this.")
           }
-          execute(child)
+          executeSameContext(child)
         }
         view.copy(child = newChild)
       case p @ SubqueryAlias(_, view: View) =>
@@ -1269,7 +1283,7 @@ class Analyzer(
       do {
         // Try to resolve the subquery plan using the regular analyzer.
         previous = current
-        current = execute(current)
+        current = executeSameContext(current)
 
         // Use the outer references to resolve the subquery plan if it isn't resolved yet.
         val i = plans.iterator
@@ -1392,7 +1406,7 @@ class Analyzer(
               grouping,
               Alias(cond, "havingCondition")() :: Nil,
               child)
-          val resolvedOperator = execute(aggregatedCondition)
+          val resolvedOperator = executeSameContext(aggregatedCondition)
           def resolvedAggregateFilter =
             resolvedOperator
               .asInstanceOf[Aggregate]
@@ -1450,7 +1464,8 @@ class Analyzer(
           val aliasedOrdering =
             unresolvedSortOrders.map(o => Alias(o.child, "aggOrder")())
           val aggregatedOrdering = aggregate.copy(aggregateExpressions = aliasedOrdering)
-          val resolvedAggregate: Aggregate = execute(aggregatedOrdering).asInstanceOf[Aggregate]
+          val resolvedAggregate: Aggregate =
+            executeSameContext(aggregatedOrdering).asInstanceOf[Aggregate]
           val resolvedAliasedOrdering: Seq[Alias] =
             resolvedAggregate.aggregateExpressions.asInstanceOf[Seq[Alias]]
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org