Posted to commits@spark.apache.org by we...@apache.org on 2017/03/28 02:07:31 UTC

spark git commit: [SPARK-20100][SQL] Refactor SessionState initialization

Repository: spark
Updated Branches:
  refs/heads/master 8a6f33f04 -> ea361165e


[SPARK-20100][SQL] Refactor SessionState initialization

## What changes were proposed in this pull request?
The current SessionState initialization code path is quite complex. A part of the creation is done in the SessionState companion objects, a part is done inside the SessionState class, and a part is done by passing functions.

This PR refactors this code path and consolidates SessionState initialization into a builder class. The SessionState itself no longer does any initialization and just becomes a placeholder for the various Spark SQL internals. This also lays the groundwork for two future improvements:

1. This provides us with a starting point for removing the `HiveSessionState`. Removing the `HiveSessionState` would also require us to move resource loading into a separate class, and to (re)move the metadata Hive client.
2. This makes it easier to customize the SparkSession. Currently you still need to create a custom version of the builder; I have added hooks to facilitate this (see the sketch after this list). A future step will be to create a semi-stable API on top of this.
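
As a rough illustration of those hooks (not part of this commit), a custom builder could subclass `SessionStateBuilder`, override one of the `custom*` methods, and provide `newBuilder` so cloning works. The package and all `My*` names below are made up for the example; since `SessionState` is still `private[sql]`, such a builder currently has to live under the `org.apache.spark.sql` package tree, which is exactly why a semi-stable API is listed as future work.

```scala
// Hypothetical extension package; kept under org.apache.spark.sql because
// SessionState is still private[sql] in this commit.
package org.apache.spark.sql.myextensions

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.{SessionState, SessionStateBuilder}

// A no-op optimizer rule, standing in for a real custom rule.
object MyRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// A builder that injects the rule through the new extension hook.
class MySessionStateBuilder(
    session: SparkSession,
    parentState: Option[SessionState] = None)
  extends SessionStateBuilder(session, parentState) {

  // Extra rules for the "Operator Optimizations" batch.
  override protected def customOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
    MyRule +: super.customOperatorOptimizationRules

  // Required so that SessionState.clone() can recreate this builder.
  override protected def newBuilder: NewBuilder = new MySessionStateBuilder(_, _)
}
```

With this commit the builder still has to be invoked by hand (e.g. `new MySessionStateBuilder(sparkSession).build()`); wiring a custom builder into `SparkSession` construction is part of the follow-up work mentioned above.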

## How was this patch tested?
Existing tests.

Author: Herman van Hovell <hv...@databricks.com>

Closes #17433 from hvanhovell/SPARK-20100.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ea361165
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ea361165
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ea361165

Branch: refs/heads/master
Commit: ea361165e1ddce4d8aa0242ae3e878d7b39f1de2
Parents: 8a6f33f
Author: Herman van Hovell <hv...@databricks.com>
Authored: Tue Mar 28 10:07:24 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Mar 28 10:07:24 2017 +0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   |  46 +--
 .../sql/catalyst/optimizer/Optimizer.scala      |  16 +-
 .../catalyst/catalog/SessionCatalogSuite.scala  |  22 +-
 .../spark/sql/execution/SparkOptimizer.scala    |  12 +-
 .../spark/sql/execution/SparkPlanner.scala      |  11 +-
 .../streaming/IncrementalExecution.scala        |  23 +-
 .../spark/sql/internal/SessionState.scala       | 180 +++---------
 .../sql/internal/sessionStateBuilders.scala     | 279 +++++++++++++++++++
 .../apache/spark/sql/test/TestSQLContext.scala  |  23 +-
 .../spark/sql/hive/HiveSessionCatalog.scala     |  76 +----
 .../spark/sql/hive/HiveSessionState.scala       | 259 ++++++++---------
 .../apache/spark/sql/hive/test/TestHive.scala   |  60 ++--
 .../sql/hive/HiveSessionCatalogSuite.scala      | 112 --------
 13 files changed, 547 insertions(+), 572 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ea361165/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index a469d12..72ab075 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -54,7 +54,8 @@ class SessionCatalog(
     functionRegistry: FunctionRegistry,
     conf: CatalystConf,
     hadoopConf: Configuration,
-    parser: ParserInterface) extends Logging {
+    parser: ParserInterface,
+    functionResourceLoader: FunctionResourceLoader) extends Logging {
   import SessionCatalog._
   import CatalogTypes.TablePartitionSpec
 
@@ -69,8 +70,8 @@ class SessionCatalog(
       functionRegistry,
       conf,
       new Configuration(),
-      CatalystSqlParser)
-    functionResourceLoader = DummyFunctionResourceLoader
+      CatalystSqlParser,
+      DummyFunctionResourceLoader)
   }
 
   // For testing only.
@@ -90,9 +91,7 @@ class SessionCatalog(
   // check whether the temporary table or function exists, then, if not, operate on
   // the corresponding item in the current database.
   @GuardedBy("this")
-  protected var currentDb = formatDatabaseName(DEFAULT_DATABASE)
-
-  @volatile var functionResourceLoader: FunctionResourceLoader = _
+  protected var currentDb: String = formatDatabaseName(DEFAULT_DATABASE)
 
   /**
    * Checks if the given name conforms the Hive standard ("[a-zA-z_0-9]+"),
@@ -1059,9 +1058,6 @@ class SessionCatalog(
    * by a tuple (resource type, resource uri).
    */
   def loadFunctionResources(resources: Seq[FunctionResource]): Unit = {
-    if (functionResourceLoader == null) {
-      throw new IllegalStateException("functionResourceLoader has not yet been initialized")
-    }
     resources.foreach(functionResourceLoader.loadResource)
   }
 
@@ -1259,28 +1255,16 @@ class SessionCatalog(
   }
 
   /**
-   * Create a new [[SessionCatalog]] with the provided parameters. `externalCatalog` and
-   * `globalTempViewManager` are `inherited`, while `currentDb` and `tempTables` are copied.
+   * Copy the current state of the catalog to another catalog.
+   *
+   * This function is synchronized on this [[SessionCatalog]] (the source) to make sure the copied
+   * state is consistent. The target [[SessionCatalog]] is not synchronized, and should not be
+   * because the target [[SessionCatalog]] should not be published at this point. The caller must
+   * synchronize on the target if this assumption does not hold.
    */
-  def newSessionCatalogWith(
-      conf: CatalystConf,
-      hadoopConf: Configuration,
-      functionRegistry: FunctionRegistry,
-      parser: ParserInterface): SessionCatalog = {
-    val catalog = new SessionCatalog(
-      externalCatalog,
-      globalTempViewManager,
-      functionRegistry,
-      conf,
-      hadoopConf,
-      parser)
-
-    synchronized {
-      catalog.currentDb = currentDb
-      // copy over temporary tables
-      tempTables.foreach(kv => catalog.tempTables.put(kv._1, kv._2))
-    }
-
-    catalog
+  private[sql] def copyStateTo(target: SessionCatalog): Unit = synchronized {
+    target.currentDb = currentDb
+    // copy over temporary tables
+    tempTables.foreach(kv => target.tempTables.put(kv._1, kv._2))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ea361165/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index dbe3ded..dbf479d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -17,20 +17,14 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import scala.annotation.tailrec
-import scala.collection.immutable.HashSet
 import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.api.java.function.FilterFunction
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
-import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -79,7 +73,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
     Batch("Aggregate", fixedPoint,
       RemoveLiteralFromGroupExpressions,
       RemoveRepetitionFromGroupExpressions) ::
-    Batch("Operator Optimizations", fixedPoint,
+    Batch("Operator Optimizations", fixedPoint, Seq(
       // Operator push down
       PushProjectionThroughUnion,
       ReorderJoin(conf),
@@ -117,7 +111,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
       RemoveRedundantProject,
       SimplifyCreateStructOps,
       SimplifyCreateArrayOps,
-      SimplifyCreateMapOps) ::
+      SimplifyCreateMapOps) ++
+      extendedOperatorOptimizationRules: _*) ::
     Batch("Check Cartesian Products", Once,
       CheckCartesianProducts(conf)) ::
     Batch("Join Reorder", Once,
@@ -146,6 +141,11 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
         s.withNewPlan(newPlan)
     }
   }
+
+  /**
+   * Override to provide additional rules for the operator optimization batch.
+   */
+  def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/ea361165/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index ca4ce1c..56bca73 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
-import org.apache.hadoop.conf.Configuration
-
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, SimpleCatalystConf, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
@@ -1331,17 +1329,15 @@ abstract class SessionCatalogSuite extends PlanTest {
     }
   }
 
-  test("clone SessionCatalog - temp views") {
+  test("copy SessionCatalog state - temp views") {
     withEmptyCatalog { original =>
       val tempTable1 = Range(1, 10, 1, 10)
       original.createTempView("copytest1", tempTable1, overrideIfExists = false)
 
       // check if tables copied over
-      val clone = original.newSessionCatalogWith(
-        SimpleCatalystConf(caseSensitiveAnalysis = true),
-        new Configuration(),
-        new SimpleFunctionRegistry,
-        CatalystSqlParser)
+      val clone = new SessionCatalog(original.externalCatalog)
+      original.copyStateTo(clone)
+
       assert(original ne clone)
       assert(clone.getTempView("copytest1") == Some(tempTable1))
 
@@ -1355,7 +1351,7 @@ abstract class SessionCatalogSuite extends PlanTest {
     }
   }
 
-  test("clone SessionCatalog - current db") {
+  test("copy SessionCatalog state - current db") {
     withEmptyCatalog { original =>
       val db1 = "db1"
       val db2 = "db2"
@@ -1368,11 +1364,9 @@ abstract class SessionCatalogSuite extends PlanTest {
       original.setCurrentDatabase(db1)
 
       // check if current db copied over
-      val clone = original.newSessionCatalogWith(
-        SimpleCatalystConf(caseSensitiveAnalysis = true),
-        new Configuration(),
-        new SimpleFunctionRegistry,
-        CatalystSqlParser)
+      val clone = new SessionCatalog(original.externalCatalog)
+      original.copyStateTo(clone)
+
       assert(original ne clone)
       assert(clone.getCurrentDatabase == db1)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ea361165/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 9817283..2cdfb7a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -30,9 +30,17 @@ class SparkOptimizer(
     experimentalMethods: ExperimentalMethods)
   extends Optimizer(catalog, conf) {
 
-  override def batches: Seq[Batch] = super.batches :+
+  override def batches: Seq[Batch] = (super.batches :+
     Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog, conf)) :+
     Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+
-    Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+
+    Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions)) ++
+    postHocOptimizationBatches :+
     Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
+
+  /**
+   * Optimization batches that are executed after the regular optimization batches, but before the
+   * batch executing the [[ExperimentalMethods]] optimizer rules. This hook can be used to add
+   * custom optimizer batches to the Spark optimizer.
+   */
+   def postHocOptimizationBatches: Seq[Batch] = Nil
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ea361165/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
index 6782416..6566502 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -27,13 +27,14 @@ import org.apache.spark.sql.internal.SQLConf
 class SparkPlanner(
     val sparkContext: SparkContext,
     val conf: SQLConf,
-    val extraStrategies: Seq[Strategy])
+    val experimentalMethods: ExperimentalMethods)
   extends SparkStrategies {
 
   def numPartitions: Int = conf.numShufflePartitions
 
   def strategies: Seq[Strategy] =
-      extraStrategies ++ (
+    experimentalMethods.extraStrategies ++
+      extraPlanningStrategies ++ (
       FileSourceStrategy ::
       DataSourceStrategy ::
       SpecialLimits ::
@@ -42,6 +43,12 @@ class SparkPlanner(
       InMemoryScans ::
       BasicOperators :: Nil)
 
+  /**
+   * Override to add extra planning strategies to the planner. These strategies are tried after
+   * the strategies defined in [[ExperimentalMethods]], and before the regular strategies.
+   */
+  def extraPlanningStrategies: Seq[Strategy] = Nil
+
   override protected def collectPlaceholders(plan: SparkPlan): Seq[(SparkPlan, LogicalPlan)] = {
     plan.collect {
       case placeholder @ PlanLater(logicalPlan) => placeholder -> logicalPlan

http://git-wip-us.apache.org/repos/asf/spark/blob/ea361165/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 0f0e4a9..622e049 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.execution.streaming
 import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.{CurrentBatchTimestamp, Literal}
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.expressions.CurrentBatchTimestamp
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode}
@@ -40,20 +40,17 @@ class IncrementalExecution(
     offsetSeqMetadata: OffsetSeqMetadata)
   extends QueryExecution(sparkSession, logicalPlan) with Logging {
 
-  // TODO: make this always part of planning.
-  val streamingExtraStrategies =
-    sparkSession.sessionState.planner.StatefulAggregationStrategy +:
-    sparkSession.sessionState.planner.FlatMapGroupsWithStateStrategy +:
-    sparkSession.sessionState.planner.StreamingRelationStrategy +:
-    sparkSession.sessionState.planner.StreamingDeduplicationStrategy +:
-    sparkSession.sessionState.experimentalMethods.extraStrategies
-
   // Modified planner with stateful operations.
-  override def planner: SparkPlanner =
-    new SparkPlanner(
+  override val planner: SparkPlanner = new SparkPlanner(
       sparkSession.sparkContext,
       sparkSession.sessionState.conf,
-      streamingExtraStrategies)
+      sparkSession.sessionState.experimentalMethods) {
+    override def extraPlanningStrategies: Seq[Strategy] =
+      StatefulAggregationStrategy ::
+      FlatMapGroupsWithStateStrategy ::
+      StreamingRelationStrategy ::
+      StreamingDeduplicationStrategy :: Nil
+  }
 
   /**
    * See [SPARK-18339]

http://git-wip-us.apache.org/repos/asf/spark/blob/ea361165/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index ce80604..b5b0bb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -22,22 +22,21 @@ import java.io.File
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.streaming.StreamingQueryManager
 import org.apache.spark.sql.util.ExecutionListenerManager
 
-
 /**
  * A class that holds all session-specific state in a given [[SparkSession]].
+ *
  * @param sparkContext The [[SparkContext]].
  * @param sharedState The shared state.
  * @param conf SQL-specific key-value configurations.
@@ -46,9 +45,11 @@ import org.apache.spark.sql.util.ExecutionListenerManager
  * @param catalog Internal catalog for managing table and database states.
  * @param sqlParser Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
  * @param analyzer Logical query plan analyzer for resolving unresolved attributes and relations.
- * @param streamingQueryManager Interface to start and stop
- *                              [[org.apache.spark.sql.streaming.StreamingQuery]]s.
- * @param queryExecutionCreator Lambda to create a [[QueryExecution]] from a [[LogicalPlan]]
+ * @param optimizer Logical query plan optimizer.
+ * @param planner Planner that converts optimized logical plans to physical plans
+ * @param streamingQueryManager Interface to start and stop streaming queries.
+ * @param createQueryExecution Function used to create QueryExecution objects.
+ * @param createClone Function used to create clones of the session state.
  */
 private[sql] class SessionState(
     sparkContext: SparkContext,
@@ -59,8 +60,11 @@ private[sql] class SessionState(
     val catalog: SessionCatalog,
     val sqlParser: ParserInterface,
     val analyzer: Analyzer,
+    val optimizer: Optimizer,
+    val planner: SparkPlanner,
     val streamingQueryManager: StreamingQueryManager,
-    val queryExecutionCreator: LogicalPlan => QueryExecution) {
+    createQueryExecution: LogicalPlan => QueryExecution,
+    createClone: (SparkSession, SessionState) => SessionState) {
 
   def newHadoopConf(): Configuration = SessionState.newHadoopConf(
     sparkContext.hadoopConfiguration,
@@ -77,41 +81,12 @@ private[sql] class SessionState(
   }
 
   /**
-   * A class for loading resources specified by a function.
-   */
-  val functionResourceLoader: FunctionResourceLoader = {
-    new FunctionResourceLoader {
-      override def loadResource(resource: FunctionResource): Unit = {
-        resource.resourceType match {
-          case JarResource => addJar(resource.uri)
-          case FileResource => sparkContext.addFile(resource.uri)
-          case ArchiveResource =>
-            throw new AnalysisException(
-              "Archive is not allowed to be loaded. If YARN mode is used, " +
-                "please use --archives options while calling spark-submit.")
-        }
-      }
-    }
-  }
-
-  /**
    * Interface exposed to the user for registering user-defined functions.
    * Note that the user-defined functions must be deterministic.
    */
   val udf: UDFRegistration = new UDFRegistration(functionRegistry)
 
   /**
-   * Logical query plan optimizer.
-   */
-  val optimizer: Optimizer = new SparkOptimizer(catalog, conf, experimentalMethods)
-
-  /**
-   * Planner that converts optimized logical plans to physical plans.
-   */
-  def planner: SparkPlanner =
-    new SparkPlanner(sparkContext, conf, experimentalMethods.extraStrategies)
-
-  /**
    * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
    * that listen for execution metrics.
    */
@@ -120,38 +95,13 @@ private[sql] class SessionState(
   /**
    * Get an identical copy of the `SessionState` and associate it with the given `SparkSession`
    */
-  def clone(newSparkSession: SparkSession): SessionState = {
-    val sparkContext = newSparkSession.sparkContext
-    val confCopy = conf.clone()
-    val functionRegistryCopy = functionRegistry.clone()
-    val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
-    val catalogCopy = catalog.newSessionCatalogWith(
-      confCopy,
-      SessionState.newHadoopConf(sparkContext.hadoopConfiguration, confCopy),
-      functionRegistryCopy,
-      sqlParser)
-    val queryExecutionCreator = (plan: LogicalPlan) => new QueryExecution(newSparkSession, plan)
-
-    SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
-
-    new SessionState(
-      sparkContext,
-      newSparkSession.sharedState,
-      confCopy,
-      experimentalMethods.clone(),
-      functionRegistryCopy,
-      catalogCopy,
-      sqlParser,
-      SessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
-      new StreamingQueryManager(newSparkSession),
-      queryExecutionCreator)
-  }
+  def clone(newSparkSession: SparkSession): SessionState = createClone(newSparkSession, this)
 
   // ------------------------------------------------------
   //  Helper methods, partially leftover from pre-2.0 days
   // ------------------------------------------------------
 
-  def executePlan(plan: LogicalPlan): QueryExecution = queryExecutionCreator(plan)
+  def executePlan(plan: LogicalPlan): QueryExecution = createQueryExecution(plan)
 
   def refreshTable(tableName: String): Unit = {
     catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
@@ -179,53 +129,12 @@ private[sql] class SessionState(
   }
 }
 
-
 private[sql] object SessionState {
-
-  def apply(sparkSession: SparkSession): SessionState = {
-    apply(sparkSession, new SQLConf)
-  }
-
-  def apply(sparkSession: SparkSession, sqlConf: SQLConf): SessionState = {
-    val sparkContext = sparkSession.sparkContext
-
-    // Automatically extract all entries and put them in our SQLConf
-    mergeSparkConf(sqlConf, sparkContext.getConf)
-
-    val functionRegistry = FunctionRegistry.builtin.clone()
-
-    val sqlParser: ParserInterface = new SparkSqlParser(sqlConf)
-
-    val catalog = new SessionCatalog(
-      sparkSession.sharedState.externalCatalog,
-      sparkSession.sharedState.globalTempViewManager,
-      functionRegistry,
-      sqlConf,
-      newHadoopConf(sparkContext.hadoopConfiguration, sqlConf),
-      sqlParser)
-
-    val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, sqlConf)
-
-    val streamingQueryManager: StreamingQueryManager = new StreamingQueryManager(sparkSession)
-
-    val queryExecutionCreator = (plan: LogicalPlan) => new QueryExecution(sparkSession, plan)
-
-    val sessionState = new SessionState(
-      sparkContext,
-      sparkSession.sharedState,
-      sqlConf,
-      new ExperimentalMethods,
-      functionRegistry,
-      catalog,
-      sqlParser,
-      analyzer,
-      streamingQueryManager,
-      queryExecutionCreator)
-    // functionResourceLoader needs to access SessionState.addJar, so it cannot be created before
-    // creating SessionState. Setting `catalog.functionResourceLoader` here is safe since the caller
-    // cannot use SessionCatalog before we return SessionState.
-    catalog.functionResourceLoader = sessionState.functionResourceLoader
-    sessionState
+  /**
+   * Create a new [[SessionState]] for the given session.
+   */
+  def apply(session: SparkSession): SessionState = {
+    new SessionStateBuilder(session).build()
   }
 
   def newHadoopConf(hadoopConf: Configuration, sqlConf: SQLConf): Configuration = {
@@ -233,34 +142,33 @@ private[sql] object SessionState {
     sqlConf.getAllConfs.foreach { case (k, v) => if (v ne null) newHadoopConf.set(k, v) }
     newHadoopConf
   }
+}
 
-  /**
-   * Create an logical query plan `Analyzer` with rules specific to a non-Hive `SessionState`.
-   */
-  private def createAnalyzer(
-      sparkSession: SparkSession,
-      catalog: SessionCatalog,
-      sqlConf: SQLConf): Analyzer = {
-    new Analyzer(catalog, sqlConf) {
-      override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
-        new FindDataSourceTable(sparkSession) ::
-        new ResolveSQLOnFile(sparkSession) :: Nil
-
-      override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
-        PreprocessTableCreation(sparkSession) ::
-        PreprocessTableInsertion(sqlConf) ::
-        DataSourceAnalysis(sqlConf) :: Nil
-
-      override val extendedCheckRules = Seq(PreWriteCheck, HiveOnlyCheck)
-    }
-  }
+/**
+ * Concrete implementation of a [[SessionStateBuilder]].
+ */
+@Experimental
+@InterfaceStability.Unstable
+class SessionStateBuilder(
+    session: SparkSession,
+    parentState: Option[SessionState] = None)
+  extends BaseSessionStateBuilder(session, parentState) {
+  override protected def newBuilder: NewBuilder = new SessionStateBuilder(_, _)
+}
 
-  /**
-   * Extract entries from `SparkConf` and put them in the `SQLConf`
-   */
-  def mergeSparkConf(sqlConf: SQLConf, sparkConf: SparkConf): Unit = {
-    sparkConf.getAll.foreach { case (k, v) =>
-      sqlConf.setConfString(k, v)
+/**
+ * Session shared [[FunctionResourceLoader]].
+ */
+@InterfaceStability.Unstable
+class SessionFunctionResourceLoader(session: SparkSession) extends FunctionResourceLoader {
+  override def loadResource(resource: FunctionResource): Unit = {
+    resource.resourceType match {
+      case JarResource => session.sessionState.addJar(resource.uri)
+      case FileResource => session.sparkContext.addFile(resource.uri)
+      case ArchiveResource =>
+        throw new AnalysisException(
+          "Archive is not allowed to be loaded. If YARN mode is used, " +
+            "please use --archives options while calling spark-submit.")
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ea361165/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala
new file mode 100644
index 0000000..6b5559a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.internal
+
+import org.apache.spark.SparkConf
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.sql.{ExperimentalMethods, SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser}
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.streaming.StreamingQueryManager
+
+/**
+ * Builder class that coordinates construction of a new [[SessionState]].
+ *
+ * The builder explicitly defines all components needed by the session state, and creates a session
+ * state when `build` is called. Components should only be initialized once. This is not a problem
+ * for most components as they are only used in the `build` function. However some components
+ * (`conf`, `catalog`, `functionRegistry`, `experimentalMethods` & `sqlParser`) are as dependencies
+ * for other components and are shared as a result. These components are defined as lazy vals to
+ * make sure the component is created only once.
+ *
+ * A developer can modify the builder by providing custom versions of components, or by using the
+ * hooks provided for the analyzer, optimizer & planner. There are some dependencies between the
+ * components (they are documented per dependency), a developer should respect these when making
+ * modifications in order to prevent initialization problems.
+ *
+ * A parent [[SessionState]] can be used to initialize the new [[SessionState]]. The new session
+ * state will clone the parent sessions state's `conf`, `functionRegistry`, `experimentalMethods`
+ * and `catalog` fields. Note that the state is cloned when `build` is called, and not before.
+ */
+@Experimental
+@InterfaceStability.Unstable
+abstract class BaseSessionStateBuilder(
+    val session: SparkSession,
+    val parentState: Option[SessionState] = None) {
+  type NewBuilder = (SparkSession, Option[SessionState]) => BaseSessionStateBuilder
+
+  /**
+   * Function that produces a new instance of the SessionStateBuilder. This is used by the
+   * [[SessionState]]'s clone functionality. Make sure to override this when implementing your own
+   * [[SessionStateBuilder]].
+   */
+  protected def newBuilder: NewBuilder
+
+  /**
+   * Extract entries from `SparkConf` and put them in the `SQLConf`
+   */
+  protected def mergeSparkConf(sqlConf: SQLConf, sparkConf: SparkConf): Unit = {
+    sparkConf.getAll.foreach { case (k, v) =>
+      sqlConf.setConfString(k, v)
+    }
+  }
+
+  /**
+   * SQL-specific key-value configurations.
+   *
+   * These either get cloned from a pre-existing instance or newly created. The conf is always
+   * merged with its [[SparkConf]].
+   */
+  protected lazy val conf: SQLConf = {
+    val conf = parentState.map(_.conf.clone()).getOrElse(new SQLConf)
+    mergeSparkConf(conf, session.sparkContext.conf)
+    conf
+  }
+
+  /**
+   * Internal catalog managing functions registered by the user.
+   *
+   * This either gets cloned from a pre-existing version or cloned from the built-in registry.
+   */
+  protected lazy val functionRegistry: FunctionRegistry = {
+    parentState.map(_.functionRegistry).getOrElse(FunctionRegistry.builtin).clone()
+  }
+
+  /**
+   * Experimental methods that can be used to define custom optimization rules and custom planning
+   * strategies.
+   *
+   * This either gets cloned from a pre-existing version or newly created.
+   */
+  protected lazy val experimentalMethods: ExperimentalMethods = {
+    parentState.map(_.experimentalMethods.clone()).getOrElse(new ExperimentalMethods)
+  }
+
+  /**
+   * Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
+   *
+   * Note: this depends on the `conf` field.
+   */
+  protected lazy val sqlParser: ParserInterface = new SparkSqlParser(conf)
+
+  /**
+   * Catalog for managing table and database states. If there is a pre-existing catalog, the state
+   * of that catalog (temp tables & current database) will be copied into the new catalog.
+   *
+   * Note: this depends on the `conf`, `functionRegistry` and `sqlParser` fields.
+   */
+  protected lazy val catalog: SessionCatalog = {
+    val catalog = new SessionCatalog(
+      session.sharedState.externalCatalog,
+      session.sharedState.globalTempViewManager,
+      functionRegistry,
+      conf,
+      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
+      sqlParser,
+      new SessionFunctionResourceLoader(session))
+    parentState.foreach(_.catalog.copyStateTo(catalog))
+    catalog
+  }
+
+  /**
+   * Logical query plan analyzer for resolving unresolved attributes and relations.
+   *
+   * Note: this depends on the `conf` and `catalog` fields.
+   */
+  protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
+    override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
+      new FindDataSourceTable(session) +:
+        new ResolveSQLOnFile(session) +:
+        customResolutionRules
+
+    override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
+      PreprocessTableCreation(session) +:
+        PreprocessTableInsertion(conf) +:
+        DataSourceAnalysis(conf) +:
+        customPostHocResolutionRules
+
+    override val extendedCheckRules: Seq[LogicalPlan => Unit] =
+      PreWriteCheck +:
+        HiveOnlyCheck +:
+        customCheckRules
+  }
+
+  /**
+   * Custom resolution rules to add to the Analyzer. Prefer overriding this instead of creating
+   * your own Analyzer.
+   *
+   * Note that this may NOT depend on the `analyzer` function.
+   */
+  protected def customResolutionRules: Seq[Rule[LogicalPlan]] = Nil
+
+  /**
+   * Custom post resolution rules to add to the Analyzer. Prefer overriding this instead of
+   * creating your own Analyzer.
+   *
+   * Note that this may NOT depend on the `analyzer` function.
+   */
+  protected def customPostHocResolutionRules: Seq[Rule[LogicalPlan]] = Nil
+
+  /**
+   * Custom check rules to add to the Analyzer. Prefer overriding this instead of creating
+   * your own Analyzer.
+   *
+   * Note that this may NOT depend on the `analyzer` function.
+   */
+  protected def customCheckRules: Seq[LogicalPlan => Unit] = Nil
+
+  /**
+   * Logical query plan optimizer.
+   *
+   * Note: this depends on the `conf`, `catalog` and `experimentalMethods` fields.
+   */
+  protected def optimizer: Optimizer = {
+    new SparkOptimizer(catalog, conf, experimentalMethods) {
+      override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
+        super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules
+    }
+  }
+
+  /**
+   * Custom operator optimization rules to add to the Optimizer. Prefer overriding this instead
+   * of creating your own Optimizer.
+   *
+   * Note that this may NOT depend on the `optimizer` function.
+   */
+  protected def customOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil
+
+  /**
+   * Planner that converts optimized logical plans to physical plans.
+   *
+   * Note: this depends on the `conf` and `experimentalMethods` fields.
+   */
+  protected def planner: SparkPlanner = {
+    new SparkPlanner(session.sparkContext, conf, experimentalMethods) {
+      override def extraPlanningStrategies: Seq[Strategy] =
+        super.extraPlanningStrategies ++ customPlanningStrategies
+    }
+  }
+
+  /**
+   * Custom strategies to add to the planner. Prefer overriding this instead of creating
+   * your own Planner.
+   *
+   * Note that this may NOT depend on the `planner` function.
+   */
+  protected def customPlanningStrategies: Seq[Strategy] = Nil
+
+  /**
+   * Create a query execution object.
+   */
+  protected def createQueryExecution: LogicalPlan => QueryExecution = { plan =>
+    new QueryExecution(session, plan)
+  }
+
+  /**
+   * Interface to start and stop streaming queries.
+   */
+  protected def streamingQueryManager: StreamingQueryManager = new StreamingQueryManager(session)
+
+  /**
+   * Function used to make clones of the session state.
+   */
+  protected def createClone: (SparkSession, SessionState) => SessionState = {
+    val createBuilder = newBuilder
+    (session, state) => createBuilder(session, Option(state)).build()
+  }
+
+  /**
+   * Build the [[SessionState]].
+   */
+  def build(): SessionState = {
+    new SessionState(
+      session.sparkContext,
+      session.sharedState,
+      conf,
+      experimentalMethods,
+      functionRegistry,
+      catalog,
+      sqlParser,
+      analyzer,
+      optimizer,
+      planner,
+      streamingQueryManager,
+      createQueryExecution,
+      createClone)
+  }
+}
+
+/**
+ * Helper class for using SessionStateBuilders during tests.
+ */
+private[sql] trait WithTestConf { self: BaseSessionStateBuilder =>
+  def overrideConfs: Map[String, String]
+
+  override protected lazy val conf: SQLConf = {
+    val conf = parentState.map(_.conf.clone()).getOrElse {
+      new SQLConf {
+        clear()
+        override def clear(): Unit = {
+          super.clear()
+          // Make sure we start with the default test configs even after clear
+          overrideConfs.foreach { case (key, value) => setConfString(key, value) }
+        }
+      }
+    }
+    mergeSparkConf(conf, session.sparkContext.conf)
+    conf
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ea361165/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 898a2fb..b01977a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.test
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.internal.{SessionState, SessionStateBuilder, SQLConf, WithTestConf}
 
 /**
  * A special [[SparkSession]] prepared for testing.
@@ -35,16 +35,9 @@ private[sql] class TestSparkSession(sc: SparkContext) extends SparkSession(sc) {
   }
 
   @transient
-  override lazy val sessionState: SessionState = SessionState(
-    this,
-    new SQLConf {
-      clear()
-      override def clear(): Unit = {
-        super.clear()
-        // Make sure we start with the default test configs even after clear
-        TestSQLContext.overrideConfs.foreach { case (key, value) => setConfString(key, value) }
-      }
-    })
+  override lazy val sessionState: SessionState = {
+    new TestSQLSessionStateBuilder(this, None).build()
+  }
 
   // Needed for Java tests
   def loadTestData(): Unit = {
@@ -67,3 +60,11 @@ private[sql] object TestSQLContext {
       // Fewer shuffle partitions to speed up testing.
       SQLConf.SHUFFLE_PARTITIONS.key -> "5")
 }
+
+private[sql] class TestSQLSessionStateBuilder(
+    session: SparkSession,
+    state: Option[SessionState])
+  extends SessionStateBuilder(session, state) with WithTestConf {
+  override def overrideConfs: Map[String, String] = TestSQLContext.overrideConfs
+  override def newBuilder: NewBuilder = new TestSQLSessionStateBuilder(_, _)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ea361165/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 6b7599e..2cc20a7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -25,8 +25,8 @@ import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
 import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
 import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
 
-import org.apache.spark.sql.{AnalysisException, SparkSession}
-import org.apache.spark.sql.catalyst.{CatalystConf, FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
@@ -47,14 +47,16 @@ private[sql] class HiveSessionCatalog(
     functionRegistry: FunctionRegistry,
     conf: SQLConf,
     hadoopConf: Configuration,
-    parser: ParserInterface)
+    parser: ParserInterface,
+    functionResourceLoader: FunctionResourceLoader)
   extends SessionCatalog(
       externalCatalog,
       globalTempViewManager,
       functionRegistry,
       conf,
       hadoopConf,
-      parser) {
+      parser,
+      functionResourceLoader) {
 
   // ----------------------------------------------------------------
   // | Methods and fields for interacting with HiveMetastoreCatalog |
@@ -69,47 +71,6 @@ private[sql] class HiveSessionCatalog(
     metastoreCatalog.hiveDefaultTableFilePath(name)
   }
 
-  /**
-   * Create a new [[HiveSessionCatalog]] with the provided parameters. `externalCatalog` and
-   * `globalTempViewManager` are `inherited`, while `currentDb` and `tempTables` are copied.
-   */
-  def newSessionCatalogWith(
-      newSparkSession: SparkSession,
-      conf: SQLConf,
-      hadoopConf: Configuration,
-      functionRegistry: FunctionRegistry,
-      parser: ParserInterface): HiveSessionCatalog = {
-    val catalog = HiveSessionCatalog(
-      newSparkSession,
-      functionRegistry,
-      conf,
-      hadoopConf,
-      parser)
-
-    synchronized {
-      catalog.currentDb = currentDb
-      // copy over temporary tables
-      tempTables.foreach(kv => catalog.tempTables.put(kv._1, kv._2))
-    }
-
-    catalog
-  }
-
-  /**
-   * The parent class [[SessionCatalog]] cannot access the [[SparkSession]] class, so we cannot add
-   * a [[SparkSession]] parameter to [[SessionCatalog.newSessionCatalogWith]]. However,
-   * [[HiveSessionCatalog]] requires a [[SparkSession]] parameter, so we can a new version of
-   * `newSessionCatalogWith` and disable this one.
-   *
-   * TODO Refactor HiveSessionCatalog to not use [[SparkSession]] directly.
-   */
-  override def newSessionCatalogWith(
-      conf: CatalystConf,
-      hadoopConf: Configuration,
-      functionRegistry: FunctionRegistry,
-      parser: ParserInterface): HiveSessionCatalog = throw new UnsupportedOperationException(
-    "to clone HiveSessionCatalog, use the other clone method that also accepts a SparkSession")
-
   // For testing only
   private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
     val key = metastoreCatalog.getQualifiedTableName(table)
@@ -250,28 +211,3 @@ private[sql] class HiveSessionCatalog(
     "histogram_numeric"
   )
 }
-
-private[sql] object HiveSessionCatalog {
-
-  def apply(
-      sparkSession: SparkSession,
-      functionRegistry: FunctionRegistry,
-      conf: SQLConf,
-      hadoopConf: Configuration,
-      parser: ParserInterface): HiveSessionCatalog = {
-    // Catalog for handling data source tables. TODO: This really doesn't belong here since it is
-    // essentially a cache for metastore tables. However, it relies on a lot of session-specific
-    // things so it would be a lot of work to split its functionality between HiveSessionCatalog
-    // and HiveCatalog. We should still do it at some point...
-    val metastoreCatalog = new HiveMetastoreCatalog(sparkSession)
-
-    new HiveSessionCatalog(
-      sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
-      sparkSession.sharedState.globalTempViewManager,
-      metastoreCatalog,
-      functionRegistry,
-      conf,
-      hadoopConf,
-      parser)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/ea361165/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index cb8bcb8..49ff847 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -18,20 +18,23 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.SparkContext
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner, SparkSqlParser}
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
+import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionFunctionResourceLoader, SessionState, SharedState, SQLConf}
 import org.apache.spark.sql.streaming.StreamingQueryManager
 
 
 /**
  * A class that holds all session-specific state in a given [[SparkSession]] backed by Hive.
+ *
  * @param sparkContext The [[SparkContext]].
  * @param sharedState The shared state.
  * @param conf SQL-specific key-value configurations.
@@ -40,12 +43,14 @@ import org.apache.spark.sql.streaming.StreamingQueryManager
  * @param catalog Internal catalog for managing table and database states that uses Hive client for
  *                interacting with the metastore.
  * @param sqlParser Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
- * @param metadataHive The Hive metadata client.
  * @param analyzer Logical query plan analyzer for resolving unresolved attributes and relations.
- * @param streamingQueryManager Interface to start and stop
- *                              [[org.apache.spark.sql.streaming.StreamingQuery]]s.
- * @param queryExecutionCreator Lambda to create a [[QueryExecution]] from a [[LogicalPlan]]
- * @param plannerCreator Lambda to create a planner that takes into account Hive-specific strategies
+ * @param optimizer Logical query plan optimizer.
+ * @param planner Planner that converts optimized logical plans to physical plans and that takes
+ *                Hive-specific strategies into account.
+ * @param streamingQueryManager Interface to start and stop streaming queries.
+ * @param createQueryExecution Function used to create QueryExecution objects.
+ * @param createClone Function used to create clones of the session state.
+ * @param metadataHive The Hive metadata client.
  */
 private[hive] class HiveSessionState(
     sparkContext: SparkContext,
@@ -55,11 +60,13 @@ private[hive] class HiveSessionState(
     functionRegistry: FunctionRegistry,
     override val catalog: HiveSessionCatalog,
     sqlParser: ParserInterface,
-    val metadataHive: HiveClient,
     analyzer: Analyzer,
+    optimizer: Optimizer,
+    planner: SparkPlanner,
     streamingQueryManager: StreamingQueryManager,
-    queryExecutionCreator: LogicalPlan => QueryExecution,
-    val plannerCreator: () => SparkPlanner)
+    createQueryExecution: LogicalPlan => QueryExecution,
+    createClone: (SparkSession, SessionState) => SessionState,
+    val metadataHive: HiveClient)
   extends SessionState(
       sparkContext,
       sharedState,
@@ -69,14 +76,11 @@ private[hive] class HiveSessionState(
       catalog,
       sqlParser,
       analyzer,
+      optimizer,
+      planner,
       streamingQueryManager,
-      queryExecutionCreator) { self =>
-
-  /**
-   * Planner that takes into account Hive-specific strategies.
-   */
-  override def planner: SparkPlanner = plannerCreator()
-
+      createQueryExecution,
+      createClone) {
 
   // ------------------------------------------------------
   //  Helper methods, partially leftover from pre-2.0 days
@@ -121,150 +125,115 @@ private[hive] class HiveSessionState(
   def hiveThriftServerAsync: Boolean = {
     conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
   }
+}
 
+private[hive] object HiveSessionState {
   /**
-   * Get an identical copy of the `HiveSessionState`.
-   * This should ideally reuse the `SessionState.clone` but cannot do so.
-   * Doing that will throw an exception when trying to clone the catalog.
+   * Create a new [[HiveSessionState]] for the given session.
    */
-  override def clone(newSparkSession: SparkSession): HiveSessionState = {
-    val sparkContext = newSparkSession.sparkContext
-    val confCopy = conf.clone()
-    val functionRegistryCopy = functionRegistry.clone()
-    val experimentalMethodsCopy = experimentalMethods.clone()
-    val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
-    val catalogCopy = catalog.newSessionCatalogWith(
-      newSparkSession,
-      confCopy,
-      SessionState.newHadoopConf(sparkContext.hadoopConfiguration, confCopy),
-      functionRegistryCopy,
-      sqlParser)
-    val queryExecutionCreator = (plan: LogicalPlan) => new QueryExecution(newSparkSession, plan)
-
-    val hiveClient =
-      newSparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
-        .newSession()
-
-    SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
-
-    new HiveSessionState(
-      sparkContext,
-      newSparkSession.sharedState,
-      confCopy,
-      experimentalMethodsCopy,
-      functionRegistryCopy,
-      catalogCopy,
-      sqlParser,
-      hiveClient,
-      HiveSessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
-      new StreamingQueryManager(newSparkSession),
-      queryExecutionCreator,
-      HiveSessionState.createPlannerCreator(
-        newSparkSession,
-        confCopy,
-        experimentalMethodsCopy))
+  def apply(session: SparkSession): HiveSessionState = {
+    new HiveSessionStateBuilder(session).build()
   }
-
 }
 
-private[hive] object HiveSessionState {
-
-  def apply(sparkSession: SparkSession): HiveSessionState = {
-    apply(sparkSession, new SQLConf)
-  }
-
-  def apply(sparkSession: SparkSession, conf: SQLConf): HiveSessionState = {
-    val initHelper = SessionState(sparkSession, conf)
-
-    val sparkContext = sparkSession.sparkContext
-
-    val catalog = HiveSessionCatalog(
-      sparkSession,
-      initHelper.functionRegistry,
-      initHelper.conf,
-      SessionState.newHadoopConf(sparkContext.hadoopConfiguration, initHelper.conf),
-      initHelper.sqlParser)
-
-    val metadataHive: HiveClient =
-      sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
-        .newSession()
-
-    val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, initHelper.conf)
+/**
+ * Builder that produces a [[HiveSessionState]].
+ */
+@Experimental
+@InterfaceStability.Unstable
+class HiveSessionStateBuilder(session: SparkSession, parentState: Option[SessionState] = None)
+  extends BaseSessionStateBuilder(session, parentState) {
 
-    val plannerCreator = createPlannerCreator(
-      sparkSession,
-      initHelper.conf,
-      initHelper.experimentalMethods)
+  private def externalCatalog: HiveExternalCatalog =
+    session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
 
-    val hiveSessionState = new HiveSessionState(
-      sparkContext,
-      sparkSession.sharedState,
-      initHelper.conf,
-      initHelper.experimentalMethods,
-      initHelper.functionRegistry,
-      catalog,
-      initHelper.sqlParser,
-      metadataHive,
-      analyzer,
-      initHelper.streamingQueryManager,
-      initHelper.queryExecutionCreator,
-      plannerCreator)
-    catalog.functionResourceLoader = hiveSessionState.functionResourceLoader
-    hiveSessionState
+  /**
+   * Create a [[HiveSessionCatalog]].
+   */
+  override protected lazy val catalog: HiveSessionCatalog = {
+    val catalog = new HiveSessionCatalog(
+      externalCatalog,
+      session.sharedState.globalTempViewManager,
+      new HiveMetastoreCatalog(session),
+      functionRegistry,
+      conf,
+      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
+      sqlParser,
+      new SessionFunctionResourceLoader(session))
+    parentState.foreach(_.catalog.copyStateTo(catalog))
+    catalog
   }
 
   /**
-   * Create an logical query plan `Analyzer` with rules specific to a `HiveSessionState`.
+   * A logical query plan `Analyzer` with rules specific to Hive.
    */
-  private def createAnalyzer(
-      sparkSession: SparkSession,
-      catalog: HiveSessionCatalog,
-      sqlConf: SQLConf): Analyzer = {
-    new Analyzer(catalog, sqlConf) {
-      override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
-        new ResolveHiveSerdeTable(sparkSession) ::
-        new FindDataSourceTable(sparkSession) ::
-        new ResolveSQLOnFile(sparkSession) :: Nil
-
-      override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
-        new DetermineTableStats(sparkSession) ::
-        catalog.ParquetConversions ::
-        catalog.OrcConversions ::
-        PreprocessTableCreation(sparkSession) ::
-        PreprocessTableInsertion(sqlConf) ::
-        DataSourceAnalysis(sqlConf) ::
-        HiveAnalysis :: Nil
+  override protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
+    override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
+      new ResolveHiveSerdeTable(session) +:
+      new FindDataSourceTable(session) +:
+      new ResolveSQLOnFile(session) +:
+      customResolutionRules
+
+    override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
+      new DetermineTableStats(session) +:
+      catalog.ParquetConversions +:
+      catalog.OrcConversions +:
+      PreprocessTableCreation(session) +:
+      PreprocessTableInsertion(conf) +:
+      DataSourceAnalysis(conf) +:
+      HiveAnalysis +:
+      customPostHocResolutionRules
+
+    override val extendedCheckRules: Seq[LogicalPlan => Unit] =
+      PreWriteCheck +:
+      customCheckRules
+  }
 
-      override val extendedCheckRules = Seq(PreWriteCheck)
+  /**
+   * Planner that takes into account Hive-specific strategies.
+   */
+  override protected def planner: SparkPlanner = {
+    new SparkPlanner(session.sparkContext, conf, experimentalMethods) with HiveStrategies {
+      override val sparkSession: SparkSession = session
+
+      override def extraPlanningStrategies: Seq[Strategy] =
+        super.extraPlanningStrategies ++ customPlanningStrategies
+
+      override def strategies: Seq[Strategy] = {
+        experimentalMethods.extraStrategies ++
+          extraPlanningStrategies ++ Seq(
+          FileSourceStrategy,
+          DataSourceStrategy,
+          SpecialLimits,
+          InMemoryScans,
+          HiveTableScans,
+          Scripts,
+          Aggregation,
+          JoinSelection,
+          BasicOperators
+        )
+      }
     }
   }
 
-  private def createPlannerCreator(
-      associatedSparkSession: SparkSession,
-      sqlConf: SQLConf,
-      experimentalMethods: ExperimentalMethods): () => SparkPlanner = {
-    () =>
-      new SparkPlanner(
-          associatedSparkSession.sparkContext,
-          sqlConf,
-          experimentalMethods.extraStrategies)
-        with HiveStrategies {
-
-        override val sparkSession: SparkSession = associatedSparkSession
+  override protected def newBuilder: NewBuilder = new HiveSessionStateBuilder(_, _)
 
-        override def strategies: Seq[Strategy] = {
-          experimentalMethods.extraStrategies ++ Seq(
-            FileSourceStrategy,
-            DataSourceStrategy,
-            SpecialLimits,
-            InMemoryScans,
-            HiveTableScans,
-            Scripts,
-            Aggregation,
-            JoinSelection,
-            BasicOperators
-          )
-        }
-      }
+  override def build(): HiveSessionState = {
+    val metadataHive: HiveClient = externalCatalog.client.newSession()
+    new HiveSessionState(
+      session.sparkContext,
+      session.sharedState,
+      conf,
+      experimentalMethods,
+      functionRegistry,
+      catalog,
+      sqlParser,
+      analyzer,
+      optimizer,
+      planner,
+      streamingQueryManager,
+      createQueryExecution,
+      createClone,
+      metadataHive)
   }
 }
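
For downstream users, the practical upside of this builder is that the custom* hooks can be overridden instead of re-wiring SessionState by hand. Below is a minimal sketch of such a builder, assuming the hook signatures implied by the overrides in the diff above; MyLineageRule and MySessionStateBuilder are hypothetical names, and the sketch sits in Spark's own package only because several of the referenced types are package-private.

package org.apache.spark.sql.hive

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SessionState

// Hypothetical no-op analysis rule; stands in for whatever rule a user wants to inject.
class MyLineageRule(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Hypothetical builder that reuses the Hive wiring and adds one resolution rule
// through the customResolutionRules hook referenced by the analyzer above.
class MySessionStateBuilder(
    session: SparkSession,
    parentState: Option[SessionState] = None)
  extends HiveSessionStateBuilder(session, parentState) {

  // Picked up by the analyzer's extendedResolutionRules via customResolutionRules.
  override protected def customResolutionRules: Seq[Rule[LogicalPlan]] =
    Seq(new MyLineageRule(session))

  // Cloned sessions should keep producing this builder type.
  override protected def newBuilder: NewBuilder = new MySessionStateBuilder(_, _)
}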

http://git-wip-us.apache.org/repos/asf/spark/blob/ea361165/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index b63ed76..32ca696 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.CacheTableCommand
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
+import org.apache.spark.sql.internal._
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 
@@ -148,12 +148,14 @@ class TestHiveContext(
  *
  * @param sc SparkContext
  * @param existingSharedState optional [[SharedState]]
+ * @param parentSessionState optional parent [[SessionState]]
  * @param loadTestTables if true, load the test tables. They can only be loaded when running
  *                       in the JVM, i.e when calling from Python this flag has to be false.
  */
 private[hive] class TestHiveSparkSession(
     @transient private val sc: SparkContext,
     @transient private val existingSharedState: Option[TestHiveSharedState],
+    @transient private val parentSessionState: Option[HiveSessionState],
     private val loadTestTables: Boolean)
   extends SparkSession(sc) with Logging { self =>
 
@@ -161,6 +163,7 @@ private[hive] class TestHiveSparkSession(
     this(
       sc,
       existingSharedState = None,
+      parentSessionState = None,
       loadTestTables)
   }
 
@@ -168,6 +171,7 @@ private[hive] class TestHiveSparkSession(
     this(
       sc,
       existingSharedState = Some(new TestHiveSharedState(sc, Some(hiveClient))),
+      parentSessionState = None,
       loadTestTables)
   }
 
@@ -192,36 +196,21 @@ private[hive] class TestHiveSparkSession(
 
   @transient
   override lazy val sessionState: HiveSessionState = {
-    val testConf =
-      new SQLConf {
-        clear()
-        override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
-        override def clear(): Unit = {
-          super.clear()
-          TestHiveContext.overrideConfs.foreach { case (k, v) => setConfString(k, v) }
-        }
-      }
-    val queryExecutionCreator = (plan: LogicalPlan) => new TestHiveQueryExecution(this, plan)
-    val initHelper = HiveSessionState(this, testConf)
-    SessionState.mergeSparkConf(testConf, sparkContext.getConf)
-
-    new HiveSessionState(
-      sparkContext,
-      sharedState,
-      testConf,
-      initHelper.experimentalMethods,
-      initHelper.functionRegistry,
-      initHelper.catalog,
-      initHelper.sqlParser,
-      initHelper.metadataHive,
-      initHelper.analyzer,
-      initHelper.streamingQueryManager,
-      queryExecutionCreator,
-      initHelper.plannerCreator)
+    new TestHiveSessionStateBuilder(this, parentSessionState).build()
   }
 
   override def newSession(): TestHiveSparkSession = {
-    new TestHiveSparkSession(sc, Some(sharedState), loadTestTables)
+    new TestHiveSparkSession(sc, Some(sharedState), None, loadTestTables)
+  }
+
+  override def cloneSession(): SparkSession = {
+    val result = new TestHiveSparkSession(
+      sparkContext,
+      Some(sharedState),
+      Some(sessionState),
+      loadTestTables)
+    result.sessionState // force copy of SessionState
+    result
   }
 
   private var cacheTables: Boolean = false
@@ -595,3 +584,18 @@ private[hive] object TestHiveContext {
   }
 
 }
+
+private[sql] class TestHiveSessionStateBuilder(
+    session: SparkSession,
+    state: Option[SessionState])
+  extends HiveSessionStateBuilder(session, state)
+  with WithTestConf {
+
+  override def overrideConfs: Map[String, String] = TestHiveContext.overrideConfs
+
+  override def createQueryExecution: (LogicalPlan) => QueryExecution = { plan =>
+    new TestHiveQueryExecution(session.asInstanceOf[TestHiveSparkSession], plan)
+  }
+
+  override protected def newBuilder: NewBuilder = new TestHiveSessionStateBuilder(_, _)
+}
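
The new cloneSession path above is what TestHive now relies on for session isolation: the clone reuses the TestHiveSharedState but gets its own SessionState, built by TestHiveSessionStateBuilder from the parent state. A minimal sketch of the expected semantics follows, assuming it runs inside Spark's own hive test sources (sessionState, sharedState and cloneSession are not public API).

package org.apache.spark.sql.hive

import org.apache.spark.sql.hive.test.TestHive

// Hypothetical check, illustrating the cloning contract shown in the diff above.
object CloneSessionSketch {
  def main(args: Array[String]): Unit = {
    val original = TestHive.sparkSession      // the singleton TestHiveSparkSession
    val cloned = original.cloneSession()      // forces a copied SessionState

    // Shared infrastructure (metastore client, global temp views) is reused ...
    assert(cloned.sharedState eq original.sharedState)
    // ... while per-session state (conf, catalog, temp views) is an independent copy.
    assert(cloned.sessionState ne original.sessionState)
  }
}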

http://git-wip-us.apache.org/repos/asf/spark/blob/ea361165/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala
deleted file mode 100644
index 3b0f59b..0000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.net.URI
-
-import org.apache.hadoop.conf.Configuration
-
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry
-import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
-import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.plans.logical.Range
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.Utils
-
-class HiveSessionCatalogSuite extends TestHiveSingleton {
-
-  test("clone HiveSessionCatalog") {
-    val original = spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog]
-
-    val tempTableName1 = "copytest1"
-    val tempTableName2 = "copytest2"
-    try {
-      val tempTable1 = Range(1, 10, 1, 10)
-      original.createTempView(tempTableName1, tempTable1, overrideIfExists = false)
-
-      // check if tables copied over
-      val clone = original.newSessionCatalogWith(
-        spark,
-        new SQLConf,
-        new Configuration(),
-        new SimpleFunctionRegistry,
-        CatalystSqlParser)
-      assert(original ne clone)
-      assert(clone.getTempView(tempTableName1) == Some(tempTable1))
-
-      // check if clone and original independent
-      clone.dropTable(TableIdentifier(tempTableName1), ignoreIfNotExists = false, purge = false)
-      assert(original.getTempView(tempTableName1) == Some(tempTable1))
-
-      val tempTable2 = Range(1, 20, 2, 10)
-      original.createTempView(tempTableName2, tempTable2, overrideIfExists = false)
-      assert(clone.getTempView(tempTableName2).isEmpty)
-    } finally {
-      // Drop the created temp views from the global singleton HiveSession.
-      original.dropTable(TableIdentifier(tempTableName1), ignoreIfNotExists = true, purge = true)
-      original.dropTable(TableIdentifier(tempTableName2), ignoreIfNotExists = true, purge = true)
-    }
-  }
-
-  test("clone SessionCatalog - current db") {
-    val original = spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog]
-    val originalCurrentDatabase = original.getCurrentDatabase
-    val db1 = "db1"
-    val db2 = "db2"
-    val db3 = "db3"
-    try {
-      original.createDatabase(newDb(db1), ignoreIfExists = true)
-      original.createDatabase(newDb(db2), ignoreIfExists = true)
-      original.createDatabase(newDb(db3), ignoreIfExists = true)
-
-      original.setCurrentDatabase(db1)
-
-      // check if tables copied over
-      val clone = original.newSessionCatalogWith(
-        spark,
-        new SQLConf,
-        new Configuration(),
-        new SimpleFunctionRegistry,
-        CatalystSqlParser)
-
-      // check if current db copied over
-      assert(original ne clone)
-      assert(clone.getCurrentDatabase == db1)
-
-      // check if clone and original independent
-      clone.setCurrentDatabase(db2)
-      assert(original.getCurrentDatabase == db1)
-      original.setCurrentDatabase(db3)
-      assert(clone.getCurrentDatabase == db2)
-    } finally {
-      // Drop the created databases from the global singleton HiveSession.
-      original.dropDatabase(db1, ignoreIfNotExists = true, cascade = true)
-      original.dropDatabase(db2, ignoreIfNotExists = true, cascade = true)
-      original.dropDatabase(db3, ignoreIfNotExists = true, cascade = true)
-      original.setCurrentDatabase(originalCurrentDatabase)
-    }
-  }
-
-  def newUriForDatabase(): URI = new URI(Utils.createTempDir().toURI.toString.stripSuffix("/"))
-
-  def newDb(name: String): CatalogDatabase = {
-    CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty)
-  }
-}
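
The deleted suite exercised SessionCatalog.newSessionCatalogWith, which this refactor drops from the clone path: catalog state is now carried over when a new SessionState is built from a parent state (via copyStateTo in the builder's catalog above). A minimal sketch of that path, with a hypothetical helper name and the usual caveat that these types are only visible from Spark's own packages:

package org.apache.spark.sql.hive

import org.apache.spark.sql.SparkSession

// Hypothetical helper: derive a catalog that copies temp views and the current
// database from an existing Hive-enabled session, by building a fresh state
// with that session's SessionState as the parent.
object CatalogCloneSketch {
  def cloneCatalog(session: SparkSession): HiveSessionCatalog = {
    val builder = new HiveSessionStateBuilder(session, Some(session.sessionState))
    // Evaluating the builder's catalog triggers parentState.catalog.copyStateTo(...).
    builder.build().catalog.asInstanceOf[HiveSessionCatalog]
  }
}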

