You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/28 04:51:30 UTC

spark git commit: [SPARK-13526][SQL] Move SQLContext per-session states to new class

Repository: spark
Updated Branches:
  refs/heads/master 4c5e968db -> cca79fad6


[SPARK-13526][SQL] Move SQLContext per-session states to new class

## What changes were proposed in this pull request?

This creates a `SessionState`, which groups a few fields that existed in `SQLContext`. Because `HiveContext` extends `SQLContext`, we also need to make changes there. This is mainly a cleanup task that will soon pave the way for merging the two contexts.

## How was this patch tested?

Existing unit tests; this patch introduces no change in behavior.

Author: Andrew Or <an...@databricks.com>

Closes #11405 from andrewor14/refactor-session.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cca79fad
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cca79fad
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cca79fad

Branch: refs/heads/master
Commit: cca79fad66c4315b0ed6de59fd87700a540e6646
Parents: 4c5e968
Author: Andrew Or <an...@databricks.com>
Authored: Sat Feb 27 19:51:28 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sat Feb 27 19:51:28 2016 -0800

----------------------------------------------------------------------
 project/MimaExcludes.scala                      |   7 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |  88 +++++----------
 .../org/apache/spark/sql/UDFRegistration.scala  |   5 +-
 .../spark/sql/internal/SessionState.scala       | 111 +++++++++++++++++++
 .../apache/spark/sql/test/TestSQLContext.scala  |  22 ++--
 .../hive/execution/HiveCompatibilitySuite.scala |   4 +-
 .../org/apache/spark/sql/hive/HiveContext.scala |  81 ++------------
 .../spark/sql/hive/HiveSessionState.scala       | 103 +++++++++++++++++
 .../apache/spark/sql/hive/test/TestHive.scala   |  30 ++---
 .../apache/spark/sql/hive/parquetSuites.scala   |   7 +-
 10 files changed, 294 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cca79fad/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 165280a..9ce37fc 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -271,13 +271,18 @@ object MimaExcludes {
       ) ++ Seq(
         // SPARK-13220 Deprecate yarn-client and yarn-cluster mode
         ProblemFilters.exclude[MissingMethodProblem](
-          "org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler"),
+          "org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler")
+      ) ++ Seq(
         // SPARK-13465 TaskContext.
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.TaskContext.addTaskFailureListener")
       ) ++ Seq (
         // SPARK-7729 Executor which has been killed should also be displayed on Executor Tab
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.status.api.v1.ExecutorSummary.this")
       ) ++ Seq(
+        // SPARK-13526 Move SQLContext per-session states to new class
+        ProblemFilters.exclude[IncompatibleMethTypeProblem](
+          "org.apache.spark.sql.UDFRegistration.this")
+      ) ++ Seq(
         // [SPARK-13486][SQL] Move SQLConf into an internal package
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLConf"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLConf$SQLConfEntry"),

http://git-wip-us.apache.org/repos/asf/spark/blob/cca79fad/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 1c24d9e..cb4a639 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -25,13 +25,12 @@ import scala.collection.JavaConverters._
 import scala.collection.immutable
 import scala.reflect.runtime.universe.TypeTag
 
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.{Logging, SparkContext, SparkException}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
-import org.apache.spark.sql.{execution => sparkexecution}
-import org.apache.spark.sql.catalyst.{InternalRow, _}
+import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.encoders.encoderFor
 import org.apache.spark.sql.catalyst.expressions._
@@ -40,9 +39,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan,
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.exchange.EnsureRequirements
 import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
 import org.apache.spark.sql.internal.SQLConf.SQLConfEntry
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types._
@@ -68,7 +66,7 @@ class SQLContext private[sql](
     @transient protected[sql] val cacheManager: CacheManager,
     @transient private[sql] val listener: SQLListener,
     val isRootContext: Boolean)
