Posted to commits@spark.apache.org by we...@apache.org on 2017/08/17 16:24:52 UTC

spark git commit: [SPARK-21428] Turn IsolatedClientLoader off while using builtin Hive jars for reusing CliSessionState

Repository: spark
Updated Branches:
  refs/heads/master d695a528b -> b83b502c4


[SPARK-21428] Turn IsolatedClientLoader off while using builtin Hive jars for reusing CliSessionState

## What changes were proposed in this pull request?

Set `isolationOn` to false when using the builtin Hive jars and `SessionState.get` returns a `CliSessionState` instance.
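
The `CliSessionState` check in this patch (`isCliSessionState` in the `HiveUtils.scala` hunk) compares class names while walking up the superclass chain, presumably so the calling module does not need a compile-time reference to the CLI class. Below is a minimal, self-contained sketch of that technique. `BaseState`, `CliLikeState`, `CustomCliState` and `hasAncestorNamed` are hypothetical stand-ins for illustration and are not part of Spark or Hive.

```scala
object SessionTypeCheck {
  // Hypothetical stand-ins for Hive's SessionState hierarchy (not real Hive classes).
  class BaseState
  class CliLikeState extends BaseState
  class CustomCliState extends CliLikeState

  /**
   * Returns true when `obj`'s class, or any of its superclasses, has the
   * fully qualified name `targetName`. A null `obj` yields false.
   */
  def hasAncestorNamed(obj: AnyRef, targetName: String): Boolean = {
    var cls: Class[_] = if (obj != null) obj.getClass else null
    var found = false
    while (cls != null && !found) {
      found = cls.getName == targetName
      cls = cls.getSuperclass // becomes null once we step past java.lang.Object
    }
    found
  }

  def main(args: Array[String]): Unit = {
    val target = classOf[CliLikeState].getName
    println(hasAncestorNamed(new CustomCliState, target)) // subclass of the target
    println(hasAncestorNamed(new BaseState, target))      // unrelated to the target
    println(hasAncestorNamed(null, target))               // no state at all
  }
}
```

Because the comparison is on strings, a subclass of the target class also matches, which an exact `getClass` comparison would miss.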

## How was this patch tested?

1. Unit tests
2. Manually verified: `hive.exec.scratchdir` was created only once because the `CliSessionState` is reused
```text
➜  spark git:(SPARK-21428) ✗ bin/spark-sql --conf spark.sql.hive.metastore.jars=builtin

log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/07/16 23:59:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/16 23:59:27 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
17/07/16 23:59:27 INFO ObjectStore: ObjectStore, initialize called
17/07/16 23:59:28 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
17/07/16 23:59:28 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored
17/07/16 23:59:29 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
17/07/16 23:59:30 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/07/16 23:59:30 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/07/16 23:59:31 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/07/16 23:59:31 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/07/16 23:59:31 INFO MetaStoreDirectSql: Using direct SQL, underlying DB is DERBY
17/07/16 23:59:31 INFO ObjectStore: Initialized ObjectStore
17/07/16 23:59:31 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/07/16 23:59:31 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
17/07/16 23:59:32 INFO HiveMetaStore: Added admin role in metastore
17/07/16 23:59:32 INFO HiveMetaStore: Added public role in metastore
17/07/16 23:59:32 INFO HiveMetaStore: No user is added in admin role, since config is empty
17/07/16 23:59:32 INFO HiveMetaStore: 0: get_all_databases
17/07/16 23:59:32 INFO audit: ugi=Kent	ip=unknown-ip-addr	cmd=get_all_databases
17/07/16 23:59:32 INFO HiveMetaStore: 0: get_functions: db=default pat=*
17/07/16 23:59:32 INFO audit: ugi=Kent	ip=unknown-ip-addr	cmd=get_functions: db=default pat=*
17/07/16 23:59:32 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
17/07/16 23:59:32 INFO SessionState: Created local directory: /var/folders/k2/04p4k4ws73l6711h_mz2_tq00000gn/T/beea7261-221a-4711-89e8-8b12a9d37370_resources
17/07/16 23:59:32 INFO SessionState: Created HDFS directory: /tmp/hive/Kent/beea7261-221a-4711-89e8-8b12a9d37370
17/07/16 23:59:32 INFO SessionState: Created local directory: /var/folders/k2/04p4k4ws73l6711h_mz2_tq00000gn/T/Kent/beea7261-221a-4711-89e8-8b12a9d37370
17/07/16 23:59:32 INFO SessionState: Created HDFS directory: /tmp/hive/Kent/beea7261-221a-4711-89e8-8b12a9d37370/_tmp_space.db
17/07/16 23:59:32 INFO SparkContext: Running Spark version 2.3.0-SNAPSHOT
17/07/16 23:59:32 INFO SparkContext: Submitted application: SparkSQL::10.0.0.8
17/07/16 23:59:32 INFO SecurityManager: Changing view acls to: Kent
17/07/16 23:59:32 INFO SecurityManager: Changing modify acls to: Kent
17/07/16 23:59:32 INFO SecurityManager: Changing view acls groups to:
17/07/16 23:59:32 INFO SecurityManager: Changing modify acls groups to:
17/07/16 23:59:32 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(Kent); groups with view permissions: Set(); users  with modify permissions: Set(Kent); groups with modify permissions: Set()
17/07/16 23:59:33 INFO Utils: Successfully started service 'sparkDriver' on port 51889.
17/07/16 23:59:33 INFO SparkEnv: Registering MapOutputTracker
17/07/16 23:59:33 INFO SparkEnv: Registering BlockManagerMaster
17/07/16 23:59:33 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
17/07/16 23:59:33 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
17/07/16 23:59:33 INFO DiskBlockManager: Created local directory at /private/var/folders/k2/04p4k4ws73l6711h_mz2_tq00000gn/T/blockmgr-9cfae28a-01e9-4c73-a1f1-f76fa52fc7a5
17/07/16 23:59:33 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
17/07/16 23:59:33 INFO SparkEnv: Registering OutputCommitCoordinator
17/07/16 23:59:33 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/07/16 23:59:33 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.0.0.8:4040
17/07/16 23:59:33 INFO Executor: Starting executor ID driver on host localhost
17/07/16 23:59:33 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 51890.
17/07/16 23:59:33 INFO NettyBlockTransferService: Server created on 10.0.0.8:51890
17/07/16 23:59:33 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/07/16 23:59:33 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.0.0.8, 51890, None)
17/07/16 23:59:33 INFO BlockManagerMasterEndpoint: Registering block manager 10.0.0.8:51890 with 366.3 MB RAM, BlockManagerId(driver, 10.0.0.8, 51890, None)
17/07/16 23:59:33 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.0.0.8, 51890, None)
17/07/16 23:59:33 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.0.0.8, 51890, None)
17/07/16 23:59:34 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/Users/Kent/Documents/spark/spark-warehouse').
17/07/16 23:59:34 INFO SharedState: Warehouse path is 'file:/Users/Kent/Documents/spark/spark-warehouse'.
17/07/16 23:59:34 INFO HiveUtils: Initializing HiveMetastoreConnection version 1.2.1 using Spark classes.
17/07/16 23:59:34 INFO HiveClientImpl: Warehouse location for Hive client (version 1.2.2) is /user/hive/warehouse
17/07/16 23:59:34 INFO HiveMetaStore: 0: get_database: default
17/07/16 23:59:34 INFO audit: ugi=Kent	ip=unknown-ip-addr	cmd=get_database: default
17/07/16 23:59:34 INFO HiveClientImpl: Warehouse location for Hive client (version 1.2.2) is /user/hive/warehouse
17/07/16 23:59:34 INFO HiveMetaStore: 0: get_database: global_temp
17/07/16 23:59:34 INFO audit: ugi=Kent	ip=unknown-ip-addr	cmd=get_database: global_temp
17/07/16 23:59:34 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
17/07/16 23:59:34 INFO HiveClientImpl: Warehouse location for Hive client (version 1.2.2) is /user/hive/warehouse
17/07/16 23:59:34 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
spark-sql>

```
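
The single scratch directory in the log above follows from reusing the session state that already exists on the thread rather than starting a second one. The sketch below illustrates that reuse-or-create decision with a plain `ThreadLocal`. It is only an approximation of the idea behind `Option(SessionState.get).getOrElse(newState())` in the `HiveClientImpl.scala` hunk: `State`, `stateFor`, `detach` and `createdCount` are hypothetical names, and no Hive API is involved.

```scala
object ReuseOrCreate {
  // Hypothetical stand-in for a Hive session state; `id` only tells instances apart.
  final class State(val id: Int)

  // Plays the role of the per-thread "current session" slot.
  private val current = new ThreadLocal[State]
  private var created = 0

  /** Creates a new state and registers it as the current one for this thread. */
  def newState(): State = {
    created += 1
    val s = new State(created)
    current.set(s)
    s
  }

  /** Clears the current state for this thread. */
  def detach(): Unit = current.remove()

  /**
   * With isolation on, always build a fresh state. With isolation off,
   * reuse the thread's current state and create one only if none is set.
   */
  def stateFor(isolationOn: Boolean): State =
    if (isolationOn) newState()
    else Option(current.get).getOrElse(newState())

  def createdCount: Int = created

  def main(args: Array[String]): Unit = {
    val cli = newState()                          // e.g. started by a CLI driver
    val reused = stateFor(isolationOn = false)    // same instance, nothing new created
    println(reused eq cli)
    println(createdCount)
  }
}
```

If the current state is detached before the client is created, the non-isolated branch falls back to creating a new one, which matches the second case described in the comment added to `HiveClientImpl`.
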
cc cloud-fan gatorsmile

Author: Kent Yao <ya...@hotmail.com>
Author: hzyaoqin <hz...@corp.netease.com>

Closes #18648 from yaooqinn/SPARK-21428.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b83b502c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b83b502c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b83b502c

Branch: refs/heads/master
Commit: b83b502c4189c571bda776511c6f7541c6067aae
Parents: d695a52
Author: Kent Yao <ya...@hotmail.com>
Authored: Fri Aug 18 00:24:45 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Aug 18 00:24:45 2017 +0800

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  12 +-
 .../org/apache/spark/deploy/SparkSubmit.scala   |  19 +--
 .../thriftserver/HiveCliSessionStateSuite.scala |  64 ++++++++
 .../org/apache/spark/sql/hive/HiveUtils.scala   |  19 ++-
 .../spark/sql/hive/client/HiveClient.scala      |   6 +
 .../spark/sql/hive/client/HiveClientImpl.scala  | 158 ++++++++-----------
 6 files changed, 170 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b83b502c/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index e26f61d..2a92ef9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy
 
-import java.io.IOException
+import java.io.{File, IOException}
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
 import java.util.{Arrays, Comparator, Date, Locale}
@@ -155,8 +155,14 @@ class SparkHadoopUtil extends Logging {
 
   def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }
 
-  def loginUserFromKeytab(principalName: String, keytabFilename: String) {
-    UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
+  def loginUserFromKeytab(principalName: String, keytabFilename: String): Unit = {
+    if (!new File(keytabFilename).exists()) {
+      throw new SparkException(s"Keytab file: ${keytabFilename} does not exist")
+    } else {
+      logInfo("Attempting to login to Kerberos" +
+        s" using principal: ${principalName} and keytab: ${keytabFilename}")
+      UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/b83b502c/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 0197800..6d744a0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -559,18 +559,13 @@ object SparkSubmit extends CommandLineUtils {
     if (clusterManager == YARN || clusterManager == LOCAL) {
       if (args.principal != null) {
         require(args.keytab != null, "Keytab must be specified when principal is specified")
-        if (!new File(args.keytab).exists()) {
-          throw new SparkException(s"Keytab file: ${args.keytab} does not exist")
-        } else {
-          // Add keytab and principal configurations in sysProps to make them available
-          // for later use; e.g. in spark sql, the isolated class loader used to talk
-          // to HiveMetastore will use these settings. They will be set as Java system
-          // properties and then loaded by SparkConf
-          sysProps.put("spark.yarn.keytab", args.keytab)
-          sysProps.put("spark.yarn.principal", args.principal)
-
-          UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
-        }
+        SparkHadoopUtil.get.loginUserFromKeytab(args.principal, args.keytab)
+        // Add keytab and principal configurations in sysProps to make them available
+        // for later use; e.g. in spark sql, the isolated class loader used to talk
+        // to HiveMetastore will use these settings. They will be set as Java system
+        // properties and then loaded by SparkConf
+        sysProps.put("spark.yarn.keytab", args.keytab)
+        sysProps.put("spark.yarn.principal", args.principal)
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b83b502c/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala
new file mode 100644
index 0000000..5f9ea4d
--- /dev/null
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import org.apache.hadoop.hive.cli.CliSessionState
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.ql.session.SessionState
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.hive.HiveUtils
+
+class HiveCliSessionStateSuite extends SparkFunSuite {
+
+  def withSessionClear(f: () => Unit): Unit = {
+    try f finally SessionState.detachSession()
+  }
+
+  test("CliSessionState will be reused") {
+    withSessionClear { () =>
+      val hiveConf = new HiveConf(classOf[SessionState])
+      HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
+        case (key, value) => hiveConf.set(key, value)
+      }
+      val sessionState: SessionState = new CliSessionState(hiveConf)
+      SessionState.start(sessionState)
+      val s1 = SessionState.get
+      val sparkConf = new SparkConf()
+      val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
+      val s2 = HiveUtils.newClientForMetadata(sparkConf, hadoopConf).getState
+      assert(s1 === s2)
+      assert(s2.isInstanceOf[CliSessionState])
+    }
+  }
+
+  test("SessionState will not be reused") {
+    withSessionClear { () =>
+      val sparkConf = new SparkConf()
+      val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
+      HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
+        case (key, value) => hadoopConf.set(key, value)
+      }
+      val hiveClient = HiveUtils.newClientForMetadata(sparkConf, hadoopConf)
+      val s1 = hiveClient.getState
+      val s2 = hiveClient.newSession().getState
+      assert(s1 !== s2)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b83b502c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 6439250..561c127 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
 import org.apache.hadoop.util.VersionInfo
 
@@ -231,6 +232,22 @@ private[spark] object HiveUtils extends Logging {
   }
 
   /**
+   * Check current Thread's SessionState type
+   * @return true when SessionState.get returns an instance of CliSessionState,
+   *         false when it gets non-CliSessionState instance or null
+   */
+  def isCliSessionState(): Boolean = {
+    val state = SessionState.get
+    var temp: Class[_] = if (state != null) state.getClass else null
+    var found = false
+    while (temp != null && !found) {
+      found = temp.getName == "org.apache.hadoop.hive.cli.CliSessionState"
+      temp = temp.getSuperclass
+    }
+    found
+  }
+
+  /**
    * Create a [[HiveClient]] used for execution.
    *
    * Currently this must always be Hive 13 as this is the version of Hive that is packaged
@@ -313,7 +330,7 @@ private[spark] object HiveUtils extends Logging {
         hadoopConf = hadoopConf,
         execJars = jars.toSeq,
         config = configurations,
-        isolationOn = true,
+        isolationOn = !isCliSessionState(),
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)
     } else if (hiveMetastoreJars == "maven") {

http://git-wip-us.apache.org/repos/asf/spark/blob/b83b502c/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 16a80f9..8cff0ca 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -39,6 +39,12 @@ private[hive] trait HiveClient {
   def getConf(key: String, defaultValue: String): String
 
   /**
+   * Return the associated Hive SessionState of this [[HiveClientImpl]]
+   * @return [[Any]] not SessionState to avoid linkage error
+   */
+  def getState: Any
+
+  /**
    * Runs a HiveQL command using Hive, returning the results as a list of strings.  Each row will
    * result in one string.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/b83b502c/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index bde9a81..5e5c0a2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -35,9 +35,9 @@ import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Tab
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.AnalysisException
@@ -105,100 +105,33 @@ private[hive] class HiveClientImpl(
   // Create an internal session state for this HiveClientImpl.
   val state: SessionState = {
     val original = Thread.currentThread().getContextClassLoader
-    // Switch to the initClassLoader.
-    Thread.currentThread().setContextClassLoader(initClassLoader)
-
-    // Set up kerberos credentials for UserGroupInformation.loginUser within
-    // current class loader
-    if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
-      val principalName = sparkConf.get("spark.yarn.principal")
-      val keytabFileName = sparkConf.get("spark.yarn.keytab")
-      if (!new File(keytabFileName).exists()) {
-        throw new SparkException(s"Keytab file: ${keytabFileName}" +
-          " specified in spark.yarn.keytab does not exist")
-      } else {
-        logInfo("Attempting to login to Kerberos" +
-          s" using principal: ${principalName} and keytab: ${keytabFileName}")
-        UserGroupInformation.loginUserFromKeytab(principalName, keytabFileName)
-      }
-    }
-
-    def isCliSessionState(state: SessionState): Boolean = {
-      var temp: Class[_] = if (state != null) state.getClass else null
-      var found = false
-      while (temp != null && !found) {
-        found = temp.getName == "org.apache.hadoop.hive.cli.CliSessionState"
-        temp = temp.getSuperclass
+    if (clientLoader.isolationOn) {
+      // Switch to the initClassLoader.
+      Thread.currentThread().setContextClassLoader(initClassLoader)
+      // Set up kerberos credentials for UserGroupInformation.loginUser within current class loader
+      if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
+        val principal = sparkConf.get("spark.yarn.principal")
+        val keytab = sparkConf.get("spark.yarn.keytab")
+        SparkHadoopUtil.get.loginUserFromKeytab(principal, keytab)
       }
-      found
-    }
-
-    val ret = try {
-      // originState will be created if not exists, will never be null
-      val originalState = SessionState.get()
-      if (isCliSessionState(originalState)) {
-        // In `SparkSQLCLIDriver`, we have already started a `CliSessionState`,
-        // which contains information like configurations from command line. Later
-        // we call `SparkSQLEnv.init()` there, which would run into this part again.
-        // so we should keep `conf` and reuse the existing instance of `CliSessionState`.
-        originalState
-      } else {
-        val hiveConf = new HiveConf(classOf[SessionState])
-        // 1: we set all confs in the hadoopConf to this hiveConf.
-        // This hadoopConf contains user settings in Hadoop's core-site.xml file
-        // and Hive's hive-site.xml file. Note, we load hive-site.xml file manually in
-        // SharedState and put settings in this hadoopConf instead of relying on HiveConf
-        // to load user settings. Otherwise, HiveConf's initialize method will override
-        // settings in the hadoopConf. This issue only shows up when spark.sql.hive.metastore.jars
-        // is not set to builtin. When spark.sql.hive.metastore.jars is builtin, the classpath
-        // has hive-site.xml. So, HiveConf will use that to override its default values.
-        hadoopConf.iterator().asScala.foreach { entry =>
-          val key = entry.getKey
-          val value = entry.getValue
-          if (key.toLowerCase(Locale.ROOT).contains("password")) {
-            logDebug(s"Applying Hadoop and Hive config to Hive Conf: $key=xxx")
-          } else {
-            logDebug(s"Applying Hadoop and Hive config to Hive Conf: $key=$value")
-          }
-          hiveConf.set(key, value)
-        }
-        // HiveConf is a Hadoop Configuration, which has a field of classLoader and
-        // the initial value will be the current thread's context class loader
-        // (i.e. initClassLoader at here).
-        // We call initialConf.setClassLoader(initClassLoader) at here to make
-        // this action explicit.
-        hiveConf.setClassLoader(initClassLoader)
-        // 2: we set all spark confs to this hiveConf.
-        sparkConf.getAll.foreach { case (k, v) =>
-          if (k.toLowerCase(Locale.ROOT).contains("password")) {
-            logDebug(s"Applying Spark config to Hive Conf: $k=xxx")
-          } else {
-            logDebug(s"Applying Spark config to Hive Conf: $k=$v")
-          }
-          hiveConf.set(k, v)
-        }
-        // 3: we set all entries in config to this hiveConf.
-        extraConfig.foreach { case (k, v) =>
-          if (k.toLowerCase(Locale.ROOT).contains("password")) {
-            logDebug(s"Applying extra config to HiveConf: $k=xxx")
-          } else {
-            logDebug(s"Applying extra config to HiveConf: $k=$v")
-          }
-          hiveConf.set(k, v)
-        }
-        val state = new SessionState(hiveConf)
-        if (clientLoader.cachedHive != null) {
-          Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
-        }
-        SessionState.start(state)
-        state.out = new PrintStream(outputBuffer, true, "UTF-8")
-        state.err = new PrintStream(outputBuffer, true, "UTF-8")
-        state
+      try {
+        newState()
+      } finally {
+        Thread.currentThread().setContextClassLoader(original)
       }
-    } finally {
-      Thread.currentThread().setContextClassLoader(original)
+    } else {
+      // Isolation off means we detect a CliSessionState instance in current thread.
+      // 1: Inside the spark project, we have already started a CliSessionState in
+      // `SparkSQLCLIDriver`, which contains configurations from command lines. Later, we call
+      // `SparkSQLEnv.init()` there, which would new a hive client again. so we should keep those
+      // configurations and reuse the existing instance of `CliSessionState`. In this case,
+      // SessionState.get will always return a CliSessionState.
+      // 2: In another case, a user app may start a CliSessionState outside spark project with built
+      // in hive jars, which will turn off isolation, if SessionSate.detachSession is
+      // called to remove the current state after that, hive client created later will initialize
+      // its own state by newState()
+      Option(SessionState.get).getOrElse(newState())
     }
-    ret
   }
 
   // Log the default warehouse location.
@@ -206,6 +139,44 @@ private[hive] class HiveClientImpl(
     s"Warehouse location for Hive client " +
       s"(version ${version.fullVersion}) is ${conf.get("hive.metastore.warehouse.dir")}")
 
+  private def newState(): SessionState = {
+    val hiveConf = new HiveConf(classOf[SessionState])
+    // HiveConf is a Hadoop Configuration, which has a field of classLoader and
+    // the initial value will be the current thread's context class loader
+    // (i.e. initClassLoader at here).
+    // We call initialConf.setClassLoader(initClassLoader) at here to make
+    // this action explicit.
+    hiveConf.setClassLoader(initClassLoader)
+
+    // 1: Take all from the hadoopConf to this hiveConf.
+    // This hadoopConf contains user settings in Hadoop's core-site.xml file
+    // and Hive's hive-site.xml file. Note, we load hive-site.xml file manually in
+    // SharedState and put settings in this hadoopConf instead of relying on HiveConf
+    // to load user settings. Otherwise, HiveConf's initialize method will override
+    // settings in the hadoopConf. This issue only shows up when spark.sql.hive.metastore.jars
+    // is not set to builtin. When spark.sql.hive.metastore.jars is builtin, the classpath
+    // has hive-site.xml. So, HiveConf will use that to override its default values.
+    // 2: we set all spark confs to this hiveConf.
+    // 3: we set all entries in config to this hiveConf.
+    (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue)
+      ++ sparkConf.getAll.toMap ++ extraConfig).foreach { case (k, v) =>
+      logDebug(
+        s"""
+           |Applying Hadoop/Hive/Spark and extra properties to Hive Conf:
+           |$k=${if (k.toLowerCase(Locale.ROOT).contains("password")) "xxx" else v}
+         """.stripMargin)
+      hiveConf.set(k, v)
+    }
+    val state = new SessionState(hiveConf)
+    if (clientLoader.cachedHive != null) {
+      Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
+    }
+    SessionState.start(state)
+    state.out = new PrintStream(outputBuffer, true, "UTF-8")
+    state.err = new PrintStream(outputBuffer, true, "UTF-8")
+    state
+  }
+
   /** Returns the configuration for the current session. */
   def conf: HiveConf = state.getConf
 
@@ -269,6 +240,9 @@ private[hive] class HiveClientImpl(
     }
   }
 
+  /** Return the associated Hive [[SessionState]] of this [[HiveClientImpl]] */
+  override def getState: SessionState = withHiveState(state)
+
   /**
    * Runs `f` with ThreadLocal session state and classloaders configured for this version of hive.
    */

