You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by da...@apache.org on 2015/10/09 02:34:29 UTC

spark git commit: [SPARK-10810] [SPARK-10902] [SQL] Improve session management in SQL

Repository: spark
Updated Branches:
  refs/heads/master 84ea28717 -> 3390b400d


[SPARK-10810] [SPARK-10902] [SQL] Improve session management in SQL

This PR improve the sessions management by replacing the thread-local based to one SQLContext per session approach, introduce separated temporary tables and UDFs/UDAFs for each session.

A new session of SQLContext could be created by:

1) create an new SQLContext
2) call newSession() on existing SQLContext

For HiveContext, in order to reduce the cost for each session, the classloader and Hive client are shared across multiple sessions (created by newSession).

CacheManager is also shared by multiple sessions, so cache a table multiple times in different sessions will not cause multiple copies of in-memory cache.

Added jars are still shared by all the sessions, because SparkContext does not support sessions.

cc marmbrus yhuai rxin

Author: Davies Liu <da...@databricks.com>

Closes #8909 from davies/sessions.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3390b400
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3390b400
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3390b400

Branch: refs/heads/master
Commit: 3390b400d04e40f767d8a51f1078fcccb4e64abd
Parents: 84ea287
Author: Davies Liu <da...@databricks.com>
Authored: Thu Oct 8 17:34:24 2015 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Thu Oct 8 17:34:24 2015 -0700