-  extends org.apache.spark.Logging with Serializable {
+  extends Logging with Serializable {
 
   self =>
 
@@ -115,9 +113,27 @@ class SQLContext private[sql](
   }
 
   /**
-   * @return Spark SQL configuration
+   * Per-session state, e.g. configuration, functions, temporary tables etc.
    */
-  protected[sql] lazy val conf = new SQLConf
+  @transient
+  protected[sql] lazy val sessionState: SessionState = new SessionState(self)
+  protected[sql] def conf: SQLConf = sessionState.conf
+  protected[sql] def catalog: Catalog = sessionState.catalog
+  protected[sql] def functionRegistry: FunctionRegistry = sessionState.functionRegistry
+  protected[sql] def analyzer: Analyzer = sessionState.analyzer
+  protected[sql] def optimizer: Optimizer = sessionState.optimizer
+  protected[sql] def sqlParser: ParserInterface = sessionState.sqlParser
+  protected[sql] def planner: SparkPlanner = sessionState.planner
+  protected[sql] def continuousQueryManager = sessionState.continuousQueryManager
+  protected[sql] def prepareForExecution: RuleExecutor[SparkPlan] =
+    sessionState.prepareForExecution
+
+  /**
+   * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
+   * that listen for execution metrics.
+   */
+  @Experimental
+  def listenerManager: ExecutionListenerManager = sessionState.listenerManager
 
   /**
    * Set Spark SQL configuration properties.
@@ -179,43 +195,11 @@ class SQLContext private[sql](
    */
   def getAllConfs: immutable.Map[String, String] = conf.getAllConfs
 
-  @transient
-  lazy val listenerManager: ExecutionListenerManager = new ExecutionListenerManager
-
-  protected[sql] lazy val continuousQueryManager = new ContinuousQueryManager(this)
-
-  @transient
-  protected[sql] lazy val catalog: Catalog = new SimpleCatalog(conf)
-
-  @transient
-  protected[sql] lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy()
-
-  @transient
-  protected[sql] lazy val analyzer: Analyzer =
-    new Analyzer(catalog, functionRegistry, conf) {
-      override val extendedResolutionRules =
-        python.ExtractPythonUDFs ::
-        PreInsertCastAndRename ::
-        (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil)
-
-      override val extendedCheckRules = Seq(
-        datasources.PreWriteCheck(catalog)
-      )
-    }
-
-  @transient
-  protected[sql] lazy val optimizer: Optimizer = new SparkOptimizer(this)
-
-  @transient
-  protected[sql] val sqlParser: ParserInterface = new SparkQl(conf)
-
   protected[sql] def parseSql(sql: String): LogicalPlan = sqlParser.parsePlan(sql)
 
-  protected[sql] def executeSql(sql: String):
-    org.apache.spark.sql.execution.QueryExecution = executePlan(parseSql(sql))
+  protected[sql] def executeSql(sql: String): QueryExecution = executePlan(parseSql(sql))
 
-  protected[sql] def executePlan(plan: LogicalPlan) =
-    new sparkexecution.QueryExecution(this, plan)
+  protected[sql] def executePlan(plan: LogicalPlan) = new QueryExecution(this, plan)
 
   /**
    * Add a jar to SQLContext
@@ -299,10 +283,8 @@ class SQLContext private[sql](
    *
    * @group basic
    * @since 1.3.0
-   * TODO move to SQLSession?
    */
-  @transient
-  val udf: UDFRegistration = new UDFRegistration(this)
+  def udf: UDFRegistration = sessionState.udf
 
   /**
    * Returns true if the table is currently cached in-memory.
@@ -873,25 +855,9 @@ class SQLContext private[sql](
   }
 
   @transient
-  protected[sql] val planner: sparkexecution.SparkPlanner = new sparkexecution.SparkPlanner(this)
-
-  @transient
   protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[InternalRow], 1)
 
   /**
-   * Prepares a planned SparkPlan for execution by inserting shuffle operations and internal
-   * row format conversions as needed.
-   */
-  @transient
-  protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
-    val batches = Seq(
-      Batch("Subquery", Once, PlanSubqueries(self)),
-      Batch("Add exchange", Once, EnsureRequirements(self)),
-      Batch("Whole stage codegen", Once, CollapseCodegenStages(self))
-    )
-  }
-
-  /**
    * Parses the data type in our internal string representation. The data type string should
    * have the same format as the one generated by `toString` in scala.
    * It is only used by PySpark.

http://git-wip-us.apache.org/repos/asf/spark/blob/cca79fad/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index de01cbc..d894825 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -22,6 +22,7 @@ import scala.util.Try
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.api.java._
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUDF}
 import org.apache.spark.sql.execution.aggregate.ScalaUDAF
@@ -34,9 +35,7 @@ import org.apache.spark.sql.types.DataType
  *
  * @since 1.3.0
  */
-class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {
-
-  private val functionRegistry = sqlContext.functionRegistry
+class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends Logging {
 
   protected[sql] def registerPython(name: String, udf: UserDefinedPythonFunction): Unit = {
     log.debug(

http://git-wip-us.apache.org/repos/asf/spark/blob/cca79fad/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
new file mode 100644
index 0000000..f93a405
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.sql.{ContinuousQueryManager, SQLContext, UDFRegistration}
+import org.apache.spark.sql.catalyst.ParserInterface
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, Catalog, FunctionRegistry, SimpleCatalog}
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.datasources.{PreInsertCastAndRename, ResolveDataSource}
+import org.apache.spark.sql.execution.exchange.EnsureRequirements
+import org.apache.spark.sql.util.ExecutionListenerManager
+
+
+/**
+ * A class that holds all session-specific state in a given [[SQLContext]].
+ */
+private[sql] class SessionState(ctx: SQLContext) {
+
+  // Note: These are all lazy vals because they depend on each other (e.g. conf) and we
+  // want subclasses to override some of the fields. Otherwise, we would get a lot of NPEs.
+
+  /**
+   * SQL-specific key-value configurations.
+   */
+  lazy val conf = new SQLConf
+
+  /**
+   * Internal catalog for managing table and database states.
+   */
+  lazy val catalog: Catalog = new SimpleCatalog(conf)
+
+  /**
+   * Internal catalog for managing functions registered by the user.
+   */
+  lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy()
+
+  /**
+   * Interface exposed to the user for registering user-defined functions.
+   */
+  lazy val udf: UDFRegistration = new UDFRegistration(functionRegistry)
+
+  /**
+   * Logical query plan analyzer for resolving unresolved attributes and relations.
+   */
+  lazy val analyzer: Analyzer = {
+    new Analyzer(catalog, functionRegistry, conf) {
+      override val extendedResolutionRules =
+        python.ExtractPythonUDFs ::
+          PreInsertCastAndRename ::
+          (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil)
+
+      override val extendedCheckRules = Seq(datasources.PreWriteCheck(catalog))
+    }
+  }
+
+  /**
+   * Logical query plan optimizer.
+   */
+  lazy val optimizer: Optimizer = new SparkOptimizer(ctx)
+
+  /**
+   * Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
+   */
+  lazy val sqlParser: ParserInterface = new SparkQl(conf)
+
+  /**
+   * Planner that converts optimized logical plans to physical plans.
+   */
+  lazy val planner: SparkPlanner = new SparkPlanner(ctx)
+
+  /**
+   * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal
+   * row format conversions as needed.
+   */
+  lazy val prepareForExecution = new RuleExecutor[SparkPlan] {
+    override val batches: Seq[Batch] = Seq(
+      Batch("Subquery", Once, PlanSubqueries(ctx)),
+      Batch("Add exchange", Once, EnsureRequirements(ctx)),
+      Batch("Whole stage codegen", Once, CollapseCodegenStages(ctx))
+    )
+  }
+
+  /**
+   * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
+   * that listen for execution metrics.
+   */
+  lazy val listenerManager: ExecutionListenerManager = new ExecutionListenerManager
+
+  /**
+   * Interface to start and stop [[org.apache.spark.sql.ContinuousQuery]]s.
+   */
+  lazy val continuousQueryManager: ContinuousQueryManager = new ContinuousQueryManager(ctx)
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/cca79fad/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 28ad7ae..b3e146f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.test
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
 
 /**
  * A special [[SQLContext]] prepared for testing.
@@ -31,16 +31,16 @@ private[sql] class TestSQLContext(sc: SparkContext) extends SQLContext(sc) { sel
       new SparkConf().set("spark.sql.testkey", "true")))
   }
 
-  protected[sql] override lazy val conf: SQLConf = new SQLConf {
-
-    clear()
-
-    override def clear(): Unit = {
-      super.clear()
-
-      // Make sure we start with the default test configs even after clear
-      TestSQLContext.overrideConfs.foreach {
-        case (key, value) => setConfString(key, value)
+  @transient
+  protected[sql] override lazy val sessionState: SessionState = new SessionState(self) {
+    override lazy val conf: SQLConf = {
+      new SQLConf {
+        clear()
+        override def clear(): Unit = {
+          super.clear()
+          // Make sure we start with the default test configs even after clear
+          TestSQLContext.overrideConfs.foreach { case (key, value) => setConfString(key, value) }
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/cca79fad/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index d15f883..0dc2a95 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -55,7 +55,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     // Enable in-memory partition pruning for testing purposes
     TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true)
     // Use Hive hash expression instead of the native one
-    TestHive.functionRegistry.unregisterFunction("hash")
+    TestHive.sessionState.functionRegistry.unregisterFunction("hash")
     RuleExecutor.resetTime()
   }
 
@@ -65,7 +65,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     Locale.setDefault(originalLocale)
     TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
     TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
-    TestHive.functionRegistry.restore()
+    TestHive.sessionState.functionRegistry.restore()
 
     // For debugging dump some statistics about how much time was spent in various optimizer rules.
     logWarning(RuleExecutor.dumpTimeSpent())

http://git-wip-us.apache.org/repos/asf/spark/blob/cca79fad/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index d511dd6..a9295d3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -40,17 +40,15 @@ import org.apache.hadoop.util.VersionInfo
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{InternalRow, ParserInterface}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, PreInsertCastAndRename, PreWriteCheck, ResolveDataSource}
 import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.SQLConfEntry
 import org.apache.spark.sql.internal.SQLConf.SQLConfEntry._
 import org.apache.spark.sql.types._
@@ -110,6 +108,16 @@ class HiveContext private[hive](
       isRootContext = false)
   }
 
+  @transient
+  protected[sql] override lazy val sessionState = new HiveSessionState(self)
+
+  protected[sql] override def catalog = sessionState.catalog
+
+  // The Hive UDF current_database() is foldable, will be evaluated by optimizer,
+  // but the optimizer can't access the SessionState of metadataHive.
+  sessionState.functionRegistry.registerFunction(
+    "current_database", (e: Seq[Expression]) => new CurrentDatabase(self))
+
   /**
    * When true, enables an experimental feature where metastore tables that use the parquet SerDe
    * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
@@ -442,39 +450,6 @@ class HiveContext private[hive](
     setConf(entry.key, entry.stringConverter(value))
   }
 
-  /* A catalyst metadata catalog that points to the Hive Metastore. */
-  @transient
-  override protected[sql] lazy val catalog =
-    new HiveMetastoreCatalog(metadataHive, this) with OverrideCatalog
-
-  // Note that HiveUDFs will be overridden by functions registered in this context.
-  @transient
-  override protected[sql] lazy val functionRegistry: FunctionRegistry =
-    new HiveFunctionRegistry(FunctionRegistry.builtin.copy(), this.executionHive)
-
-  // The Hive UDF current_database() is foldable, will be evaluated by optimizer, but the optimizer
-  // can't access the SessionState of metadataHive.
-  functionRegistry.registerFunction(
-    "current_database",
-    (expressions: Seq[Expression]) => new CurrentDatabase(this))
-
-  /* An analyzer that uses the Hive metastore. */
-  @transient
-  override protected[sql] lazy val analyzer: Analyzer =
-    new Analyzer(catalog, functionRegistry, conf) {
-      override val extendedResolutionRules =
-        catalog.ParquetConversions ::
-        catalog.CreateTables ::
-        catalog.PreInsertionCasts ::
-        python.ExtractPythonUDFs ::
-        PreInsertCastAndRename ::
-        (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil)
-
-      override val extendedCheckRules = Seq(
-        PreWriteCheck(catalog)
-      )
-    }
-
   /** Overridden by child classes that need to set configuration before the client init. */
   protected def configure(): Map[String, String] = {
     // Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch
@@ -544,37 +519,6 @@ class HiveContext private[hive](
     c
   }
 
-  protected[sql] override lazy val conf: SQLConf = new SQLConf {
-    override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
-  }
-
-  @transient
-  protected[sql] override val sqlParser: ParserInterface = new HiveQl(conf)
-
-  @transient
-  private val hivePlanner = new SparkPlanner(this) with HiveStrategies {
-    val hiveContext = self
-
-    override def strategies: Seq[Strategy] = experimental.extraStrategies ++ Seq(
-      DataSourceStrategy,
-      HiveCommandStrategy(self),
-      HiveDDLStrategy,
-      DDLStrategy,
-      SpecialLimits,
-      InMemoryScans,
-      HiveTableScans,
-      DataSinks,
-      Scripts,
-      Aggregation,
-      LeftSemiJoin,
-      EquiJoinSelection,
-      BasicOperators,
-      BroadcastNestedLoop,
-      CartesianProduct,
-      DefaultJoin
-    )
-  }
-
   private def functionOrMacroDDLPattern(command: String) = Pattern.compile(
     ".*(create|drop)\\s+(temporary\\s+)?(function|macro).+", Pattern.DOTALL).matcher(command)
 
@@ -590,9 +534,6 @@ class HiveContext private[hive](
     }
   }
 
-  @transient
-  override protected[sql] val planner = hivePlanner
-
   /** Extends QueryExecution with hive specific features. */
   protected[sql] class QueryExecution(logicalPlan: LogicalPlan)
     extends org.apache.spark.sql.execution.QueryExecution(this, logicalPlan) {

http://git-wip-us.apache.org/repos/asf/spark/blob/cca79fad/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
new file mode 100644
index 0000000..09f54be
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.ParserInterface
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry, OverrideCatalog}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.{python, SparkPlanner}
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
+
+
+/**
+ * A class that holds all session-specific state in a given [[HiveContext]].
+ */
+private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) {
+
+  override lazy val conf: SQLConf = new SQLConf {
+    override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
+  }
+
+  /**
+   * A metadata catalog that points to the Hive metastore.
+   */
+  override lazy val catalog = new HiveMetastoreCatalog(ctx.metadataHive, ctx) with OverrideCatalog
+
+  /**
+   * Internal catalog for managing functions registered by the user.
+   * Note that HiveUDFs will be overridden by functions registered in this context.
+   */
+  override lazy val functionRegistry: FunctionRegistry = {
+    new HiveFunctionRegistry(FunctionRegistry.builtin.copy(), ctx.executionHive)
+  }
+
+  /**
+   * An analyzer that uses the Hive metastore.
+   */
+  override lazy val analyzer: Analyzer = {
+    new Analyzer(catalog, functionRegistry, conf) {
+      override val extendedResolutionRules =
+        catalog.ParquetConversions ::
+        catalog.CreateTables ::
+        catalog.PreInsertionCasts ::
+        python.ExtractPythonUDFs ::
+        PreInsertCastAndRename ::
+        (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil)
+
+      override val extendedCheckRules = Seq(PreWriteCheck(catalog))
+    }
+  }
+
+  /**
+   * Parser for HiveQl query texts.
+   */
+  override lazy val sqlParser: ParserInterface = new HiveQl(conf)
+
+  /**
+   * Planner that takes into account Hive-specific strategies.
+   */
+  override lazy val planner: SparkPlanner = {
+    new SparkPlanner(ctx) with HiveStrategies {
+      override val hiveContext = ctx
+
+      override def strategies: Seq[Strategy] = {
+        ctx.experimental.extraStrategies ++ Seq(
+          DataSourceStrategy,
+          HiveCommandStrategy(ctx),
+          HiveDDLStrategy,
+          DDLStrategy,
+          SpecialLimits,
+          InMemoryScans,
+          HiveTableScans,
+          DataSinks,
+          Scripts,
+          Aggregation,
+          LeftSemiJoin,
+          EquiJoinSelection,
+          BasicOperators,
+          BroadcastNestedLoop,
+          CartesianProduct,
+          DefaultJoin
+        )
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/cca79fad/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 9d0622b..a7eca46 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -120,18 +120,25 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
   override def executePlan(plan: LogicalPlan): this.QueryExecution =
     new this.QueryExecution(plan)
 
-  protected[sql] override lazy val conf: SQLConf = new SQLConf {
-    override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
-
-    clear()
-
-    override def clear(): Unit = {
-      super.clear()
-
-      TestHiveContext.overrideConfs.map {
-        case (key, value) => setConfString(key, value)
+  @transient
+  protected[sql] override lazy val sessionState = new HiveSessionState(this) {
+    override lazy val conf: SQLConf = {
+      new SQLConf {
+        clear()
+        override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
+        override def clear(): Unit = {
+          super.clear()
+          TestHiveContext.overrideConfs.map {
+            case (key, value) => setConfString(key, value)
+          }
+        }
       }
     }
+
+    override lazy val functionRegistry = {
+      new TestHiveFunctionRegistry(
+        org.apache.spark.sql.catalyst.analysis.FunctionRegistry.builtin.copy(), self.executionHive)
+    }
   }
 
   /**
@@ -454,9 +461,6 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
     }
   }
 
-  @transient
-  override protected[sql] lazy val functionRegistry = new TestHiveFunctionRegistry(
-    org.apache.spark.sql.catalyst.analysis.FunctionRegistry.builtin.copy(), this.executionHive)
 }
 
 private[hive] class TestHiveFunctionRegistry(fr: SimpleFunctionRegistry, client: HiveClientImpl)

http://git-wip-us.apache.org/repos/asf/spark/blob/cca79fad/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index a127cf6..d81c566 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -425,7 +425,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
   }
 
   test("Caching converted data source Parquet Relations") {
-    def checkCached(tableIdentifier: catalog.QualifiedTableName): Unit = {
+    val _catalog = catalog
+    def checkCached(tableIdentifier: _catalog.QualifiedTableName): Unit = {
       // Converted test_parquet should be cached.
       catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match {
         case null => fail("Converted test_parquet should be cached in the cache.")
@@ -452,7 +453,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
         |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
       """.stripMargin)
 
-    var tableIdentifier = catalog.QualifiedTableName("default", "test_insert_parquet")
+    var tableIdentifier = _catalog.QualifiedTableName("default", "test_insert_parquet")
 
     // First, make sure the converted test_parquet is not cached.
     assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
@@ -492,7 +493,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
         |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
       """.stripMargin)
 
-    tableIdentifier = catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
+    tableIdentifier = _catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
     assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
     sql(
       """


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org