Posted to commits@spark.apache.org by we...@apache.org on 2017/03/28 15:14:40 UTC

spark git commit: [SPARK-20126][SQL] Remove HiveSessionState

Repository: spark
Updated Branches:
  refs/heads/master 4fcc214d9 -> f82461fc1


[SPARK-20126][SQL] Remove HiveSessionState

## What changes were proposed in this pull request?
Commit https://github.com/apache/spark/commit/ea361165e1ddce4d8aa0242ae3e878d7b39f1de2 moved most of the logic from the `SessionState` classes into an accompanying builder, which makes `HiveSessionState` redundant. This PR removes it.
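
For reference, a Hive-aware session state is now assembled by `HiveSessionStateBuilder` and comes back as a plain `SessionState`; there is no dedicated subclass left to cast to. A minimal sketch (internal API; the helper object is hypothetical and lives in Spark's own `org.apache.spark.sql.hive` package because `SessionState` is `private[sql]`):

```scala
package org.apache.spark.sql.hive

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SessionState

object HiveSessionStateExample {
  // Mirrors HiveSessionState.apply in this patch: the builder plugs the
  // Hive-specific catalog, analyzer rules, planner strategies and resource
  // loader into an ordinary SessionState.
  def buildHiveAwareState(session: SparkSession): SessionState = {
    new HiveSessionStateBuilder(session, parentState = None).build()
  }
}
```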

## How was this patch tested?
Existing tests.

Author: Herman van Hovell <hv...@databricks.com>

Closes #17457 from hvanhovell/SPARK-20126.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f82461fc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f82461fc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f82461fc

Branch: refs/heads/master
Commit: f82461fc1197f6055d9cf972d82260b178e10a7c
Parents: 4fcc214
Author: Herman van Hovell <hv...@databricks.com>
Authored: Tue Mar 28 23:14:31 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Mar 28 23:14:31 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/execution/command/resources.scala |   2 +-
 .../spark/sql/internal/SessionState.scala       |  47 +++---
 .../sql/internal/sessionStateBuilders.scala     |   8 +-
 .../sql/hive/thriftserver/SparkSQLEnv.scala     |  12 +-
 .../server/SparkSQLOperationManager.scala       |   6 +-
 .../hive/execution/HiveCompatibilitySuite.scala |   2 +-
 .../org/apache/spark/sql/hive/HiveContext.scala |   4 -
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |   9 +-
 .../spark/sql/hive/HiveSessionState.scala       | 144 +++----------------
 .../apache/spark/sql/hive/test/TestHive.scala   |  23 ++-
 .../sql/hive/HiveMetastoreCatalogSuite.scala    |   6 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |   7 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala |   6 +-
 .../apache/spark/sql/hive/parquetSuites.scala   |  21 +--
 14 files changed, 104 insertions(+), 193 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f82461fc/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
index 20b0894..2e859cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
@@ -37,7 +37,7 @@ case class AddJarCommand(path: String) extends RunnableCommand {
   }
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    sparkSession.sessionState.addJar(path)
+    sparkSession.sessionState.resourceLoader.addJar(path)
     Seq(Row(0))
   }
 }
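
For users nothing changes: `ADD JAR` still goes through `AddJarCommand`, which now delegates to the session's resource loader instead of `SessionState.addJar`. A minimal usage sketch (the jar path is a placeholder):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .enableHiveSupport()
  .getOrCreate()

// Internally routed to sparkSession.sessionState.resourceLoader.addJar(path).
spark.sql("ADD JAR /tmp/my-udfs.jar")  // placeholder path
```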

http://git-wip-us.apache.org/repos/asf/spark/blob/f82461fc/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index b5b0bb0..c6241d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -63,6 +63,7 @@ private[sql] class SessionState(
     val optimizer: Optimizer,
     val planner: SparkPlanner,
     val streamingQueryManager: StreamingQueryManager,
+    val resourceLoader: SessionResourceLoader,
     createQueryExecution: LogicalPlan => QueryExecution,
     createClone: (SparkSession, SessionState) => SessionState) {
 
@@ -106,27 +107,6 @@ private[sql] class SessionState(
   def refreshTable(tableName: String): Unit = {
     catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
   }
-
-  /**
-   * Add a jar path to [[SparkContext]] and the classloader.
-   *
-   * Note: this method seems not access any session state, but the subclass `HiveSessionState` needs
-   * to add the jar to its hive client for the current session. Hence, it still needs to be in
-   * [[SessionState]].
-   */
-  def addJar(path: String): Unit = {
-    sparkContext.addJar(path)
-    val uri = new Path(path).toUri
-    val jarURL = if (uri.getScheme == null) {
-      // `path` is a local file path without a URL scheme
-      new File(path).toURI.toURL
-    } else {
-      // `path` is a URL with a scheme
-      uri.toURL
-    }
-    sharedState.jarClassLoader.addURL(jarURL)
-    Thread.currentThread().setContextClassLoader(sharedState.jarClassLoader)
-  }
 }
 
 private[sql] object SessionState {
@@ -160,10 +140,10 @@ class SessionStateBuilder(
  * Session shared [[FunctionResourceLoader]].
  */
 @InterfaceStability.Unstable
-class SessionFunctionResourceLoader(session: SparkSession) extends FunctionResourceLoader {
+class SessionResourceLoader(session: SparkSession) extends FunctionResourceLoader {
   override def loadResource(resource: FunctionResource): Unit = {
     resource.resourceType match {
-      case JarResource => session.sessionState.addJar(resource.uri)
+      case JarResource => addJar(resource.uri)
       case FileResource => session.sparkContext.addFile(resource.uri)
       case ArchiveResource =>
         throw new AnalysisException(
@@ -171,4 +151,25 @@ class SessionFunctionResourceLoader(session: SparkSession) extends FunctionResou
             "please use --archives options while calling spark-submit.")
     }
   }
+
+  /**
+   * Add a jar path to [[SparkContext]] and the classloader.
+   *
+   * Note: this method does not appear to access any session state, but a Hive-aware subclass
+   * needs to add the jar to its Hive client for the current session as well, so it lives in
+   * the session's resource loader.
+   */
+  def addJar(path: String): Unit = {
+    session.sparkContext.addJar(path)
+    val uri = new Path(path).toUri
+    val jarURL = if (uri.getScheme == null) {
+      // `path` is a local file path without a URL scheme
+      new File(path).toURI.toURL
+    } else {
+      // `path` is a URL with a scheme
+      uri.toURL
+    }
+    session.sharedState.jarClassLoader.addURL(jarURL)
+    Thread.currentThread().setContextClassLoader(session.sharedState.jarClassLoader)
+  }
 }
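
With `addJar` folded into `SessionResourceLoader`, a single object now handles both jar/file resources and function resources. For illustration, a sketch of driving the loader directly (paths are placeholders; in Spark itself the loader is created once by the session state builder):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.{FileResource, FunctionResource, JarResource}
import org.apache.spark.sql.internal.SessionResourceLoader

object ResourceLoaderExample {
  def loadFunctionResources(session: SparkSession): Unit = {
    val loader = new SessionResourceLoader(session)
    // A jar is added to the SparkContext and to the shared jarClassLoader;
    // a plain file goes through SparkContext.addFile.
    loader.loadResource(FunctionResource(JarResource, "/tmp/example-udf.jar"))
    loader.loadResource(FunctionResource(FileResource, "/tmp/lookup-table.csv"))
  }
}
```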

http://git-wip-us.apache.org/repos/asf/spark/blob/f82461fc/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala
index 6b5559a..b8f645f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala
@@ -110,6 +110,11 @@ abstract class BaseSessionStateBuilder(
   protected lazy val sqlParser: ParserInterface = new SparkSqlParser(conf)
 
   /**
+   * ResourceLoader that is used to load function resources and jars.
+   */
+  protected lazy val resourceLoader: SessionResourceLoader = new SessionResourceLoader(session)
+
+  /**
    * Catalog for managing table and database states. If there is a pre-existing catalog, the state
    * of that catalog (temp tables & current database) will be copied into the new catalog.
    *
@@ -123,7 +128,7 @@ abstract class BaseSessionStateBuilder(
       conf,
       SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
       sqlParser,
-      new SessionFunctionResourceLoader(session))
+      resourceLoader)
     parentState.foreach(_.catalog.copyStateTo(catalog))
     catalog
   }
@@ -251,6 +256,7 @@ abstract class BaseSessionStateBuilder(
       optimizer,
       planner,
       streamingQueryManager,
+      resourceLoader,
       createQueryExecution,
       createClone)
   }
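
`resourceLoader` thereby becomes one more overridable extension point of `BaseSessionStateBuilder`, in the same way the Hive builder overrides it below. A hedged sketch of a hypothetical custom builder (placed in Spark's `org.apache.spark.sql.internal` package, since `SessionState` is `private[sql]`):

```scala
package org.apache.spark.sql.internal

import org.apache.spark.sql.SparkSession

// Hypothetical builder, for illustration only: it swaps in a resource loader
// that prints every jar path before delegating to the default behaviour.
class LoggingSessionStateBuilder(
    session: SparkSession,
    parentState: Option[SessionState] = None)
  extends BaseSessionStateBuilder(session, parentState) {

  override protected lazy val resourceLoader: SessionResourceLoader =
    new SessionResourceLoader(session) {
      override def addJar(path: String): Unit = {
        println(s"adding jar: $path")  // placeholder for real logging
        super.addJar(path)
      }
    }

  override protected def newBuilder: NewBuilder = new LoggingSessionStateBuilder(_, _)
}
```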

http://git-wip-us.apache.org/repos/asf/spark/blob/f82461fc/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index c0b2994..01c4eb1 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -22,7 +22,7 @@ import java.io.PrintStream
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, SQLContext}
-import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils}
+import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
 import org.apache.spark.util.Utils
 
 /** A singleton object for the master program. The slaves should not access this. */
@@ -49,10 +49,12 @@ private[hive] object SparkSQLEnv extends Logging {
       sparkContext = sparkSession.sparkContext
       sqlContext = sparkSession.sqlContext
 
-      val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
-      sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
-      sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
-      sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
+      val metadataHive = sparkSession
+        .sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
+        .client.newSession()
+      metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
+      metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
+      metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
       sparkSession.conf.set("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
     }
   }
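
With `HiveSessionState.metadataHive` gone, a per-session Hive client comes straight from the shared `HiveExternalCatalog`, as the hunk above shows. A condensed sketch (internal API; `client` is `private[hive]`, so this only compiles inside Spark's own Hive packages):

```scala
package org.apache.spark.sql.hive.thriftserver

import java.io.PrintStream

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.client.HiveClient

object MetadataHiveExample {
  def newMetadataHive(sparkSession: SparkSession): HiveClient = {
    // One fresh Hive client session per SparkSession, backed by the shared catalog.
    val metadataHive = sparkSession.sharedState.externalCatalog
      .asInstanceOf[HiveExternalCatalog]
      .client.newSession()
    // Redirect the client's console streams, as SparkSQLEnv does above.
    metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
    metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
    metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
    metadataHive
  }
}
```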

http://git-wip-us.apache.org/repos/asf/spark/blob/f82461fc/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index 49ab664..a0e5012 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -26,7 +26,7 @@ import org.apache.hive.service.cli.session.HiveSession
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.hive.HiveSessionState
+import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, SparkExecuteStatementOperation}
 
 /**
@@ -49,8 +49,8 @@ private[thriftserver] class SparkSQLOperationManager()
     val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
     require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
       s" initialized or had already closed.")
-    val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
-    val runInBackground = async && sessionState.hiveThriftServerAsync
+    val conf = sqlContext.sessionState.conf
+    val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
     val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
       runInBackground)(sqlContext, sessionToActivePool)
     handleToOperation.put(operation.getHandle, operation)
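
The former `HiveSessionState.hiveThriftServerAsync` helper becomes a plain `SQLConf` lookup, so the operation manager no longer needs a Hive-specific session state. For illustration (a sketch inside the thriftserver package):

```scala
package org.apache.spark.sql.hive.thriftserver.server

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveUtils

object AsyncFlagExample {
  // Sketch only: the same flag the operation manager now reads directly.
  def runInBackground(sqlContext: SQLContext, async: Boolean): Boolean =
    async && sqlContext.sessionState.conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
}
```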

http://git-wip-us.apache.org/repos/asf/spark/blob/f82461fc/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index f78660f..0a53aac 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -39,7 +39,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
   private val originalLocale = Locale.getDefault
   private val originalColumnBatchSize = TestHive.conf.columnBatchSize
   private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning
-  private val originalConvertMetastoreOrc = TestHive.sessionState.convertMetastoreOrc
+  private val originalConvertMetastoreOrc = TestHive.conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
   private val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled
   private val originalSessionLocalTimeZone = TestHive.conf.sessionLocalTimeZone
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f82461fc/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 5393c57..02a5117 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -48,10 +48,6 @@ class HiveContext private[hive](_sparkSession: SparkSession)
     new HiveContext(sparkSession.newSession())
   }
 
-  protected[sql] override def sessionState: HiveSessionState = {
-    sparkSession.sessionState.asInstanceOf[HiveSessionState]
-  }
-
   /**
    * Invalidate and refresh all the cached the metadata of the given table. For performance reasons,
    * Spark SQL or the external data source library it uses might cache certain metadata about a

http://git-wip-us.apache.org/repos/asf/spark/blob/f82461fc/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 2e060ab..305bd00 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.types._
  */
 private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging {
   // these are def_s and not val/lazy val since the latter would introduce circular references
-  private def sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
+  private def sessionState = sparkSession.sessionState
   private def tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache
   import HiveMetastoreCatalog._
 
@@ -281,12 +281,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   object ParquetConversions extends Rule[LogicalPlan] {
     private def shouldConvertMetastoreParquet(relation: CatalogRelation): Boolean = {
       relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet") &&
-        sessionState.convertMetastoreParquet
+        sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)
     }
 
     private def convertToParquetRelation(relation: CatalogRelation): LogicalRelation = {
       val fileFormatClass = classOf[ParquetFileFormat]
-      val mergeSchema = sessionState.convertMetastoreParquetWithSchemaMerging
+      val mergeSchema = sessionState.conf.getConf(
+        HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)
       val options = Map(ParquetOptions.MERGE_SCHEMA -> mergeSchema.toString)
 
       convertToLogicalRelation(relation, options, fileFormatClass, "parquet")
@@ -316,7 +317,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   object OrcConversions extends Rule[LogicalPlan] {
     private def shouldConvertMetastoreOrc(relation: CatalogRelation): Boolean = {
       relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("orc") &&
-        sessionState.convertMetastoreOrc
+        sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
     }
 
     private def convertToOrcRelation(relation: CatalogRelation): LogicalRelation = {
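
The conversion rules now consult `SQLConf` directly, so the behaviour is still controlled by the usual configuration keys. For illustration:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .enableHiveSupport()
  // Read by ParquetConversions / OrcConversions via
  // HiveUtils.CONVERT_METASTORE_PARQUET and HiveUtils.CONVERT_METASTORE_ORC.
  .config("spark.sql.hive.convertMetastoreParquet", "true")
  .config("spark.sql.hive.convertMetastoreOrc", "false")
  .getOrCreate()
```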

http://git-wip-us.apache.org/repos/asf/spark/blob/f82461fc/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 49ff847..f49e6bb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -17,121 +17,24 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.spark.SparkContext
 import org.apache.spark.annotation.{Experimental, InterfaceStability}
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.analysis.Analyzer
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner}
+import org.apache.spark.sql.execution.SparkPlanner
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionFunctionResourceLoader, SessionState, SharedState, SQLConf}
-import org.apache.spark.sql.streaming.StreamingQueryManager
-
+import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState}
 
 /**
- * A class that holds all session-specific state in a given [[SparkSession]] backed by Hive.
- *
- * @param sparkContext The [[SparkContext]].
- * @param sharedState The shared state.
- * @param conf SQL-specific key-value configurations.
- * @param experimentalMethods The experimental methods.
- * @param functionRegistry Internal catalog for managing functions registered by the user.
- * @param catalog Internal catalog for managing table and database states that uses Hive client for
- *                interacting with the metastore.
- * @param sqlParser Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
- * @param analyzer Logical query plan analyzer for resolving unresolved attributes and relations.
- * @param optimizer Logical query plan optimizer.
- * @param planner Planner that converts optimized logical plans to physical plans and that takes
- *                Hive-specific strategies into account.
- * @param streamingQueryManager Interface to start and stop streaming queries.
- * @param createQueryExecution Function used to create QueryExecution objects.
- * @param createClone Function used to create clones of the session state.
- * @param metadataHive The Hive metadata client.
+ * Entry object for creating a Hive aware [[SessionState]].
  */
-private[hive] class HiveSessionState(
-    sparkContext: SparkContext,
-    sharedState: SharedState,
-    conf: SQLConf,
-    experimentalMethods: ExperimentalMethods,
-    functionRegistry: FunctionRegistry,
-    override val catalog: HiveSessionCatalog,
-    sqlParser: ParserInterface,
-    analyzer: Analyzer,
-    optimizer: Optimizer,
-    planner: SparkPlanner,
-    streamingQueryManager: StreamingQueryManager,
-    createQueryExecution: LogicalPlan => QueryExecution,
-    createClone: (SparkSession, SessionState) => SessionState,
-    val metadataHive: HiveClient)
-  extends SessionState(
-      sparkContext,
-      sharedState,
-      conf,
-      experimentalMethods,
-      functionRegistry,
-      catalog,
-      sqlParser,
-      analyzer,
-      optimizer,
-      planner,
-      streamingQueryManager,
-      createQueryExecution,
-      createClone) {
-
-  // ------------------------------------------------------
-  //  Helper methods, partially leftover from pre-2.0 days
-  // ------------------------------------------------------
-
-  override def addJar(path: String): Unit = {
-    metadataHive.addJar(path)
-    super.addJar(path)
-  }
-
-  /**
-   * When true, enables an experimental feature where metastore tables that use the parquet SerDe
-   * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
-   * SerDe.
-   */
-  def convertMetastoreParquet: Boolean = {
-    conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)
-  }
-
-  /**
-   * When true, also tries to merge possibly different but compatible Parquet schemas in different
-   * Parquet data files.
-   *
-   * This configuration is only effective when "spark.sql.hive.convertMetastoreParquet" is true.
-   */
-  def convertMetastoreParquetWithSchemaMerging: Boolean = {
-    conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)
-  }
-
-  /**
-   * When true, enables an experimental feature where metastore tables that use the Orc SerDe
-   * are automatically converted to use the Spark SQL ORC table scan, instead of the Hive
-   * SerDe.
-   */
-  def convertMetastoreOrc: Boolean = {
-    conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
-  }
-
-  /**
-   * When true, Hive Thrift server will execute SQL queries asynchronously using a thread pool."
-   */
-  def hiveThriftServerAsync: Boolean = {
-    conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
-  }
-}
-
 private[hive] object HiveSessionState {
   /**
-   * Create a new [[HiveSessionState]] for the given session.
+   * Create a new Hive aware [[SessionState]] for the given session.
    */
-  def apply(session: SparkSession): HiveSessionState = {
+  def apply(session: SparkSession): SessionState = {
     new HiveSessionStateBuilder(session).build()
   }
 }
@@ -148,6 +51,14 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
     session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
 
   /**
+   * Create a Hive aware resource loader.
+   */
+  override protected lazy val resourceLoader: HiveSessionResourceLoader = {
+    val client: HiveClient = externalCatalog.client.newSession()
+    new HiveSessionResourceLoader(session, client)
+  }
+
+  /**
    * Create a [[HiveSessionCatalog]].
    */
   override protected lazy val catalog: HiveSessionCatalog = {
@@ -159,7 +70,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
       conf,
       SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
       sqlParser,
-      new SessionFunctionResourceLoader(session))
+      resourceLoader)
     parentState.foreach(_.catalog.copyStateTo(catalog))
     catalog
   }
@@ -217,23 +128,14 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
   }
 
   override protected def newBuilder: NewBuilder = new HiveSessionStateBuilder(_, _)
+}
 
-  override def build(): HiveSessionState = {
-    val metadataHive: HiveClient = externalCatalog.client.newSession()
-    new HiveSessionState(
-      session.sparkContext,
-      session.sharedState,
-      conf,
-      experimentalMethods,
-      functionRegistry,
-      catalog,
-      sqlParser,
-      analyzer,
-      optimizer,
-      planner,
-      streamingQueryManager,
-      createQueryExecution,
-      createClone,
-      metadataHive)
+class HiveSessionResourceLoader(
+    session: SparkSession,
+    client: HiveClient)
+  extends SessionResourceLoader(session) {
+  override def addJar(path: String): Unit = {
+    client.addJar(path)
+    super.addJar(path)
   }
 }
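
The only Hive-specific piece of the old session state that survives is the jar handling, now isolated in `HiveSessionResourceLoader`. For illustration, a sketch of wiring it up by hand (internal API; in Spark itself `HiveSessionStateBuilder.resourceLoader` creates exactly one of these per session):

```scala
package org.apache.spark.sql.hive

import org.apache.spark.sql.SparkSession

object HiveAddJarExample {
  // Sketch only: the Hive-aware loader first registers the jar with the
  // session's Hive client, then falls through to the shared behaviour
  // (SparkContext.addJar plus the shared jarClassLoader).
  def addJar(session: SparkSession, path: String): Unit = {
    val client = session.sharedState.externalCatalog
      .asInstanceOf[HiveExternalCatalog]
      .client.newSession()
    new HiveSessionResourceLoader(session, client).addJar(path)
  }
}
```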

http://git-wip-us.apache.org/repos/asf/spark/blob/f82461fc/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 32ca696..0bcf219 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -34,7 +34,6 @@ import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalog
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.CacheTableCommand
@@ -81,7 +80,7 @@ private[hive] class TestHiveSharedState(
     hiveClient: Option[HiveClient] = None)
   extends SharedState(sc) {
 
-  override lazy val externalCatalog: ExternalCatalog = {
+  override lazy val externalCatalog: TestHiveExternalCatalog = {
     new TestHiveExternalCatalog(
       sc.conf,
       sc.hadoopConfiguration,
@@ -123,8 +122,6 @@ class TestHiveContext(
     new TestHiveContext(sparkSession.newSession())
   }
 
-  override def sessionState: HiveSessionState = sparkSession.sessionState
-
   def setCacheTables(c: Boolean): Unit = {
     sparkSession.setCacheTables(c)
   }
@@ -155,7 +152,7 @@ class TestHiveContext(
 private[hive] class TestHiveSparkSession(
     @transient private val sc: SparkContext,
     @transient private val existingSharedState: Option[TestHiveSharedState],
-    @transient private val parentSessionState: Option[HiveSessionState],
+    @transient private val parentSessionState: Option[SessionState],
     private val loadTestTables: Boolean)
   extends SparkSession(sc) with Logging { self =>
 
@@ -195,10 +192,12 @@ private[hive] class TestHiveSparkSession(
   }
 
   @transient
-  override lazy val sessionState: HiveSessionState = {
+  override lazy val sessionState: SessionState = {
     new TestHiveSessionStateBuilder(this, parentSessionState).build()
   }
 
+  lazy val metadataHive: HiveClient = sharedState.externalCatalog.client.newSession()
+
   override def newSession(): TestHiveSparkSession = {
     new TestHiveSparkSession(sc, Some(sharedState), None, loadTestTables)
   }
@@ -492,7 +491,7 @@ private[hive] class TestHiveSparkSession(
       sessionState.catalog.clearTempTables()
       sessionState.catalog.tableRelationCache.invalidateAll()
 
-      sessionState.metadataHive.reset()
+      metadataHive.reset()
 
       FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)).
         foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) }
@@ -509,14 +508,14 @@ private[hive] class TestHiveSparkSession(
       sessionState.conf.setConfString("fs.defaultFS", new File(".").toURI.toString)
       // It is important that we RESET first as broken hooks that might have been set could break
       // other sql exec here.
-      sessionState.metadataHive.runSqlHive("RESET")
+      metadataHive.runSqlHive("RESET")
       // For some reason, RESET does not reset the following variables...
       // https://issues.apache.org/jira/browse/HIVE-9004
-      sessionState.metadataHive.runSqlHive("set hive.table.parameters.default=")
-      sessionState.metadataHive.runSqlHive("set datanucleus.cache.collections=true")
-      sessionState.metadataHive.runSqlHive("set datanucleus.cache.collections.lazy=true")
+      metadataHive.runSqlHive("set hive.table.parameters.default=")
+      metadataHive.runSqlHive("set datanucleus.cache.collections=true")
+      metadataHive.runSqlHive("set datanucleus.cache.collections.lazy=true")
       // Lots of tests fail if we do not change the partition whitelist from the default.
-      sessionState.metadataHive.runSqlHive("set hive.metastore.partition.name.whitelist.pattern=.*")
+      metadataHive.runSqlHive("set hive.metastore.partition.name.whitelist.pattern=.*")
 
       sessionState.catalog.setCurrentDatabase("default")
     } catch {
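
Test code that used to reach Hive through `sessionState.metadataHive` now uses the `metadataHive` client exposed on `TestHiveSparkSession`. For illustration (test-scope sketch inside the hive package):

```scala
package org.apache.spark.sql.hive

import org.apache.spark.sql.hive.test.TestHive

object TestHiveMetadataExample {
  // Sketch only: raw HiveQL goes through the session-level client,
  // exactly as the reset() logic above does.
  def relaxPartitionWhitelist(): Unit = {
    TestHive.sparkSession.metadataHive
      .runSqlHive("set hive.metastore.partition.name.whitelist.pattern=.*")
  }
}
```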

http://git-wip-us.apache.org/repos/asf/spark/blob/f82461fc/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 079358b..d8fd68b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -115,7 +115,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
         assert(columns.map(_.dataType) === Seq(DecimalType(10, 3), StringType))
 
         checkAnswer(table("t"), testDF)
-        assert(sessionState.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2"))
+        assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1.1\t1", "2.1\t2"))
       }
     }
 
@@ -147,7 +147,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
           assert(columns.map(_.dataType) === Seq(DecimalType(10, 3), StringType))
 
           checkAnswer(table("t"), testDF)
-          assert(sessionState.metadataHive.runSqlHive("SELECT * FROM t") ===
+          assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") ===
             Seq("1.1\t1", "2.1\t2"))
         }
       }
@@ -176,7 +176,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
           assert(columns.map(_.dataType) === Seq(IntegerType, StringType))
 
           checkAnswer(table("t"), Row(1, "val_1"))
-          assert(sessionState.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1\tval_1"))
+          assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") === Seq("1\tval_1"))
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/f82461fc/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index f02b721..55e02ac 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -379,8 +379,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
            |)
          """.stripMargin)
 
-      val expectedPath =
-        sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable"))
+      val expectedPath = sessionState.catalog.defaultTablePath(TableIdentifier("ctasJsonTable"))
       val filesystemPath = new Path(expectedPath)
       val fs = filesystemPath.getFileSystem(spark.sessionState.newHadoopConf())
       fs.delete(filesystemPath, true)
@@ -486,7 +485,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           sql("DROP TABLE savedJsonTable")
           intercept[AnalysisException] {
             read.json(
-              sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("savedJsonTable")))
+              sessionState.catalog.defaultTablePath(TableIdentifier("savedJsonTable")).toString)
           }
         }
 
@@ -756,7 +755,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           serde = None,
           compressed = false,
           properties = Map(
-            "path" -> sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier(tableName)))
+            "path" -> sessionState.catalog.defaultTablePath(TableIdentifier(tableName)).toString)
         ),
         properties = Map(
           DATASOURCE_PROVIDER -> "json",

http://git-wip-us.apache.org/repos/asf/spark/blob/f82461fc/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 04bc79d..f0a995c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -128,11 +128,11 @@ class HiveDDLSuite
       dbPath: Option[String] = None): Boolean = {
     val expectedTablePath =
       if (dbPath.isEmpty) {
-        hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdentifier)
+        hiveContext.sessionState.catalog.defaultTablePath(tableIdentifier)
       } else {
-        new Path(new Path(dbPath.get), tableIdentifier.table).toString
+        new Path(new Path(dbPath.get), tableIdentifier.table)
       }
-    val filesystemPath = new Path(expectedTablePath)
+    val filesystemPath = new Path(expectedTablePath.toString)
     val fs = filesystemPath.getFileSystem(spark.sessionState.newHadoopConf())
     fs.exists(filesystemPath)
   }
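
`hiveDefaultTableFilePath` disappears along with the other Hive catalog helpers; tests use `SessionCatalog.defaultTablePath` and convert the result into a Hadoop `Path` when needed. A sketch (internal API; `sessionState` is `private[sql]`, hence the package placement):

```scala
package org.apache.spark.sql.hive

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier

object DefaultTablePathExample {
  // Sketch only: does the default warehouse location of a managed table exist?
  def defaultLocationExists(spark: SparkSession, table: String): Boolean = {
    val expected = spark.sessionState.catalog.defaultTablePath(TableIdentifier(table))
    val fsPath = new Path(expected.toString)
    val fs = fsPath.getFileSystem(spark.sessionState.newHadoopConf())
    fs.exists(fsPath)
  }
}
```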

http://git-wip-us.apache.org/repos/asf/spark/blob/f82461fc/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 81af249..9fc2923 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.execution.HiveTableScanExec
@@ -448,10 +449,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
     }
   }
 
+  private def getCachedDataSourceTable(id: TableIdentifier): LogicalPlan = {
+    sessionState.catalog.asInstanceOf[HiveSessionCatalog].getCachedDataSourceTable(id)
+  }
+
   test("Caching converted data source Parquet Relations") {
     def checkCached(tableIdentifier: TableIdentifier): Unit = {
       // Converted test_parquet should be cached.
-      sessionState.catalog.getCachedDataSourceTable(tableIdentifier) match {
+      getCachedDataSourceTable(tableIdentifier) match {
         case null => fail("Converted test_parquet should be cached in the cache.")
         case LogicalRelation(_: HadoopFsRelation, _, _) => // OK
         case other =>
@@ -479,14 +484,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
     var tableIdentifier = TableIdentifier("test_insert_parquet", Some("default"))
 
     // First, make sure the converted test_parquet is not cached.
-    assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
+    assert(getCachedDataSourceTable(tableIdentifier) === null)
     // Table lookup will make the table cached.
     table("test_insert_parquet")
     checkCached(tableIdentifier)
     // For insert into non-partitioned table, we will do the conversion,
     // so the converted test_insert_parquet should be cached.
     sessionState.refreshTable("test_insert_parquet")
-    assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
+    assert(getCachedDataSourceTable(tableIdentifier) === null)
     sql(
       """
         |INSERT INTO TABLE test_insert_parquet
@@ -499,7 +504,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       sql("select a, b from jt").collect())
     // Invalidate the cache.
     sessionState.refreshTable("test_insert_parquet")
-    assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
+    assert(getCachedDataSourceTable(tableIdentifier) === null)
 
     // Create a partitioned table.
     sql(
@@ -517,7 +522,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       """.stripMargin)
 
     tableIdentifier = TableIdentifier("test_parquet_partitioned_cache_test", Some("default"))
-    assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
+    assert(getCachedDataSourceTable(tableIdentifier) === null)
     sql(
       """
         |INSERT INTO TABLE test_parquet_partitioned_cache_test
@@ -526,14 +531,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       """.stripMargin)
     // Right now, insert into a partitioned Parquet is not supported in data source Parquet.
     // So, we expect it is not cached.
-    assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
+    assert(getCachedDataSourceTable(tableIdentifier) === null)
     sql(
       """
         |INSERT INTO TABLE test_parquet_partitioned_cache_test
         |PARTITION (`date`='2015-04-02')
         |select a, b from jt
       """.stripMargin)
-    assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
+    assert(getCachedDataSourceTable(tableIdentifier) === null)
 
     // Make sure we can cache the partitioned table.
     table("test_parquet_partitioned_cache_test")
@@ -549,7 +554,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
         """.stripMargin).collect())
 
     sessionState.refreshTable("test_parquet_partitioned_cache_test")
-    assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
+    assert(getCachedDataSourceTable(tableIdentifier) === null)
 
     dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")
   }
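
Since `sessionState.catalog` is now typed as the generic `SessionCatalog`, the test reaches the Hive-specific relation cache through a cast, as the `getCachedDataSourceTable` helper above does. A condensed sketch (internal API, hive package):

```scala
package org.apache.spark.sql.hive

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object CachedRelationExample {
  // Sketch only: null means the relation has not been cached (or was invalidated).
  def cachedPlan(spark: SparkSession, table: TableIdentifier): LogicalPlan = {
    spark.sessionState.catalog
      .asInstanceOf[HiveSessionCatalog]
      .getCachedDataSourceTable(table)
  }
}
```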

