Posted to commits@spark.apache.org by we...@apache.org on 2017/08/17 16:24:52 UTC
spark git commit: [SPARK-21428] Turn IsolatedClientLoader off while using builtin Hive jars for reusing CliSessionState
Repository: spark
Updated Branches:
refs/heads/master d695a528b -> b83b502c4
[SPARK-21428] Turn IsolatedClientLoader off while using builtin Hive jars for reusing CliSessionState
## What changes were proposed in this pull request?
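Set `isolationOn` to false when using the builtin Hive jars and `SessionState.get` returns a `CliSessionState` instance, so that the existing `CliSessionState` is reused instead of a new `SessionState` being created.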
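The check behind this decision walks the class hierarchy of the current session state and compares class names. The following is a minimal, self-contained sketch of that idea. `BaseState`, `CliState`, `CustomCliState`, `IsolationSketch`, and `hasClassNamed` are hypothetical stand-ins used only for illustration; the actual change uses Hive's `SessionState` and `CliSessionState` inside `HiveUtils.isCliSessionState()`.

```scala
// Hypothetical stand-ins for Hive's SessionState / CliSessionState hierarchy.
class BaseState
class CliState extends BaseState
class CustomCliState extends CliState

object IsolationSketch {
  /** True when the class of `obj`, or any of its superclasses, has the given name. */
  def hasClassNamed(obj: AnyRef, className: String): Boolean = {
    var cls: Class[_] = if (obj != null) obj.getClass else null
    var found = false
    // Walk up the superclass chain until a match is found or the chain ends.
    while (cls != null && !found) {
      found = cls.getName == className
      cls = cls.getSuperclass
    }
    found
  }

  /** Isolation stays on unless the current state is a CliState (or a subclass of it). */
  def isolationOn(current: AnyRef): Boolean =
    !hasClassNamed(current, classOf[CliState].getName)
}
```

In this sketch a `null` state keeps isolation on, which mirrors the documented behaviour of returning false for a null or non-CLI session state.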
## How was this patch tested?
1. Unit tests
2. Manually verified: `hive.exec.scratchdir` was created only once because the `CliSessionState` is reused
```text
➜ spark git:(SPARK-21428) ✗ bin/spark-sql --conf spark.sql.hive.metastore.jars=builtin
log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/07/16 23:59:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/16 23:59:27 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
17/07/16 23:59:27 INFO ObjectStore: ObjectStore, initialize called
17/07/16 23:59:28 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
17/07/16 23:59:28 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored
17/07/16 23:59:29 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
17/07/16 23:59:30 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/07/16 23:59:30 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/07/16 23:59:31 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/07/16 23:59:31 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/07/16 23:59:31 INFO MetaStoreDirectSql: Using direct SQL, underlying DB is DERBY
17/07/16 23:59:31 INFO ObjectStore: Initialized ObjectStore
17/07/16 23:59:31 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/07/16 23:59:31 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
17/07/16 23:59:32 INFO HiveMetaStore: Added admin role in metastore
17/07/16 23:59:32 INFO HiveMetaStore: Added public role in metastore
17/07/16 23:59:32 INFO HiveMetaStore: No user is added in admin role, since config is empty
17/07/16 23:59:32 INFO HiveMetaStore: 0: get_all_databases
17/07/16 23:59:32 INFO audit: ugi=Kent ip=unknown-ip-addr cmd=get_all_databases
17/07/16 23:59:32 INFO HiveMetaStore: 0: get_functions: db=default pat=*
17/07/16 23:59:32 INFO audit: ugi=Kent ip=unknown-ip-addr cmd=get_functions: db=default pat=*
17/07/16 23:59:32 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
17/07/16 23:59:32 INFO SessionState: Created local directory: /var/folders/k2/04p4k4ws73l6711h_mz2_tq00000gn/T/beea7261-221a-4711-89e8-8b12a9d37370_resources
17/07/16 23:59:32 INFO SessionState: Created HDFS directory: /tmp/hive/Kent/beea7261-221a-4711-89e8-8b12a9d37370
17/07/16 23:59:32 INFO SessionState: Created local directory: /var/folders/k2/04p4k4ws73l6711h_mz2_tq00000gn/T/Kent/beea7261-221a-4711-89e8-8b12a9d37370
17/07/16 23:59:32 INFO SessionState: Created HDFS directory: /tmp/hive/Kent/beea7261-221a-4711-89e8-8b12a9d37370/_tmp_space.db
17/07/16 23:59:32 INFO SparkContext: Running Spark version 2.3.0-SNAPSHOT
17/07/16 23:59:32 INFO SparkContext: Submitted application: SparkSQL::10.0.0.8
17/07/16 23:59:32 INFO SecurityManager: Changing view acls to: Kent
17/07/16 23:59:32 INFO SecurityManager: Changing modify acls to: Kent
17/07/16 23:59:32 INFO SecurityManager: Changing view acls groups to:
17/07/16 23:59:32 INFO SecurityManager: Changing modify acls groups to:
17/07/16 23:59:32 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Kent); groups with view permissions: Set(); users with modify permissions: Set(Kent); groups with modify permissions: Set()
17/07/16 23:59:33 INFO Utils: Successfully started service 'sparkDriver' on port 51889.
17/07/16 23:59:33 INFO SparkEnv: Registering MapOutputTracker
17/07/16 23:59:33 INFO SparkEnv: Registering BlockManagerMaster
17/07/16 23:59:33 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
17/07/16 23:59:33 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
17/07/16 23:59:33 INFO DiskBlockManager: Created local directory at /private/var/folders/k2/04p4k4ws73l6711h_mz2_tq00000gn/T/blockmgr-9cfae28a-01e9-4c73-a1f1-f76fa52fc7a5
17/07/16 23:59:33 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
17/07/16 23:59:33 INFO SparkEnv: Registering OutputCommitCoordinator
17/07/16 23:59:33 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/07/16 23:59:33 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.0.0.8:4040
17/07/16 23:59:33 INFO Executor: Starting executor ID driver on host localhost
17/07/16 23:59:33 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 51890.
17/07/16 23:59:33 INFO NettyBlockTransferService: Server created on 10.0.0.8:51890
17/07/16 23:59:33 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/07/16 23:59:33 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.0.0.8, 51890, None)
17/07/16 23:59:33 INFO BlockManagerMasterEndpoint: Registering block manager 10.0.0.8:51890 with 366.3 MB RAM, BlockManagerId(driver, 10.0.0.8, 51890, None)
17/07/16 23:59:33 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.0.0.8, 51890, None)
17/07/16 23:59:33 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.0.0.8, 51890, None)
17/07/16 23:59:34 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/Users/Kent/Documents/spark/spark-warehouse').
17/07/16 23:59:34 INFO SharedState: Warehouse path is 'file:/Users/Kent/Documents/spark/spark-warehouse'.
17/07/16 23:59:34 INFO HiveUtils: Initializing HiveMetastoreConnection version 1.2.1 using Spark classes.
17/07/16 23:59:34 INFO HiveClientImpl: Warehouse location for Hive client (version 1.2.2) is /user/hive/warehouse
17/07/16 23:59:34 INFO HiveMetaStore: 0: get_database: default
17/07/16 23:59:34 INFO audit: ugi=Kent ip=unknown-ip-addr cmd=get_database: default
17/07/16 23:59:34 INFO HiveClientImpl: Warehouse location for Hive client (version 1.2.2) is /user/hive/warehouse
17/07/16 23:59:34 INFO HiveMetaStore: 0: get_database: global_temp
17/07/16 23:59:34 INFO audit: ugi=Kent ip=unknown-ip-addr cmd=get_database: global_temp
17/07/16 23:59:34 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
17/07/16 23:59:34 INFO HiveClientImpl: Warehouse location for Hive client (version 1.2.2) is /user/hive/warehouse
17/07/16 23:59:34 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
spark-sql>
```
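The single set of session directories in the log above follows from reusing the thread's existing session state rather than starting a second one. A rough sketch of that reuse-or-create pattern, under stated assumptions: `SessionReuseSketch`, `State`, `stateFor`, `detach`, and `createdCount` are hypothetical names, and a plain `ThreadLocal` stands in for Hive's per-thread `SessionState`.

```scala
object SessionReuseSketch {
  // Hypothetical stand-in for a Hive session state; `id` only distinguishes instances.
  final class State(val id: Int)

  // Per-thread current state, playing the role of SessionState.get / SessionState.start.
  private val current = new ThreadLocal[State]
  private var created = 0

  /** Creates a fresh state and registers it as the current thread's state. */
  def newState(): State = {
    created += 1
    val s = new State(created)
    current.set(s)
    s
  }

  /** With isolation on, always create; with isolation off, reuse the thread's state if any. */
  def stateFor(isolationOn: Boolean): State =
    if (isolationOn) newState()
    else Option(current.get).getOrElse(newState())

  /** Clears the current thread's state, analogous to detaching the session. */
  def detach(): Unit = current.remove()

  def createdCount: Int = created
}
```

With isolation off, two consecutive calls to `stateFor(false)` on the same thread return the same instance, so any per-session setup runs once. After `detach()`, the next call falls back to `newState()`.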
cc cloud-fan gatorsmile
Author: Kent Yao <ya...@hotmail.com>
Author: hzyaoqin <hz...@corp.netease.com>
Closes #18648 from yaooqinn/SPARK-21428.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b83b502c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b83b502c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b83b502c
Branch: refs/heads/master
Commit: b83b502c4189c571bda776511c6f7541c6067aae
Parents: d695a52
Author: Kent Yao <ya...@hotmail.com>
Authored: Fri Aug 18 00:24:45 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Aug 18 00:24:45 2017 +0800
----------------------------------------------------------------------
.../apache/spark/deploy/SparkHadoopUtil.scala | 12 +-
.../org/apache/spark/deploy/SparkSubmit.scala | 19 +--
.../thriftserver/HiveCliSessionStateSuite.scala | 64 ++++++++
.../org/apache/spark/sql/hive/HiveUtils.scala | 19 ++-
.../spark/sql/hive/client/HiveClient.scala | 6 +
.../spark/sql/hive/client/HiveClientImpl.scala | 158 ++++++++-----------
6 files changed, 170 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/b83b502c/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index e26f61d..2a92ef9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy
-import java.io.IOException
+import java.io.{File, IOException}
import java.security.PrivilegedExceptionAction
import java.text.DateFormat
import java.util.{Arrays, Comparator, Date, Locale}
@@ -155,8 +155,14 @@ class SparkHadoopUtil extends Logging {
def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }
- def loginUserFromKeytab(principalName: String, keytabFilename: String) {
- UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
+ def loginUserFromKeytab(principalName: String, keytabFilename: String): Unit = {
+ if (!new File(keytabFilename).exists()) {
+ throw new SparkException(s"Keytab file: ${keytabFilename} does not exist")
+ } else {
+ logInfo("Attempting to login to Kerberos" +
+ s" using principal: ${principalName} and keytab: ${keytabFilename}")
+ UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/b83b502c/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 0197800..6d744a0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -559,18 +559,13 @@ object SparkSubmit extends CommandLineUtils {
if (clusterManager == YARN || clusterManager == LOCAL) {
if (args.principal != null) {
require(args.keytab != null, "Keytab must be specified when principal is specified")
- if (!new File(args.keytab).exists()) {
- throw new SparkException(s"Keytab file: ${args.keytab} does not exist")
- } else {
- // Add keytab and principal configurations in sysProps to make them available
- // for later use; e.g. in spark sql, the isolated class loader used to talk
- // to HiveMetastore will use these settings. They will be set as Java system
- // properties and then loaded by SparkConf
- sysProps.put("spark.yarn.keytab", args.keytab)
- sysProps.put("spark.yarn.principal", args.principal)
-
- UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
- }
+ SparkHadoopUtil.get.loginUserFromKeytab(args.principal, args.keytab)
+ // Add keytab and principal configurations in sysProps to make them available
+ // for later use; e.g. in spark sql, the isolated class loader used to talk
+ // to HiveMetastore will use these settings. They will be set as Java system
+ // properties and then loaded by SparkConf
+ sysProps.put("spark.yarn.keytab", args.keytab)
+ sysProps.put("spark.yarn.principal", args.principal)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/b83b502c/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala
new file mode 100644
index 0000000..5f9ea4d
--- /dev/null
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import org.apache.hadoop.hive.cli.CliSessionState
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.ql.session.SessionState
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.hive.HiveUtils
+
+class HiveCliSessionStateSuite extends SparkFunSuite {
+
+ def withSessionClear(f: () => Unit): Unit = {
+ try f finally SessionState.detachSession()
+ }
+
+ test("CliSessionState will be reused") {
+ withSessionClear { () =>
+ val hiveConf = new HiveConf(classOf[SessionState])
+ HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
+ case (key, value) => hiveConf.set(key, value)
+ }
+ val sessionState: SessionState = new CliSessionState(hiveConf)
+ SessionState.start(sessionState)
+ val s1 = SessionState.get
+ val sparkConf = new SparkConf()
+ val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
+ val s2 = HiveUtils.newClientForMetadata(sparkConf, hadoopConf).getState
+ assert(s1 === s2)
+ assert(s2.isInstanceOf[CliSessionState])
+ }
+ }
+
+ test("SessionState will not be reused") {
+ withSessionClear { () =>
+ val sparkConf = new SparkConf()
+ val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
+ HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
+ case (key, value) => hadoopConf.set(key, value)
+ }
+ val hiveClient = HiveUtils.newClientForMetadata(sparkConf, hadoopConf)
+ val s1 = hiveClient.getState
+ val s2 = hiveClient.newSession().getState
+ assert(s1 !== s2)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/b83b502c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 6439250..561c127 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
import org.apache.hadoop.util.VersionInfo
@@ -231,6 +232,22 @@ private[spark] object HiveUtils extends Logging {
}
/**
+ * Check current Thread's SessionState type
+ * @return true when SessionState.get returns an instance of CliSessionState,
+ * false when it gets non-CliSessionState instance or null
+ */
+ def isCliSessionState(): Boolean = {
+ val state = SessionState.get
+ var temp: Class[_] = if (state != null) state.getClass else null
+ var found = false
+ while (temp != null && !found) {
+ found = temp.getName == "org.apache.hadoop.hive.cli.CliSessionState"
+ temp = temp.getSuperclass
+ }
+ found
+ }
+
+ /**
* Create a [[HiveClient]] used for execution.
*
* Currently this must always be Hive 13 as this is the version of Hive that is packaged
@@ -313,7 +330,7 @@ private[spark] object HiveUtils extends Logging {
hadoopConf = hadoopConf,
execJars = jars.toSeq,
config = configurations,
- isolationOn = true,
+ isolationOn = !isCliSessionState(),
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
} else if (hiveMetastoreJars == "maven") {
http://git-wip-us.apache.org/repos/asf/spark/blob/b83b502c/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 16a80f9..8cff0ca 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -39,6 +39,12 @@ private[hive] trait HiveClient {
def getConf(key: String, defaultValue: String): String
/**
+ * Return the associated Hive SessionState of this [[HiveClientImpl]]
+ * @return [[Any]] not SessionState to avoid linkage error
+ */
+ def getState: Any
+
+ /**
* Runs a HiveQL command using Hive, returning the results as a list of strings. Each row will
* result in one string.
*/
http://git-wip-us.apache.org/repos/asf/spark/blob/b83b502c/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index bde9a81..5e5c0a2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -35,9 +35,9 @@ import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Tab
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.AnalysisException
@@ -105,100 +105,33 @@ private[hive] class HiveClientImpl(
// Create an internal session state for this HiveClientImpl.
val state: SessionState = {
val original = Thread.currentThread().getContextClassLoader
- // Switch to the initClassLoader.
- Thread.currentThread().setContextClassLoader(initClassLoader)
-
- // Set up kerberos credentials for UserGroupInformation.loginUser within
- // current class loader
- if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
- val principalName = sparkConf.get("spark.yarn.principal")
- val keytabFileName = sparkConf.get("spark.yarn.keytab")
- if (!new File(keytabFileName).exists()) {
- throw new SparkException(s"Keytab file: ${keytabFileName}" +
- " specified in spark.yarn.keytab does not exist")
- } else {
- logInfo("Attempting to login to Kerberos" +
- s" using principal: ${principalName} and keytab: ${keytabFileName}")
- UserGroupInformation.loginUserFromKeytab(principalName, keytabFileName)
- }
- }
-
- def isCliSessionState(state: SessionState): Boolean = {
- var temp: Class[_] = if (state != null) state.getClass else null
- var found = false
- while (temp != null && !found) {
- found = temp.getName == "org.apache.hadoop.hive.cli.CliSessionState"
- temp = temp.getSuperclass
+ if (clientLoader.isolationOn) {
+ // Switch to the initClassLoader.
+ Thread.currentThread().setContextClassLoader(initClassLoader)
+ // Set up kerberos credentials for UserGroupInformation.loginUser within current class loader
+ if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
+ val principal = sparkConf.get("spark.yarn.principal")
+ val keytab = sparkConf.get("spark.yarn.keytab")
+ SparkHadoopUtil.get.loginUserFromKeytab(principal, keytab)
}
- found
- }
-
- val ret = try {
- // originState will be created if not exists, will never be null
- val originalState = SessionState.get()
- if (isCliSessionState(originalState)) {
- // In `SparkSQLCLIDriver`, we have already started a `CliSessionState`,
- // which contains information like configurations from command line. Later
- // we call `SparkSQLEnv.init()` there, which would run into this part again.
- // so we should keep `conf` and reuse the existing instance of `CliSessionState`.
- originalState
- } else {
- val hiveConf = new HiveConf(classOf[SessionState])
- // 1: we set all confs in the hadoopConf to this hiveConf.
- // This hadoopConf contains user settings in Hadoop's core-site.xml file
- // and Hive's hive-site.xml file. Note, we load hive-site.xml file manually in
- // SharedState and put settings in this hadoopConf instead of relying on HiveConf
- // to load user settings. Otherwise, HiveConf's initialize method will override
- // settings in the hadoopConf. This issue only shows up when spark.sql.hive.metastore.jars
- // is not set to builtin. When spark.sql.hive.metastore.jars is builtin, the classpath
- // has hive-site.xml. So, HiveConf will use that to override its default values.
- hadoopConf.iterator().asScala.foreach { entry =>
- val key = entry.getKey
- val value = entry.getValue
- if (key.toLowerCase(Locale.ROOT).contains("password")) {
- logDebug(s"Applying Hadoop and Hive config to Hive Conf: $key=xxx")
- } else {
- logDebug(s"Applying Hadoop and Hive config to Hive Conf: $key=$value")
- }
- hiveConf.set(key, value)
- }
- // HiveConf is a Hadoop Configuration, which has a field of classLoader and
- // the initial value will be the current thread's context class loader
- // (i.e. initClassLoader at here).
- // We call initialConf.setClassLoader(initClassLoader) at here to make
- // this action explicit.
- hiveConf.setClassLoader(initClassLoader)
- // 2: we set all spark confs to this hiveConf.
- sparkConf.getAll.foreach { case (k, v) =>
- if (k.toLowerCase(Locale.ROOT).contains("password")) {
- logDebug(s"Applying Spark config to Hive Conf: $k=xxx")
- } else {
- logDebug(s"Applying Spark config to Hive Conf: $k=$v")
- }
- hiveConf.set(k, v)
- }
- // 3: we set all entries in config to this hiveConf.
- extraConfig.foreach { case (k, v) =>
- if (k.toLowerCase(Locale.ROOT).contains("password")) {
- logDebug(s"Applying extra config to HiveConf: $k=xxx")
- } else {
- logDebug(s"Applying extra config to HiveConf: $k=$v")
- }
- hiveConf.set(k, v)
- }
- val state = new SessionState(hiveConf)
- if (clientLoader.cachedHive != null) {
- Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
- }
- SessionState.start(state)
- state.out = new PrintStream(outputBuffer, true, "UTF-8")
- state.err = new PrintStream(outputBuffer, true, "UTF-8")
- state
+ try {
+ newState()
+ } finally {
+ Thread.currentThread().setContextClassLoader(original)
}
- } finally {
- Thread.currentThread().setContextClassLoader(original)
+ } else {
+ // Isolation off means we detect a CliSessionState instance in current thread.
+ // 1: Inside the spark project, we have already started a CliSessionState in
+ // `SparkSQLCLIDriver`, which contains configurations from command lines. Later, we call
+ // `SparkSQLEnv.init()` there, which would new a hive client again. so we should keep those
+ // configurations and reuse the existing instance of `CliSessionState`. In this case,
+ // SessionState.get will always return a CliSessionState.
+ // 2: In another case, a user app may start a CliSessionState outside spark project with built
+ // in hive jars, which will turn off isolation, if SessionSate.detachSession is
+ // called to remove the current state after that, hive client created later will initialize
+ // its own state by newState()
+ Option(SessionState.get).getOrElse(newState())
}
- ret
}
// Log the default warehouse location.
@@ -206,6 +139,44 @@ private[hive] class HiveClientImpl(
s"Warehouse location for Hive client " +
s"(version ${version.fullVersion}) is ${conf.get("hive.metastore.warehouse.dir")}")
+ private def newState(): SessionState = {
+ val hiveConf = new HiveConf(classOf[SessionState])
+ // HiveConf is a Hadoop Configuration, which has a field of classLoader and
+ // the initial value will be the current thread's context class loader
+ // (i.e. initClassLoader at here).
+ // We call initialConf.setClassLoader(initClassLoader) at here to make
+ // this action explicit.
+ hiveConf.setClassLoader(initClassLoader)
+
+ // 1: Take all from the hadoopConf to this hiveConf.
+ // This hadoopConf contains user settings in Hadoop's core-site.xml file
+ // and Hive's hive-site.xml file. Note, we load hive-site.xml file manually in
+ // SharedState and put settings in this hadoopConf instead of relying on HiveConf
+ // to load user settings. Otherwise, HiveConf's initialize method will override
+ // settings in the hadoopConf. This issue only shows up when spark.sql.hive.metastore.jars
+ // is not set to builtin. When spark.sql.hive.metastore.jars is builtin, the classpath
+ // has hive-site.xml. So, HiveConf will use that to override its default values.
+ // 2: we set all spark confs to this hiveConf.
+ // 3: we set all entries in config to this hiveConf.
+ (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue)
+ ++ sparkConf.getAll.toMap ++ extraConfig).foreach { case (k, v) =>
+ logDebug(
+ s"""
+ |Applying Hadoop/Hive/Spark and extra properties to Hive Conf:
+ |$k=${if (k.toLowerCase(Locale.ROOT).contains("password")) "xxx" else v}
+ """.stripMargin)
+ hiveConf.set(k, v)
+ }
+ val state = new SessionState(hiveConf)
+ if (clientLoader.cachedHive != null) {
+ Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
+ }
+ SessionState.start(state)
+ state.out = new PrintStream(outputBuffer, true, "UTF-8")
+ state.err = new PrintStream(outputBuffer, true, "UTF-8")
+ state
+ }
+
/** Returns the configuration for the current session. */
def conf: HiveConf = state.getConf
@@ -269,6 +240,9 @@ private[hive] class HiveClientImpl(
}
}
+ /** Return the associated Hive [[SessionState]] of this [[HiveClientImpl]] */
+ override def getState: SessionState = withHiveState(state)
+
/**
* Runs `f` with ThreadLocal session state and classloaders configured for this version of hive.
*/