----------------------------------------------------------------------
 project/MimaExcludes.scala                      |  22 ++-
 .../catalyst/analysis/FunctionRegistry.scala    |  28 +++-
 .../scala/org/apache/spark/sql/SQLContext.scala | 164 +++++++++++--------
 .../spark/sql/execution/CacheManager.scala      |  14 +-
 .../org/apache/spark/sql/SQLContextSuite.scala  |  59 ++++---
 .../apache/spark/sql/test/TestSQLContext.scala  |  21 +--
 .../SparkExecuteStatementOperation.scala        |  76 ++-------
 .../thriftserver/SparkSQLSessionManager.scala   |   9 +-
 .../server/SparkSQLOperationManager.scala       |   5 +-
 .../spark/sql/hive/thriftserver/CliSuite.scala  |   8 +-
 .../thriftserver/HiveThriftServer2Suites.scala  |  76 +++++----
 .../org/apache/spark/sql/hive/HiveContext.scala | 155 +++++++++++-------
 .../org/apache/spark/sql/hive/HiveQl.scala      |  28 ++--
 .../spark/sql/hive/client/ClientInterface.scala |   9 +
 .../spark/sql/hive/client/ClientWrapper.scala   |  85 +++++-----
 .../sql/hive/client/IsolatedClientLoader.scala  | 107 +++++++-----
 .../spark/sql/hive/execution/commands.scala     |  27 +--
 .../apache/spark/sql/hive/test/TestHive.scala   |  27 +--
 .../org/apache/spark/sql/hive/HiveQlSuite.scala |  13 +-
 .../spark/sql/hive/client/VersionsSuite.scala   |   6 +-
 .../sql/hive/execution/HiveQuerySuite.scala     |  32 ++++
 .../sql/hive/execution/SQLQuerySuite.scala      |   9 +-
 22 files changed, 540 insertions(+), 440 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3390b400/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 2d4d146..08e4a44 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -79,7 +79,27 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingMethodProblem](
           "org.apache.spark.ml.regression.LeastSquaresAggregator.add"),
         ProblemFilters.exclude[MissingMethodProblem](
-          "org.apache.spark.ml.regression.LeastSquaresCostFun.this")
+          "org.apache.spark.ml.regression.LeastSquaresCostFun.this"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.sql.SQLContext.clearLastInstantiatedContext"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.sql.SQLContext.setLastInstantiatedContext"),
+        ProblemFilters.exclude[MissingClassProblem](
+          "org.apache.spark.sql.SQLContext$SQLSession"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.sql.SQLContext.detachSession"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.sql.SQLContext.tlSession"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.sql.SQLContext.defaultSession"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.sql.SQLContext.currentSession"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.sql.SQLContext.openSession"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.sql.SQLContext.setSession"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.sql.SQLContext.createSession")
       )
     case v if v.startsWith("1.5") =>
       Seq(

http://git-wip-us.apache.org/repos/asf/spark/blob/3390b400/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index e6122d9..ba77b70 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -51,23 +51,37 @@ class SimpleFunctionRegistry extends FunctionRegistry {
   private val functionBuilders =
     StringKeyHashMap[(ExpressionInfo, FunctionBuilder)](caseSensitive = false)
 
-  override def registerFunction(name: String, info: ExpressionInfo, builder: FunctionBuilder)
-  : Unit = {
+  override def registerFunction(
+      name: String,
+      info: ExpressionInfo,
+      builder: FunctionBuilder): Unit = synchronized {
     functionBuilders.put(name, (info, builder))
   }
 
   override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
-    val func = functionBuilders.get(name).map(_._2).getOrElse {
-      throw new AnalysisException(s"undefined function $name")
+    val func = synchronized {
+      functionBuilders.get(name).map(_._2).getOrElse {
+        throw new AnalysisException(s"undefined function $name")
+      }
     }
     func(children)
   }
 
-  override def listFunction(): Seq[String] = functionBuilders.iterator.map(_._1).toList.sorted
+  override def listFunction(): Seq[String] = synchronized {
+    functionBuilders.iterator.map(_._1).toList.sorted
+  }
 
-  override def lookupFunction(name: String): Option[ExpressionInfo] = {
+  override def lookupFunction(name: String): Option[ExpressionInfo] = synchronized {
     functionBuilders.get(name).map(_._1)
   }
+
+  def copy(): SimpleFunctionRegistry = synchronized {
+    val registry = new SimpleFunctionRegistry
+    functionBuilders.iterator.foreach { case (name, (info, builder)) =>
+      registry.registerFunction(name, info, builder)
+    }
+    registry
+  }
 }
 
 /**
@@ -257,7 +271,7 @@ object FunctionRegistry {
     expression[InputFileName]("input_file_name")
   )
 
-  val builtin: FunctionRegistry = {
+  val builtin: SimpleFunctionRegistry = {
     val fr = new SimpleFunctionRegistry
     expressions.foreach { case (name, (info, builder)) => fr.registerFunction(name, info, builder) }
     fr

http://git-wip-us.apache.org/repos/asf/spark/blob/3390b400/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index cb0a3e3..2bdfd82 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -30,6 +30,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.SQLConf.SQLConfEntry
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.errors.DialectException
@@ -38,15 +39,12 @@ import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, _}
-import org.apache.spark.sql.execution.{Filter, _}
-import org.apache.spark.sql.{execution => sparkexecution}
-import org.apache.spark.sql.execution._
-import org.apache.spark.sql.sources._
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.{execution => sparkexecution}
 import org.apache.spark.util.Utils
 
 /**
@@ -64,18 +62,30 @@ import org.apache.spark.util.Utils
  *
  * @since 1.0.0
  */
-class SQLContext(@transient val sparkContext: SparkContext)
-  extends org.apache.spark.Logging
-  with Serializable {
+class SQLContext private[sql](
+    @transient val sparkContext: SparkContext,
+    @transient protected[sql] val cacheManager: CacheManager)
+  extends org.apache.spark.Logging with Serializable {
 
   self =>
 
+  def this(sparkContext: SparkContext) = this(sparkContext, new CacheManager)
   def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
 
   /**
+   * Returns a SQLContext as new session, with separated SQL configurations, temporary tables,
+   * registered functions, but sharing the same SparkContext and CacheManager.
+   *
+   * @since 1.6.0
+   */
+  def newSession(): SQLContext = {
+    new SQLContext(sparkContext, cacheManager)
+  }
+
+  /**
    * @return Spark SQL configuration
    */
-  protected[sql] def conf = currentSession().conf
+  protected[sql] lazy val conf = new SQLConf
 
   // `listener` should be only used in the driver
   @transient private[sql] val listener = new SQLListener(this)
@@ -142,13 +152,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   def getAllConfs: immutable.Map[String, String] = conf.getAllConfs
 
-  // TODO how to handle the temp table per user session?
   @transient
   protected[sql] lazy val catalog: Catalog = new SimpleCatalog(conf)
 
-  // TODO how to handle the temp function per user session?
   @transient
-  protected[sql] lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin
+  protected[sql] lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy()
 
   @transient
   protected[sql] lazy val analyzer: Analyzer =
@@ -198,20 +206,19 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected[sql] def executePlan(plan: LogicalPlan) =
     new sparkexecution.QueryExecution(this, plan)
 
-  @transient
-  protected[sql] val tlSession = new ThreadLocal[SQLSession]() {
-    override def initialValue: SQLSession = defaultSession
-  }
-
-  @transient
-  protected[sql] val defaultSession = createSession()
-
   protected[sql] def dialectClassName = if (conf.dialect == "sql") {
     classOf[DefaultParserDialect].getCanonicalName
   } else {
     conf.dialect
   }
 
+  /**
+   * Add a jar to SQLContext
+   */
+  protected[sql] def addJar(path: String): Unit = {
+    sparkContext.addJar(path)
+  }
+
   {
     // We extract spark sql settings from SparkContext's conf and put them to
     // Spark SQL's conf.
@@ -236,9 +243,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
     }
   }
 
-  @transient
-  protected[sql] val cacheManager = new CacheManager(this)
-
   /**
    * :: Experimental ::
    * A collection of methods that are considered experimental, but can be used to hook into
@@ -300,21 +304,25 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group cachemgmt
    * @since 1.3.0
    */
-  def isCached(tableName: String): Boolean = cacheManager.isCached(tableName)
+  def isCached(tableName: String): Boolean = {
+    cacheManager.lookupCachedData(table(tableName)).nonEmpty
+  }
 
   /**
    * Caches the specified table in-memory.
    * @group cachemgmt
    * @since 1.3.0
    */
-  def cacheTable(tableName: String): Unit = cacheManager.cacheTable(tableName)
+  def cacheTable(tableName: String): Unit = {
+    cacheManager.cacheQuery(table(tableName), Some(tableName))
+  }
 
   /**
    * Removes the specified table from the in-memory cache.
    * @group cachemgmt
    * @since 1.3.0
    */
-  def uncacheTable(tableName: String): Unit = cacheManager.uncacheTable(tableName)
+  def uncacheTable(tableName: String): Unit = cacheManager.uncacheQuery(table(tableName))
 
   /**
    * Removes all cached tables from the in-memory cache.
@@ -830,36 +838,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
     )
   }
 
-  protected[sql] def openSession(): SQLSession = {
-    detachSession()
-    val session = createSession()
-    tlSession.set(session)
-
-    session
-  }
-
-  protected[sql] def currentSession(): SQLSession = {
-    tlSession.get()
-  }
-
-  protected[sql] def createSession(): SQLSession = {
-    new this.SQLSession()
-  }
-
-  protected[sql] def detachSession(): Unit = {
-    tlSession.remove()
-  }
-
-  protected[sql] def setSession(session: SQLSession): Unit = {
-    detachSession()
-    tlSession.set(session)
-  }
-
-  protected[sql] class SQLSession {
-    // Note that this is a lazy val so we can override the default value in subclasses.
-    protected[sql] lazy val conf: SQLConf = new SQLConf
-  }
-
   @deprecated("use org.apache.spark.sql.QueryExecution", "1.6.0")
   protected[sql] class QueryExecution(logical: LogicalPlan)
     extends sparkexecution.QueryExecution(this, logical)
@@ -1196,46 +1174,90 @@ class SQLContext(@transient val sparkContext: SparkContext)
   // Register a succesfully instantiatd context to the singleton. This should be at the end of
   // the class definition so that the singleton is updated only if there is no exception in the
   // construction of the instance.
-  SQLContext.setLastInstantiatedContext(self)
+  sparkContext.addSparkListener(new SparkListener {
+    override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+      SQLContext.clearInstantiatedContext(self)
+    }
+  })
+
+  SQLContext.setInstantiatedContext(self)
 }
 
 /**
  * This SQLContext object contains utility functions to create a singleton SQLContext instance,
- * or to get the last created SQLContext instance.
+ * or to get the created SQLContext instance.
+ *
+ * It also provides utility functions to support preference for threads in multiple sessions
+ * scenario, setActive could set a SQLContext for current thread, which will be returned by
+ * getOrCreate instead of the global one.
  */
 object SQLContext {
 
-  private val INSTANTIATION_LOCK = new Object()
+  /**
+   * The active SQLContext for the current thread.
+   */
+  private val activeContext: InheritableThreadLocal[SQLContext] =
+    new InheritableThreadLocal[SQLContext]
 
   /**
-   * Reference to the last created SQLContext.
+   * Reference to the created SQLContext.
    */
-  @transient private val lastInstantiatedContext = new AtomicReference[SQLContext]()
+  @transient private val instantiatedContext = new AtomicReference[SQLContext]()
 
   /**
    * Get the singleton SQLContext if it exists or create a new one using the given SparkContext.
+   *
    * This function can be used to create a singleton SQLContext object that can be shared across
    * the JVM.
+   *
+   * If there is an active SQLContext for current thread, it will be returned instead of the global
+   * one.
+   *
+   * @since 1.5.0
    */
   def getOrCreate(sparkContext: SparkContext): SQLContext = {
-    INSTANTIATION_LOCK.synchronized {
-      if (lastInstantiatedContext.get() == null) {
+    val ctx = activeContext.get()
+    if (ctx != null) {
+      return ctx
+    }
+
+    synchronized {
+      val ctx = instantiatedContext.get()
+      if (ctx == null) {
         new SQLContext(sparkContext)
+      } else {
+        ctx
       }
     }
-    lastInstantiatedContext.get()
   }
 
-  private[sql] def clearLastInstantiatedContext(): Unit = {
-    INSTANTIATION_LOCK.synchronized {
-      lastInstantiatedContext.set(null)
-    }
+  private[sql] def clearInstantiatedContext(sqlContext: SQLContext): Unit = {
+    instantiatedContext.compareAndSet(sqlContext, null)
   }
 
-  private[sql] def setLastInstantiatedContext(sqlContext: SQLContext): Unit = {
-    INSTANTIATION_LOCK.synchronized {
-      lastInstantiatedContext.set(sqlContext)
-    }
+  private[sql] def setInstantiatedContext(sqlContext: SQLContext): Unit = {
+    instantiatedContext.compareAndSet(null, sqlContext)
+  }
+
+  /**
+   * Changes the SQLContext that will be returned in this thread and its children when
+   * SQLContext.getOrCreate() is called. This can be used to ensure that a given thread receives
+   * a SQLContext with an isolated session, instead of the global (first created) context.
+   *
+   * @since 1.6.0
+   */
+  def setActive(sqlContext: SQLContext): Unit = {
+    activeContext.set(sqlContext)
+  }
+
+  /**
+   * Clears the active SQLContext for current thread. Subsequent calls to getOrCreate will
+   * return the first created context instead of a thread-local override.
+   *
+   * @since 1.6.0
+   */
+  def clearActive(): Unit = {
+    activeContext.remove()
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/3390b400/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index d3e5c37..f85aeb1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -20,9 +20,9 @@ package org.apache.spark.sql.execution
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import org.apache.spark.Logging
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.columnar.InMemoryRelation
-import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
 
@@ -37,7 +37,7 @@ private[sql] case class CachedData(plan: LogicalPlan, cachedRepresentation: InMe
  *
  * Internal to Spark SQL.
  */
-private[sql] class CacheManager(sqlContext: SQLContext) extends Logging {
+private[sql] class CacheManager extends Logging {
 
   @transient
   private val cachedData = new scala.collection.mutable.ArrayBuffer[CachedData]
@@ -45,15 +45,6 @@ private[sql] class CacheManager(sqlContext: SQLContext) extends Logging {
   @transient
   private val cacheLock = new ReentrantReadWriteLock
 
-  /** Returns true if the table is currently cached in-memory. */
-  def isCached(tableName: String): Boolean = lookupCachedData(sqlContext.table(tableName)).nonEmpty
-
-  /** Caches the specified table in-memory. */
-  def cacheTable(tableName: String): Unit = cacheQuery(sqlContext.table(tableName), Some(tableName))
-
-  /** Removes the specified table from the in-memory cache. */
-  def uncacheTable(tableName: String): Unit = uncacheQuery(sqlContext.table(tableName))
-
   /** Acquires a read lock on the cache for the duration of `f`. */
   private def readLock[A](f: => A): A = {
     val lock = cacheLock.readLock()
@@ -96,6 +87,7 @@ private[sql] class CacheManager(sqlContext: SQLContext) extends Logging {
     if (lookupCachedData(planToCache).nonEmpty) {
       logWarning("Asked to cache already cached data.")
     } else {
+      val sqlContext = query.sqlContext
       cachedData +=
         CachedData(
           planToCache,

http://git-wip-us.apache.org/repos/asf/spark/blob/3390b400/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
index dd88ae3..1994dac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -17,33 +17,52 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.{SharedSparkContext, SparkFunSuite}
 
-class SQLContextSuite extends SparkFunSuite with SharedSQLContext {
-
-  override def afterAll(): Unit = {
-    try {
-      SQLContext.setLastInstantiatedContext(sqlContext)
-    } finally {
-      super.afterAll()
-    }
-  }
+class SQLContextSuite extends SparkFunSuite with SharedSparkContext{
 
   test("getOrCreate instantiates SQLContext") {
-    SQLContext.clearLastInstantiatedContext()
-    val sqlContext = SQLContext.getOrCreate(sparkContext)
+    val sqlContext = SQLContext.getOrCreate(sc)
     assert(sqlContext != null, "SQLContext.getOrCreate returned null")
-    assert(SQLContext.getOrCreate(sparkContext).eq(sqlContext),
+    assert(SQLContext.getOrCreate(sc).eq(sqlContext),
       "SQLContext created by SQLContext.getOrCreate not returned by SQLContext.getOrCreate")
   }
 
-  test("getOrCreate gets last explicitly instantiated SQLContext") {
-    SQLContext.clearLastInstantiatedContext()
-    val sqlContext = new SQLContext(sparkContext)
-    assert(SQLContext.getOrCreate(sparkContext) != null,
-      "SQLContext.getOrCreate after explicitly created SQLContext returned null")
-    assert(SQLContext.getOrCreate(sparkContext).eq(sqlContext),
+  test("getOrCreate return the original SQLContext") {
+    val sqlContext = SQLContext.getOrCreate(sc)
+    val newSession = sqlContext.newSession()
+    assert(SQLContext.getOrCreate(sc).eq(sqlContext),
       "SQLContext.getOrCreate after explicitly created SQLContext did not return the context")
+    SQLContext.setActive(newSession)
+    assert(SQLContext.getOrCreate(sc).eq(newSession),
+      "SQLContext.getOrCreate after explicitly setActive() did not return the active context")
+  }
+
+  test("Sessions of SQLContext") {
+    val sqlContext = SQLContext.getOrCreate(sc)
+    val session1 = sqlContext.newSession()
+    val session2 = sqlContext.newSession()
+
+    // all have the default configurations
+    val key = SQLConf.SHUFFLE_PARTITIONS.key
+    assert(session1.getConf(key) === session2.getConf(key))
+    session1.setConf(key, "1")
+    session2.setConf(key, "2")
+    assert(session1.getConf(key) === "1")
+    assert(session2.getConf(key) === "2")
+
+    // temporary table should not be shared
+    val df = session1.range(10)
+    df.registerTempTable("test1")
+    assert(session1.tableNames().contains("test1"))
+    assert(!session2.tableNames().contains("test1"))
+
+    // UDF should not be shared
+    def myadd(a: Int, b: Int): Int = a + b
+    session1.udf.register[Int, Int, Int]("myadd", myadd)
+    session1.sql("select myadd(1, 2)").explain()
+    intercept[AnalysisException] {
+      session2.sql("select myadd(1, 2)").explain()
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3390b400/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 10e633f..c89a151 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -31,23 +31,16 @@ private[sql] class TestSQLContext(sc: SparkContext) extends SQLContext(sc) { sel
       new SparkConf().set("spark.sql.testkey", "true")))
   }
 
-  // Make sure we set those test specific confs correctly when we create
-  // the SQLConf as well as when we call clear.
-  protected[sql] override def createSession(): SQLSession = new this.SQLSession()
+  protected[sql] override lazy val conf: SQLConf = new SQLConf {
 
-  /** A special [[SQLSession]] that uses fewer shuffle partitions than normal. */
-  protected[sql] class SQLSession extends super.SQLSession {
-    protected[sql] override lazy val conf: SQLConf = new SQLConf {
+    clear()
 
-      clear()
+    override def clear(): Unit = {
+      super.clear()
 
-      override def clear(): Unit = {
-        super.clear()
-
-        // Make sure we start with the default test configs even after clear
-        TestSQLContext.overrideConfs.map {
-          case (key, value) => setConfString(key, value)
-        }
+      // Make sure we start with the default test configs even after clear
+      TestSQLContext.overrideConfs.map {
+        case (key, value) => setConfString(key, value)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3390b400/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 306f98b..719b03e 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -20,19 +20,15 @@ package org.apache.spark.sql.hive.thriftserver
 import java.security.PrivilegedExceptionAction
 import java.sql.{Date, Timestamp}
 import java.util.concurrent.RejectedExecutionException
-import java.util.{Arrays, Map => JMap, UUID}
+import java.util.{Arrays, UUID, Map => JMap}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, Map => SMap}
 import scala.util.control.NonFatal
 
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hive.service.cli._
-import org.apache.hadoop.hive.ql.metadata.Hive
-import org.apache.hadoop.hive.ql.metadata.HiveException
-import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.shims.Utils
+import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation
 import org.apache.hive.service.cli.session.HiveSession
 
@@ -40,7 +36,7 @@ import org.apache.spark.Logging
 import org.apache.spark.sql.execution.SetCommand
 import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLConf}
+import org.apache.spark.sql.{DataFrame, SQLConf, Row => SparkRow}
 
 
 private[hive] class SparkExecuteStatementOperation(
@@ -143,30 +139,15 @@ private[hive] class SparkExecuteStatementOperation(
     if (!runInBackground) {
       runInternal()
     } else {
-      val parentSessionState = SessionState.get()
-      val hiveConf = getConfigForOperation()
       val sparkServiceUGI = Utils.getUGI()
-      val sessionHive = getCurrentHive()
-      val currentSqlSession = hiveContext.currentSession
 
       // Runnable impl to call runInternal asynchronously,
       // from a different thread
       val backgroundOperation = new Runnable() {
 
         override def run(): Unit = {
-          val doAsAction = new PrivilegedExceptionAction[Object]() {
-            override def run(): Object = {
-
-              // User information is part of the metastore client member in Hive
-              hiveContext.setSession(currentSqlSession)
-              // Always use the latest class loader provided by executionHive's state.
-              val executionHiveClassLoader =
-                hiveContext.executionHive.state.getConf.getClassLoader
-              sessionHive.getConf.setClassLoader(executionHiveClassLoader)
-              parentSessionState.getConf.setClassLoader(executionHiveClassLoader)
-
-              Hive.set(sessionHive)
-              SessionState.setCurrentSessionState(parentSessionState)
+          val doAsAction = new PrivilegedExceptionAction[Unit]() {
+            override def run(): Unit = {
               try {
                 runInternal()
               } catch {
@@ -174,7 +155,6 @@ private[hive] class SparkExecuteStatementOperation(
                   setOperationException(e)
                   log.error("Error running hive query: ", e)
               }
-              return null
             }
           }
 
@@ -191,7 +171,7 @@ private[hive] class SparkExecuteStatementOperation(
       try {
         // This submit blocks if no background threads are available to run this operation
         val backgroundHandle =
-          getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation)
+          parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
         setBackgroundHandle(backgroundHandle)
       } catch {
         case rejected: RejectedExecutionException =>
@@ -210,6 +190,11 @@ private[hive] class SparkExecuteStatementOperation(
     statementId = UUID.randomUUID().toString
     logInfo(s"Running query '$statement' with $statementId")
     setState(OperationState.RUNNING)
+    // Always use the latest class loader provided by executionHive's state.
+    val executionHiveClassLoader =
+      hiveContext.executionHive.state.getConf.getClassLoader
+    Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
+
     HiveThriftServer2.listener.onStatementStart(
       statementId,
       parentSession.getSessionHandle.getSessionId.toString,
@@ -279,43 +264,4 @@ private[hive] class SparkExecuteStatementOperation(
       }
     }
   }
-
-  /**
-   * If there are query specific settings to overlay, then create a copy of config
-   * There are two cases we need to clone the session config that's being passed to hive driver
-   * 1. Async query -
-   *    If the client changes a config setting, that shouldn't reflect in the execution
-   *    already underway
-   * 2. confOverlay -
-   *    The query specific settings should only be applied to the query config and not session
-   * @return new configuration
-   * @throws HiveSQLException
-   */
-  private def getConfigForOperation(): HiveConf = {
-    var sqlOperationConf = getParentSession().getHiveConf()
-    if (!getConfOverlay().isEmpty() || runInBackground) {
-      // clone the partent session config for this query
-      sqlOperationConf = new HiveConf(sqlOperationConf)
-
-      // apply overlay query specific settings, if any
-      getConfOverlay().asScala.foreach { case (k, v) =>
-        try {
-          sqlOperationConf.verifyAndSet(k, v)
-        } catch {
-          case e: IllegalArgumentException =>
-            throw new HiveSQLException("Error applying statement specific settings", e)
-        }
-      }
-    }
-    return sqlOperationConf
-  }
-
-  private def getCurrentHive(): Hive = {
-    try {
-      return Hive.get()
-    } catch {
-      case e: HiveException =>
-        throw new HiveSQLException("Failed to get current Hive object", e);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3390b400/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index 92ac0ec..33aaead 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -36,7 +36,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
   extends SessionManager(hiveServer)
   with ReflectedCompositeService {
 
-  private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
+  private lazy val sparkSqlOperationManager = new SparkSQLOperationManager()
 
   override def init(hiveConf: HiveConf) {
     setSuperField(this, "hiveConf", hiveConf)
@@ -60,13 +60,15 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
       sessionConf: java.util.Map[String, String],
       withImpersonation: Boolean,
       delegationToken: String): SessionHandle = {
-    hiveContext.openSession()
     val sessionHandle =
       super.openSession(protocol, username, passwd, ipAddress, sessionConf, withImpersonation,
           delegationToken)
     val session = super.getSession(sessionHandle)
     HiveThriftServer2.listener.onSessionCreated(
       session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
+    val ctx = hiveContext.newSession()
+    ctx.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
+    sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx
     sessionHandle
   }
 
@@ -74,7 +76,6 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
     HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString)
     super.closeSession(sessionHandle)
     sparkSqlOperationManager.sessionToActivePool -= sessionHandle
-
-    hiveContext.detachSession()
+    sparkSqlOperationManager.sessionToContexts.remove(sessionHandle)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3390b400/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index c8031ed..476651a 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -30,20 +30,21 @@ import org.apache.spark.sql.hive.thriftserver.{SparkExecuteStatementOperation, R
 /**
  * Executes queries using Spark SQL, and maintains a list of handles to active queries.
  */
-private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext)
+private[thriftserver] class SparkSQLOperationManager()
   extends OperationManager with Logging {
 
   val handleToOperation = ReflectionUtils
     .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
 
   val sessionToActivePool = Map[SessionHandle, String]()
+  val sessionToContexts = Map[SessionHandle, HiveContext]()
 
   override def newExecuteStatementOperation(
       parentSession: HiveSession,
       statement: String,
       confOverlay: JMap[String, String],
       async: Boolean): ExecuteStatementOperation = synchronized {
-
+    val hiveContext = sessionToContexts(parentSession.getSessionHandle)
     val runInBackground = async && hiveContext.hiveThriftServerAsync
     val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
       runInBackground)(hiveContext, sessionToActivePool)

http://git-wip-us.apache.org/repos/asf/spark/blob/3390b400/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index e59a14e..76d1591 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -96,7 +96,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       buffer += s"${new Timestamp(new Date().getTime)} - $source> $line"
 
       // If we haven't found all expected answers and another expected answer comes up...
-      if (next < expectedAnswers.size && line.startsWith(expectedAnswers(next))) {
+      if (next < expectedAnswers.size && line.contains(expectedAnswers(next))) {
         next += 1
         // If all expected answers have been found...
         if (next == expectedAnswers.size) {
@@ -159,7 +159,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE hive_test;"
         -> "OK",
       "CACHE TABLE hive_test;"
-        -> "Time taken: ",
+        -> "",
       "SELECT COUNT(*) FROM hive_test;"
         -> "5",
       "DROP TABLE hive_test;"
@@ -180,7 +180,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       "CREATE TABLE hive_test(key INT, val STRING);"
         -> "OK",
       "SHOW TABLES;"
-        -> "Time taken: "
+        -> "hive_test"
     )
 
     runCliWithin(2.minute, Seq("--database", "hive_test_db", "-e", "SHOW TABLES;"))(
@@ -210,7 +210,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE sourceTable;"
         -> "OK",
       "INSERT INTO TABLE t1 SELECT key, val FROM sourceTable;"
-        -> "Time taken:",
+        -> "",
       "SELECT count(key) FROM t1;"
         -> "5",
       "DROP TABLE t1;"

http://git-wip-us.apache.org/repos/asf/spark/blob/3390b400/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 19b2f24..ff8ca01 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -205,6 +205,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
     import org.apache.spark.sql.SQLConf
     var defaultV1: String = null
     var defaultV2: String = null
+    var data: ArrayBuffer[Int] = null
 
     withMultipleConnectionJdbcStatement(
       // create table
@@ -214,10 +215,16 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
             "DROP TABLE IF EXISTS test_map",
             "CREATE TABLE test_map(key INT, value STRING)",
             s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map",
-            "CACHE TABLE test_table AS SELECT key FROM test_map ORDER BY key DESC")
+            "CACHE TABLE test_table AS SELECT key FROM test_map ORDER BY key DESC",
+            "CREATE DATABASE db1")
 
         queries.foreach(statement.execute)
 
+        val plan = statement.executeQuery("explain select * from test_table")
+        plan.next()
+        plan.next()
+        assert(plan.getString(1).contains("InMemoryColumnarTableScan"))
+
         val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
         val buf1 = new collection.mutable.ArrayBuffer[Int]()
         while (rs1.next()) {
@@ -233,6 +240,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
         rs2.close()
 
         assert(buf1 === buf2)
+
+        data = buf1
       },
 
       // first session, we get the default value of the session status
@@ -289,56 +298,51 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
         rs2.close()
       },
 
-      // accessing the cached data in another session
+      // try to access the cached data in another session
       { statement =>
 
-        val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
-        val buf1 = new collection.mutable.ArrayBuffer[Int]()
-        while (rs1.next()) {
-          buf1 += rs1.getInt(1)
+        // Cached temporary table can't be accessed by other sessions
+        intercept[SQLException] {
+          statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
         }
-        rs1.close()
 
-        val rs2 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
-        val buf2 = new collection.mutable.ArrayBuffer[Int]()
-        while (rs2.next()) {
-          buf2 += rs2.getInt(1)
+        val plan = statement.executeQuery("explain select key from test_map ORDER BY key DESC")
+        plan.next()
+        plan.next()
+        assert(plan.getString(1).contains("InMemoryColumnarTableScan"))
+
+        val rs = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
+        val buf = new collection.mutable.ArrayBuffer[Int]()
+        while (rs.next()) {
+          buf += rs.getInt(1)
         }
-        rs2.close()
+        rs.close()
+        assert(buf === data)
+      },
 
-        assert(buf1 === buf2)
-        statement.executeQuery("UNCACHE TABLE test_table")
+      // switch another database
+      { statement =>
+        statement.execute("USE db1")
 
-        // TODO need to figure out how to determine if the data loaded from cache
-        val rs3 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
-        val buf3 = new collection.mutable.ArrayBuffer[Int]()
-        while (rs3.next()) {
-          buf3 += rs3.getInt(1)
+        // there is no test_map table in db1
+        intercept[SQLException] {
+          statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
         }
-        rs3.close()
 
-        assert(buf1 === buf3)
+        statement.execute("CREATE TABLE test_map2(key INT, value STRING)")
       },
 
-      // accessing the uncached table
+      // access default database
       { statement =>
 
-        // TODO need to figure out how to determine if the data loaded from cache
-        val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
-        val buf1 = new collection.mutable.ArrayBuffer[Int]()
-        while (rs1.next()) {
-          buf1 += rs1.getInt(1)
-        }
-        rs1.close()
-
-        val rs2 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
-        val buf2 = new collection.mutable.ArrayBuffer[Int]()
-        while (rs2.next()) {
-          buf2 += rs2.getInt(1)
+        // current database should still be `default`
+        intercept[SQLException] {
+          statement.executeQuery("SELECT key FROM test_map2")
         }
-        rs2.close()
 
-        assert(buf1 === buf2)
+        statement.execute("USE db1")
+        // access test_map2
+        statement.executeQuery("SELECT key from test_map2")
       }
     )
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3390b400/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 17de8ef..dad1e23 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 import scala.language.implicitConversions
-import scala.concurrent.duration._
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.StatsSetupConst
@@ -34,32 +33,49 @@ import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.hive.ql.parse.VariableSubstitution
-import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
 
-import org.apache.spark.Logging
-import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql._
+import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.SQLConf.SQLConfEntry
 import org.apache.spark.sql.SQLConf.SQLConfEntry._
-import org.apache.spark.sql.catalyst.{SqlParser, TableIdentifier, ParserDialect}
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUDFs, SetCommand}
-import org.apache.spark.sql.execution.datasources.{PreWriteCheck, PreInsertCastAndRename, DataSourceStrategy}
+import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, SqlParser}
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, PreInsertCastAndRename, PreWriteCheck}
+import org.apache.spark.sql.execution.{CacheManager, ExecutedCommand, ExtractPythonUDFs, SetCommand}
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkContext}
 
 
 /**
  * This is the HiveQL Dialect, this dialect is strongly bind with HiveContext
  */
-private[hive] class HiveQLDialect extends ParserDialect {
+private[hive] class HiveQLDialect(sqlContext: HiveContext) extends ParserDialect {
   override def parse(sqlText: String): LogicalPlan = {
-    HiveQl.parseSql(sqlText)
+    sqlContext.executionHive.withHiveState {
+      HiveQl.parseSql(sqlText)
+    }
+  }
+}
+
+/**
+ * Returns the current database of metadataHive.
+ */
+private[hive] case class CurrentDatabase(ctx: HiveContext)
+  extends LeafExpression with CodegenFallback {
+  override def dataType: DataType = StringType
+  override def foldable: Boolean = true
+  override def nullable: Boolean = false
+  override def eval(input: InternalRow): Any = {
+    UTF8String.fromString(ctx.metadataHive.currentDatabase)
   }
 }
 
@@ -69,14 +85,30 @@ private[hive] class HiveQLDialect extends ParserDialect {
  *
  * @since 1.0.0
  */
-class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
+class HiveContext private[hive](
+    sc: SparkContext,
+    cacheManager: CacheManager,
+    @transient execHive: ClientWrapper,
+    @transient metaHive: ClientInterface) extends SQLContext(sc, cacheManager) with Logging {
   self =>
 
-  import HiveContext._
+  def this(sc: SparkContext) = this(sc, new CacheManager, null, null)
+  def this(sc: JavaSparkContext) = this(sc.sc)
+
+  import org.apache.spark.sql.hive.HiveContext._
 
   logDebug("create HiveContext")
 
   /**
+   * Returns a new HiveContext as new session, which will have separated SQLConf, UDF/UDAF,
+   * temporary tables and SessionState, but sharing the same CacheManager, IsolatedClientLoader
+   * and Hive client (both of execution and metadata) with existing HiveContext.
+   */
+  override def newSession(): HiveContext = {
+    new HiveContext(sc, cacheManager, executionHive.newSession(), metadataHive.newSession())
+  }
+
+  /**
    * When true, enables an experimental feature where metastore tables that use the parquet SerDe
    * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
    * SerDe.
@@ -157,14 +189,18 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
    * for storing persistent metadata, and only point to a dummy metastore in a temporary directory.
    */
   @transient
-  protected[hive] lazy val executionHive: ClientWrapper = {
+  protected[hive] lazy val executionHive: ClientWrapper = if (execHive != null) {
+    execHive
+  } else {
     logInfo(s"Initializing execution hive, version $hiveExecutionVersion")
-    new ClientWrapper(
+    val loader = new IsolatedClientLoader(
       version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
+      execJars = Seq(),
       config = newTemporaryConfiguration(),
-      initClassLoader = Utils.getContextOrSparkClassLoader)
+      isolationOn = false,
+      baseClassLoader = Utils.getContextOrSparkClassLoader)
+    loader.createClient().asInstanceOf[ClientWrapper]
   }
-  SessionState.setCurrentSessionState(executionHive.state)
 
   /**
    * Overrides default Hive configurations to avoid breaking changes to Spark SQL users.
@@ -182,7 +218,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
    * in the hive-site.xml file.
    */
   @transient
-  protected[hive] lazy val metadataHive: ClientInterface = {
+  protected[hive] lazy val metadataHive: ClientInterface = if (metaHive != null) {
+    metaHive
+  } else {
     val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)
 
     // We instantiate a HiveConf here to read in the hive-site.xml file and then pass the options
@@ -268,14 +306,10 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)
     }
-    isolatedLoader.client
+    isolatedLoader.createClient()
   }
 
   protected[sql] override def parseSql(sql: String): LogicalPlan = {
-    var state = SessionState.get()
-    if (state == null) {
-      SessionState.setCurrentSessionState(tlSession.get().asInstanceOf[SQLSession].sessionState)
-    }
     super.parseSql(substitutor.substitute(hiveconf, sql))
   }
 
@@ -384,8 +418,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
     }
   }
 
-  protected[hive] def hiveconf = tlSession.get().asInstanceOf[this.SQLSession].hiveconf
-
   override def setConf(key: String, value: String): Unit = {
     super.setConf(key, value)
     executionHive.runSqlHive(s"SET $key=$value")
@@ -402,7 +434,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
     setConf(entry.key, entry.stringConverter(value))
   }
 
-    /* A catalyst metadata catalog that points to the Hive Metastore. */
+  /* A catalyst metadata catalog that points to the Hive Metastore. */
   @transient
   override protected[sql] lazy val catalog =
     new HiveMetastoreCatalog(metadataHive, this) with OverrideCatalog
@@ -410,7 +442,13 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
   // Note that HiveUDFs will be overridden by functions registered in this context.
   @transient
   override protected[sql] lazy val functionRegistry: FunctionRegistry =
-    new HiveFunctionRegistry(FunctionRegistry.builtin)
+    new HiveFunctionRegistry(FunctionRegistry.builtin.copy())
+
+  // The Hive UDF current_database() is foldable, will be evaluated by optimizer, but the optimizer
+  // can't access the SessionState of metadataHive.
+  functionRegistry.registerFunction(
+    "current_database",
+    (expressions: Seq[Expression]) => new CurrentDatabase(this))
 
   /* An analyzer that uses the Hive metastore. */
   @transient
@@ -430,10 +468,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
       )
     }
 
-  override protected[sql] def createSession(): SQLSession = {
-    new this.SQLSession()
-  }
-
   /** Overridden by child classes that need to set configuration before the client init. */
   protected def configure(): Map[String, String] = {
     // Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch
@@ -488,41 +522,40 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
     }.toMap
   }
 
-  protected[hive] class SQLSession extends super.SQLSession {
-    protected[sql] override lazy val conf: SQLConf = new SQLConf {
-      override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
-      override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
-    }
-
-    /**
-     * SQLConf and HiveConf contracts:
-     *
-     * 1. reuse existing started SessionState if any
-     * 2. when the Hive session is first initialized, params in HiveConf will get picked up by the
-     *    SQLConf.  Additionally, any properties set by set() or a SET command inside sql() will be
-     *    set in the SQLConf *as well as* in the HiveConf.
-     */
-    protected[hive] lazy val sessionState: SessionState = {
-      var state = SessionState.get()
-      if (state == null) {
-        state = new SessionState(new HiveConf(classOf[SessionState]))
-        SessionState.start(state)
-      }
-      state
-    }
+  /**
+   * SQLConf and HiveConf contracts:
+   *
+   * 1. create a new SessionState for each HiveContext
+   * 2. when the Hive session is first initialized, params in HiveConf will get picked up by the
+   *    SQLConf.  Additionally, any properties set by set() or a SET command inside sql() will be
+   *    set in the SQLConf *as well as* in the HiveConf.
+   */
+  @transient
+  protected[hive] lazy val hiveconf: HiveConf = {
+    val c = executionHive.conf
+    setConf(c.getAllProperties)
+    c
+  }
 
-    protected[hive] lazy val hiveconf: HiveConf = {
-      setConf(sessionState.getConf.getAllProperties)
-      sessionState.getConf
-    }
+  protected[sql] override lazy val conf: SQLConf = new SQLConf {
+    override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
+    override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
   }
 
-  override protected[sql] def dialectClassName = if (conf.dialect == "hiveql") {
+  protected[sql] override def dialectClassName = if (conf.dialect == "hiveql") {
     classOf[HiveQLDialect].getCanonicalName
   } else {
     super.dialectClassName
   }
 
+  protected[sql] override def getSQLDialect(): ParserDialect = {
+    if (conf.dialect == "hiveql") {
+      new HiveQLDialect(this)
+    } else {
+      super.getSQLDialect()
+    }
+  }
+
   @transient
   private val hivePlanner = new SparkPlanner with HiveStrategies {
     val hiveContext = self
@@ -598,6 +631,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
         case _ => super.simpleString
       }
   }
+
+  protected[sql] override def addJar(path: String): Unit = {
+    // Add jar to Hive and classloader
+    executionHive.addJar(path)
+    metadataHive.addJar(path)
+    Thread.currentThread().setContextClassLoader(executionHive.clientLoader.classLoader)
+    super.addJar(path)
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3390b400/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 2bf22f5..250c232 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -25,29 +25,27 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.serde.serdeConstants
-import org.apache.hadoop.hive.ql.{ErrorMsg, Context}
-import org.apache.hadoop.hive.ql.exec.{FunctionRegistry, FunctionInfo}
+import org.apache.hadoop.hive.ql.exec.{FunctionInfo, FunctionRegistry}
 import org.apache.hadoop.hive.ql.lib.Node
 import org.apache.hadoop.hive.ql.parse._
 import org.apache.hadoop.hive.ql.plan.PlanUtils
 import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
+import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst
+import org.apache.spark.sql.{AnalysisException, catalyst}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.{logical, _}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.execution.ExplainCommand
 import org.apache.spark.sql.execution.datasources.DescribeCommand
 import org.apache.spark.sql.hive.HiveShim._
 import org.apache.spark.sql.hive.client._
-import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DropTable, AnalyzeTable, HiveScriptIOSchema}
+import org.apache.spark.sql.hive.execution.{AnalyzeTable, DropTable, HiveNativeCommand, HiveScriptIOSchema}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
 import org.apache.spark.util.random.RandomSampler
@@ -268,7 +266,7 @@ private[hive] object HiveQl extends Logging {
     node
   }
 
-  private def createContext(): Context = new Context(SessionState.get().getConf())
+  private def createContext(): Context = new Context(hiveConf)
 
   private def getAst(sql: String, context: Context) =
     ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, context))
@@ -277,12 +275,16 @@ private[hive] object HiveQl extends Logging {
    * Returns the HiveConf
    */
   private[this] def hiveConf: HiveConf = {
-    val ss = SessionState.get() // SessionState is lazy initialization, it can be null here
+    var ss = SessionState.get()
+    // SessionState is lazy initialization, it can be null here
     if (ss == null) {
-      new HiveConf()
-    } else {
-      ss.getConf
+      val original = Thread.currentThread().getContextClassLoader
+      val conf = new HiveConf(classOf[SessionState])
+      conf.setClassLoader(original)
+      ss = new SessionState(conf)
+      SessionState.start(ss)
     }
+    ss.getConf
   }
 
   /** Returns a LogicalPlan for a given HiveQL string. */

http://git-wip-us.apache.org/repos/asf/spark/blob/3390b400/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
index 915eae9..9d9a55e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
@@ -178,6 +178,15 @@ private[hive] trait ClientInterface {
       holdDDLTime: Boolean,
       listBucketingEnabled: Boolean): Unit
 
+  /** Add a jar into class loader */
+  def addJar(path: String): Unit
+
+  /** Return a ClientInterface as new session, that will share the class loader and Hive client */
+  def newSession(): ClientInterface
+
+  /** Run a function within Hive state (SessionState, HiveConf, Hive client and class loader) */
+  def withHiveState[A](f: => A): A
+
   /** Used for testing only.  Removes all metadata from this instance of Hive. */
   def reset(): Unit
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3390b400/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 8f6d448..3dce86c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -60,7 +60,8 @@ import org.apache.spark.util.{CircularBuffer, Utils}
 private[hive] class ClientWrapper(
     override val version: HiveVersion,
     config: Map[String, String],
-    initClassLoader: ClassLoader)
+    initClassLoader: ClassLoader,
+    val clientLoader: IsolatedClientLoader)
   extends ClientInterface
   with Logging {
 
@@ -150,31 +151,29 @@ private[hive] class ClientWrapper(
     // Switch to the initClassLoader.
     Thread.currentThread().setContextClassLoader(initClassLoader)
     val ret = try {
-      val oldState = SessionState.get()
-      if (oldState == null) {
-        val initialConf = new HiveConf(classOf[SessionState])
-        // HiveConf is a Hadoop Configuration, which has a field of classLoader and
-        // the initial value will be the current thread's context class loader
-        // (i.e. initClassLoader at here).
-        // We call initialConf.setClassLoader(initClassLoader) at here to make
-        // this action explicit.
-        initialConf.setClassLoader(initClassLoader)
-        config.foreach { case (k, v) =>
-          if (k.toLowerCase.contains("password")) {
-            logDebug(s"Hive Config: $k=xxx")
-          } else {
-            logDebug(s"Hive Config: $k=$v")
-          }
-          initialConf.set(k, v)
+      val initialConf = new HiveConf(classOf[SessionState])
+      // HiveConf is a Hadoop Configuration, which has a field of classLoader and
+      // the initial value will be the current thread's context class loader
+      // (i.e. initClassLoader at here).
+      // We call initialConf.setClassLoader(initClassLoader) at here to make
+      // this action explicit.
+      initialConf.setClassLoader(initClassLoader)
+      config.foreach { case (k, v) =>
+        if (k.toLowerCase.contains("password")) {
+          logDebug(s"Hive Config: $k=xxx")
+        } else {
+          logDebug(s"Hive Config: $k=$v")
         }
-        val newState = new SessionState(initialConf)
-        SessionState.start(newState)
-        newState.out = new PrintStream(outputBuffer, true, "UTF-8")
-        newState.err = new PrintStream(outputBuffer, true, "UTF-8")
-        newState
-      } else {
-        oldState
+        initialConf.set(k, v)
+      }
+      val state = new SessionState(initialConf)
+      if (clientLoader.cachedHive != null) {
+        Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
       }
+      SessionState.start(state)
+      state.out = new PrintStream(outputBuffer, true, "UTF-8")
+      state.err = new PrintStream(outputBuffer, true, "UTF-8")
+      state
     } finally {
       Thread.currentThread().setContextClassLoader(original)
     }
@@ -188,11 +187,6 @@ private[hive] class ClientWrapper(
     conf.get(key, defaultValue)
   }
 
-  // TODO: should be a def?s
-  // When we create this val client, the HiveConf of it (conf) is the one associated with state.
-  @GuardedBy("this")
-  private var client = Hive.get(conf)
-
   // We use hive's conf for compatibility.
   private val retryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES)
   private val retryDelayMillis = shim.getMetastoreClientConnectRetryDelayMillis(conf)
@@ -200,7 +194,7 @@ private[hive] class ClientWrapper(
   /**
    * Runs `f` with multiple retries in case the hive metastore is temporarily unreachable.
    */
-  private def retryLocked[A](f: => A): A = synchronized {
+  private def retryLocked[A](f: => A): A = clientLoader.synchronized {
     // Hive sometimes retries internally, so set a deadline to avoid compounding delays.
     val deadline = System.nanoTime + (retryLimit * retryDelayMillis * 1e6).toLong
     var numTries = 0
@@ -215,13 +209,8 @@ private[hive] class ClientWrapper(
           logWarning(
             "HiveClientWrapper got thrift exception, destroying client and retrying " +
               s"(${retryLimit - numTries} tries remaining)", e)
+          clientLoader.cachedHive = null
           Thread.sleep(retryDelayMillis)
-          try {
-            client = Hive.get(state.getConf, true)
-          } catch {
-            case e: Exception if causedByThrift(e) =>
-              logWarning("Failed to refresh hive client, will retry.", e)
-          }
       }
     } while (numTries <= retryLimit && System.nanoTime < deadline)
     if (System.nanoTime > deadline) {
@@ -242,13 +231,26 @@ private[hive] class ClientWrapper(
     false
   }
 
+  def client: Hive = {
+    if (clientLoader.cachedHive != null) {
+      clientLoader.cachedHive.asInstanceOf[Hive]
+    } else {
+      val c = Hive.get(conf)
+      clientLoader.cachedHive = c
+      c
+    }
+  }
+
   /**
    * Runs `f` with ThreadLocal session state and classloaders configured for this version of hive.
    */
-  private def withHiveState[A](f: => A): A = retryLocked {
+  def withHiveState[A](f: => A): A = retryLocked {
     val original = Thread.currentThread().getContextClassLoader
     // Set the thread local metastore client to the client associated with this ClientWrapper.
     Hive.set(client)
+    // The classloader in clientLoader could be changed after addJar, always use the latest
+    // classloader
+    state.getConf.setClassLoader(clientLoader.classLoader)
     // setCurrentSessionState will use the classLoader associated
     // with the HiveConf in `state` to override the context class loader of the current
     // thread.
@@ -545,6 +547,15 @@ private[hive] class ClientWrapper(
       listBucketingEnabled)
   }
 
+  def addJar(path: String): Unit = {
+    clientLoader.addJar(path)
+    runSqlHive(s"ADD JAR $path")
+  }
+
+  def newSession(): ClientWrapper = {
+    clientLoader.createClient().asInstanceOf[ClientWrapper]
+  }
+
   def reset(): Unit = withHiveState {
     client.getAllTables("default").asScala.foreach { t =>
         logDebug(s"Deleting table $t")

http://git-wip-us.apache.org/repos/asf/spark/blob/3390b400/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 1fe4cba..567e4d7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -22,6 +22,7 @@ import java.lang.reflect.InvocationTargetException
 import java.net.{URL, URLClassLoader}
 import java.util
 
+import scala.collection.mutable
 import scala.language.reflectiveCalls
 import scala.util.Try
 
@@ -148,53 +149,75 @@ private[hive] class IsolatedClientLoader(
     name.replaceAll("\\.", "/") + ".class"
 
   /** The classloader that is used to load an isolated version of Hive. */
-  protected val classLoader: ClassLoader = new URLClassLoader(allJars, rootClassLoader) {
-    override def loadClass(name: String, resolve: Boolean): Class[_] = {
-      val loaded = findLoadedClass(name)
-      if (loaded == null) doLoadClass(name, resolve) else loaded
-    }
-
-    def doLoadClass(name: String, resolve: Boolean): Class[_] = {
-      val classFileName = name.replaceAll("\\.", "/") + ".class"
-      if (isBarrierClass(name) && isolationOn) {
-        // For barrier classes, we construct a new copy of the class.
-        val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
-        logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
-        defineClass(name, bytes, 0, bytes.length)
-      } else if (!isSharedClass(name)) {
-        logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
-        super.loadClass(name, resolve)
-      } else {
-        // For shared classes, we delegate to baseClassLoader.
-        logDebug(s"shared class: $name")
-        baseClassLoader.loadClass(name)
+  private[hive] var classLoader: ClassLoader = if (isolationOn) {
+    new URLClassLoader(allJars, rootClassLoader) {
+      override def loadClass(name: String, resolve: Boolean): Class[_] = {
+        val loaded = findLoadedClass(name)
+        if (loaded == null) doLoadClass(name, resolve) else loaded
+      }
+      def doLoadClass(name: String, resolve: Boolean): Class[_] = {
+        val classFileName = name.replaceAll("\\.", "/") + ".class"
+        if (isBarrierClass(name)) {
+          // For barrier classes, we construct a new copy of the class.
+          val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
+          logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
+          defineClass(name, bytes, 0, bytes.length)
+        } else if (!isSharedClass(name)) {
+          logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
+          super.loadClass(name, resolve)
+        } else {
+          // For shared classes, we delegate to baseClassLoader.
+          logDebug(s"shared class: $name")
+          baseClassLoader.loadClass(name)
+        }
       }
     }
+  } else {
+    baseClassLoader
   }
 
-  // Pre-reflective instantiation setup.
-  logDebug("Initializing the logger to avoid disaster...")
-  Thread.currentThread.setContextClassLoader(classLoader)
+  private[hive] def addJar(path: String): Unit = synchronized {
+    val jarURL = new java.io.File(path).toURI.toURL
+    // TODO: we should avoid of stacking classloaders (use a single URLClassLoader and add jars
+    // to that)
+    classLoader = new java.net.URLClassLoader(Array(jarURL), classLoader)
+  }
 
   /** The isolated client interface to Hive. */
-  val client: ClientInterface = try {
-    classLoader
-      .loadClass(classOf[ClientWrapper].getName)
-      .getConstructors.head
-      .newInstance(version, config, classLoader)
-      .asInstanceOf[ClientInterface]
-  } catch {
-    case e: InvocationTargetException =>
-      if (e.getCause().isInstanceOf[NoClassDefFoundError]) {
-        val cnf = e.getCause().asInstanceOf[NoClassDefFoundError]
-        throw new ClassNotFoundException(
-          s"$cnf when creating Hive client using classpath: ${execJars.mkString(", ")}\n" +
-           "Please make sure that jars for your version of hive and hadoop are included in the " +
-          s"paths passed to ${HiveContext.HIVE_METASTORE_JARS}.")
-      } else {
-        throw e
-      }
-  } finally {
-    Thread.currentThread.setContextClassLoader(baseClassLoader)
+  private[hive] def createClient(): ClientInterface = {
+    if (!isolationOn) {
+      return new ClientWrapper(version, config, baseClassLoader, this)
+    }
+    // Pre-reflective instantiation setup.
+    logDebug("Initializing the logger to avoid disaster...")
+    val origLoader = Thread.currentThread().getContextClassLoader
+    Thread.currentThread.setContextClassLoader(classLoader)
+
+    try {
+      classLoader
+        .loadClass(classOf[ClientWrapper].getName)
+        .getConstructors.head
+        .newInstance(version, config, classLoader, this)
+        .asInstanceOf[ClientInterface]
+    } catch {
+      case e: InvocationTargetException =>
+        if (e.getCause().isInstanceOf[NoClassDefFoundError]) {
+          val cnf = e.getCause().asInstanceOf[NoClassDefFoundError]
+          throw new ClassNotFoundException(
+            s"$cnf when creating Hive client using classpath: ${execJars.mkString(", ")}\n" +
+            "Please make sure that jars for your version of hive and hadoop are included in the " +
+            s"paths passed to ${HiveContext.HIVE_METASTORE_JARS}.")
+        } else {
+          throw e
+        }
+    } finally {
+      Thread.currentThread.setContextClassLoader(origLoader)
+    }
   }
+
+  /**
+   * The place holder for shared Hive client for all the HiveContext sessions (they share an
+   * IsolatedClientLoader).
+   */
+  private[hive] var cachedHive: Any = null
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3390b400/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 9f654ee..51ec92a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -18,18 +18,18 @@
 package org.apache.spark.sql.hive.execution
 
 import org.apache.hadoop.hive.metastore.MetaStoreUtils
+
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{TableIdentifier, SqlParser}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
 
 /**
  * Analyzes the given table in the current database to generate statistics, which will be
@@ -86,26 +86,7 @@ case class AddJar(path: String) extends RunnableCommand {
   }
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    val hiveContext = sqlContext.asInstanceOf[HiveContext]
-    val currentClassLoader = Utils.getContextOrSparkClassLoader
-
-    // Add jar to current context
-    val jarURL = new java.io.File(path).toURI.toURL
-    val newClassLoader = new java.net.URLClassLoader(Array(jarURL), currentClassLoader)
-    Thread.currentThread.setContextClassLoader(newClassLoader)
-    // We need to explicitly set the class loader associated with the conf in executionHive's
-    // state because this class loader will be used as the context class loader of the current
-    // thread to execute any Hive command.
-    // We cannot use `org.apache.hadoop.hive.ql.metadata.Hive.get().getConf()` because Hive.get()
-    // returns the value of a thread local variable and its HiveConf may not be the HiveConf
-    // associated with `executionHive.state` (for example, HiveContext is created in one thread
-    // and then add jar is called from another thread).
-    hiveContext.executionHive.state.getConf.setClassLoader(newClassLoader)
-    // Add jar to isolated hive (metadataHive) class loader.
-    hiveContext.runSqlHive(s"ADD JAR $path")
-
-    // Add jar to executors
-    hiveContext.sparkContext.addJar(path)
+    sqlContext.addJar(path)
 
     Seq(Row(0))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3390b400/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index be335a4..ff39ccb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -116,27 +116,18 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
   override def executePlan(plan: LogicalPlan): this.QueryExecution =
     new this.QueryExecution(plan)
 
-  // Make sure we set those test specific confs correctly when we create
-  // the SQLConf as well as when we call clear.
-  override protected[sql] def createSession(): SQLSession = {
-    new this.SQLSession()
-  }
-
-  protected[hive] class SQLSession extends super.SQLSession {
-    protected[sql] override lazy val conf: SQLConf = new SQLConf {
-      // TODO as in unit test, conf.clear() probably be called, all of the value will be cleared.
-      // The super.getConf(SQLConf.DIALECT) is "sql" by default, we need to set it as "hiveql"
-      override def dialect: String = super.getConf(SQLConf.DIALECT, "hiveql")
-      override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
+  protected[sql] override lazy val conf: SQLConf = new SQLConf {
+    // The super.getConf(SQLConf.DIALECT) is "sql" by default, we need to set it as "hiveql"
+    override def dialect: String = super.getConf(SQLConf.DIALECT, "hiveql")
+    override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
 
-      clear()
+    clear()
 
-      override def clear(): Unit = {
-        super.clear()
+    override def clear(): Unit = {
+      super.clear()
 
-        TestHiveContext.overrideConfs.map {
-          case (key, value) => setConfString(key, value)
-        }
+      TestHiveContext.overrideConfs.map {
+        case (key, value) => setConfString(key, value)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3390b400/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
index 79cf40a..528a739 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
@@ -17,22 +17,15 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde.serdeConstants
+import org.scalatest.BeforeAndAfterAll
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.hive.client.{ManagedTable, HiveColumn, ExternalTable, HiveTable}
-import org.scalatest.BeforeAndAfterAll
+import org.apache.spark.sql.hive.client.{ExternalTable, HiveColumn, HiveTable, ManagedTable}
 
 
 class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
-  override def beforeAll() {
-    if (SessionState.get() == null) {
-      SessionState.start(new HiveConf())
-    }
-  }
-
   private def extractTableDesc(sql: String): (HiveTable, Boolean) = {
     HiveQl.createPlan(sql).collect {
       case CreateTableAsSelect(desc, child, allowExisting) => (desc, allowExisting)

http://git-wip-us.apache.org/repos/asf/spark/blob/3390b400/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 2da22ec..c6d034a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -53,7 +53,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
   test("success sanity check") {
     val badClient = IsolatedClientLoader.forVersion(HiveContext.hiveExecutionVersion,
       buildConf(),
-      ivyPath).client
+      ivyPath).createClient()
     val db = new HiveDatabase("default", "")
     badClient.createDatabase(db)
   }
@@ -83,7 +83,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
   ignore("failure sanity check") {
     val e = intercept[Throwable] {
       val badClient = quietly {
-        IsolatedClientLoader.forVersion("13", buildConf(), ivyPath).client
+        IsolatedClientLoader.forVersion("13", buildConf(), ivyPath).createClient()
       }
     }
     assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
@@ -97,7 +97,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
     test(s"$version: create client") {
       client = null
       System.gc() // Hack to avoid SEGV on some JVM versions.
-      client = IsolatedClientLoader.forVersion(version, buildConf(), ivyPath).client
+      client = IsolatedClientLoader.forVersion(version, buildConf(), ivyPath).createClient()
     }
 
     test(s"$version: createDatabase") {

http://git-wip-us.apache.org/repos/asf/spark/blob/3390b400/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index fe63ad5..2878500 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -1133,6 +1133,38 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     conf.clear()
   }
 
+  test("current_database with multiple sessions") {
+    sql("create database a")
+    sql("use a")
+    val s2 = newSession()
+    s2.sql("create database b")
+    s2.sql("use b")
+
+    assert(sql("select current_database()").first() === Row("a"))
+    assert(s2.sql("select current_database()").first() === Row("b"))
+
+    try {
+      sql("create table test_a(key INT, value STRING)")
+      s2.sql("create table test_b(key INT, value STRING)")
+
+      sql("select * from test_a")
+      intercept[AnalysisException] {
+        sql("select * from test_b")
+      }
+      sql("select * from b.test_b")
+
+      s2.sql("select * from test_b")
+      intercept[AnalysisException] {
+        s2.sql("select * from test_a")
+      }
+      s2.sql("select * from a.test_a")
+    } finally {
+      sql("DROP TABLE IF EXISTS test_a")
+      s2.sql("DROP TABLE IF EXISTS test_b")
+    }
+
+  }
+
   createQueryTest("select from thrift based table",
     "SELECT * from src_thrift")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3390b400/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index ec5b83b..ccc15ea 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -160,10 +160,15 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }
 
   test("show functions") {
-    val allFunctions =
+    val allBuiltinFunctions =
       (FunctionRegistry.builtin.listFunction().toSet[String] ++
         org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionNames.asScala).toList.sorted
-    checkAnswer(sql("SHOW functions"), allFunctions.map(Row(_)))
+    // The TestContext is shared by all the test cases, some functions may be registered before
+    // this, so we check that all the builtin functions are returned.
+    val allFunctions = sql("SHOW functions").collect().map(r => r(0))
+    allBuiltinFunctions.foreach { f =>
+      assert(allFunctions.contains(f))
+    }
     checkAnswer(sql("SHOW functions abs"), Row("abs"))
     checkAnswer(sql("SHOW functions 'abs'"), Row("abs"))
     checkAnswer(sql("SHOW functions abc.abs"), Row("abs"))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org