Posted to commits@spark.apache.org by we...@apache.org on 2020/03/17 15:29:41 UTC
[spark] branch branch-3.0 updated: [SPARK-31170][SQL] Spark SQL Cli should respect hive-site.xml and spark.sql.warehouse.dir
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 321341a [SPARK-31170][SQL] Spark SQL Cli should respect hive-site.xml and spark.sql.warehouse.dir
321341a is described below
commit 321341a4c3104380035350631c82a4b385f117e4
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Tue Mar 17 23:03:18 2020 +0800
[SPARK-31170][SQL] Spark SQL Cli should respect hive-site.xml and spark.sql.warehouse.dir
### What changes were proposed in this pull request?
In the Spark SQL CLI, we create a Hive `CliSessionState` that does not load `hive-site.xml`, so the configurations in `hive-site.xml` do not take effect as they do in other Spark/Hive integration apps.
Also, the warehouse directory is not picked up correctly. If the `default` database does not exist, the `CliSessionState` creates one the first time it talks to the metastore. The `Location` of the default DB is then neither the value of `spark.sql.warehouse.dir` nor the user-specified value of `hive.metastore.warehouse.dir`, but the default value of `hive.metastore.warehouse.dir`, which is always `/user/hive/warehouse`.
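For context, the setting at issue typically comes from a `hive-site.xml` on the classpath. A minimal sketch (the path is purely illustrative) looks like:

    <?xml version="1.0"?>
    <configuration>
      <!-- illustrative value; before this patch the CLI silently ignored this file -->
      <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/data/hive/warehouse</value>
      </property>
    </configuration>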
### Why are the changes needed?
Fix a bug so that the Spark SQL CLI picks up the right configurations.
### Does this PR introduce any user-facing change?
Yes. A non-existent default database will now be created in the location specified by the user via `spark.sql.warehouse.dir` or `hive.metastore.warehouse.dir`, or in the default value of `spark.sql.warehouse.dir` if neither is specified.
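For example, a user can pin the warehouse location when launching the CLI (the path below is illustrative):

    spark-sql --conf spark.sql.warehouse.dir=/tmp/my-warehouse

If `spark.sql.warehouse.dir` is absent, the `hive.metastore.warehouse.dir` value from `hive-site.xml` is used; if both are absent, the built-in default of `spark.sql.warehouse.dir` applies.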
### How was this patch tested?
Add a CLI unit test.
Closes #27933 from yaooqinn/SPARK-31170.
Authored-by: Kent Yao <ya...@hotmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 5bc0d76591b46f0c1c9ec283ee8e1c5da76e67d6)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../apache/spark/sql/internal/SharedState.scala | 80 +++++++++++-----------
.../sql/hive/thriftserver/SparkSQLCLIDriver.scala | 2 +
.../spark/sql/hive/thriftserver/CliSuite.scala | 12 ++++
.../spark/sql/hive/HiveSharedStateSuite.scala | 1 -
.../spark/sql/hive/HiveSparkSubmitSuite.scala | 2 +-
5 files changed, 55 insertions(+), 42 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 5347264..eb74e96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -41,7 +41,6 @@ import org.apache.spark.sql.streaming.ui.{StreamingQueryStatusListener, Streamin
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.util.Utils
-
/**
* A class that holds all state shared across sessions in a given [[SQLContext]].
*
@@ -55,45 +54,10 @@ private[sql] class SharedState(
SharedState.setFsUrlStreamHandlerFactory(sparkContext.conf)
- // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
- // the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
- val warehousePath: String = {
- val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
- if (configFile != null) {
- logInfo(s"loading hive config file: $configFile")
- sparkContext.hadoopConfiguration.addResource(configFile)
- }
-
- // hive.metastore.warehouse.dir only stay in hadoopConf
- sparkContext.conf.remove("hive.metastore.warehouse.dir")
- // Set the Hive metastore warehouse path to the one we use
- val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir")
- if (hiveWarehouseDir != null && !sparkContext.conf.contains(WAREHOUSE_PATH.key)) {
- // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
- // we will respect the value of hive.metastore.warehouse.dir.
- sparkContext.conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
- logInfo(s"${WAREHOUSE_PATH.key} is not set, but hive.metastore.warehouse.dir " +
- s"is set. Setting ${WAREHOUSE_PATH.key} to the value of " +
- s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').")
- hiveWarehouseDir
- } else {
- // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
- // the value of spark.sql.warehouse.dir.
- // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set,
- // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
- val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH)
- logInfo(s"Setting hive.metastore.warehouse.dir ('$hiveWarehouseDir') to the value of " +
- s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
- sparkContext.hadoopConfiguration.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
- sparkWarehouseDir
- }
- }
- logInfo(s"Warehouse path is '$warehousePath'.")
-
- // These 2 variables should be initiated after `warehousePath`, because in the first place we need
- // to load hive-site.xml into hadoopConf and determine the warehouse path which will be set into
- // both spark conf and hadoop conf avoiding be affected by any SparkSession level options
private val (conf, hadoopConf) = {
+ // Load hive-site.xml into hadoopConf and determine the warehouse path, which will be set into
+ // both the spark conf and the hadoop conf to avoid being affected by any SparkSession-level options
+ SharedState.loadHiveConfFile(sparkContext.conf, sparkContext.hadoopConfiguration)
val confClone = sparkContext.conf.clone()
val hadoopConfClone = new Configuration(sparkContext.hadoopConfiguration)
// If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing
@@ -166,7 +130,7 @@ private[sql] class SharedState(
val defaultDbDefinition = CatalogDatabase(
SessionCatalog.DEFAULT_DATABASE,
"default database",
- CatalogUtils.stringToURI(warehousePath),
+ CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)),
Map())
// Create default database if it doesn't exist
if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
@@ -258,4 +222,40 @@ object SharedState extends Logging {
throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
}
}
+
+ /**
+ * Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
+ * the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
+ */
+ def loadHiveConfFile(sparkConf: SparkConf, hadoopConf: Configuration): Unit = {
+ val hiveWarehouseKey = "hive.metastore.warehouse.dir"
+ val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
+ if (configFile != null) {
+ logInfo(s"loading hive config file: $configFile")
+ hadoopConf.addResource(configFile)
+ }
+ // hive.metastore.warehouse.dir stays only in hadoopConf
+ sparkConf.remove(hiveWarehouseKey)
+ // Set the Hive metastore warehouse path to the one we use
+ val hiveWarehouseDir = hadoopConf.get(hiveWarehouseKey)
+ val warehousePath = if (hiveWarehouseDir != null && !sparkConf.contains(WAREHOUSE_PATH.key)) {
+ // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
+ // we will respect the value of hive.metastore.warehouse.dir.
+ sparkConf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
+ logInfo(s"${WAREHOUSE_PATH.key} is not set, but $hiveWarehouseKey is set. Setting" +
+ s" ${WAREHOUSE_PATH.key} to the value of $hiveWarehouseKey ('$hiveWarehouseDir').")
+ hiveWarehouseDir
+ } else {
+ // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
+ // the value of spark.sql.warehouse.dir.
+ // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set,
+ // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
+ val sparkWarehouseDir = sparkConf.get(WAREHOUSE_PATH)
+ logInfo(s"Setting $hiveWarehouseKey ('$hiveWarehouseDir') to the value of " +
+ s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
+ hadoopConf.set(hiveWarehouseKey, sparkWarehouseDir)
+ sparkWarehouseDir
+ }
+ logInfo(s"Warehouse path is '$warehousePath'.")
+ }
}
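As a quick illustration of the new helper's behavior, here is a hedged sketch (not part of the patch; the hand-built confs are assumptions, and the call assumes a context where the helper is visible):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.internal.SharedState

    val sparkConf = new SparkConf()
    val hadoopConf = new Configuration()
    // Simulate a hive-site.xml setting; spark.sql.warehouse.dir is deliberately left unset.
    hadoopConf.set("hive.metastore.warehouse.dir", "/data/hive/warehouse")
    SharedState.loadHiveConfFile(sparkConf, hadoopConf)
    // The helper copies the Hive value into spark.sql.warehouse.dir, so both confs now agree.
    assert(sparkConf.get("spark.sql.warehouse.dir") == "/data/hive/warehouse")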
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 6b76927..3ddf4ec 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -46,6 +46,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.hive.security.HiveDelegationTokenProvider
+import org.apache.spark.sql.internal.SharedState
import org.apache.spark.util.ShutdownHookManager
/**
@@ -87,6 +88,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
val sparkConf = new SparkConf(loadDefaults = true)
val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
+ SharedState.loadHiveConfFile(sparkConf, hadoopConf)
val extraConfigs = HiveUtils.formatTimeVarsForHiveClient(hadoopConf)
val cliConf = HiveClientImpl.newHiveConf(sparkConf, hadoopConf, extraConfigs)
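With this call in place, a `hive-site.xml` on the CLI's classpath (for example under `$SPARK_HOME/conf`) is loaded before the Hive conf is built. Illustratively, assuming that file sets `hive.metastore.warehouse.dir` to `/data/hive/warehouse`:

    spark-sql -e "desc database default;"
    # on a fresh metastore, the reported location is now under /data/hive/warehouse,
    # not the hardcoded Hive default /user/hive/warehouse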
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 43aafc3..ed77663 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -32,6 +32,7 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging
import org.apache.spark.sql.hive.test.HiveTestJars
+import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
import org.apache.spark.util.{ThreadUtils, Utils}
@@ -159,6 +160,17 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
}
}
+ test("Pick spark.sql.warehouse.dir first for Spark Cli if set") {
+ val sparkWareHouseDir = Utils.createTempDir()
+ new File(warehousePath, "metastore_db").delete()
+ runCliWithin(
+ 1.minute,
+ Seq("--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=$sparkWareHouseDir"))(
+ "desc database default;" -> sparkWareHouseDir.getAbsolutePath
+ )
+ sparkWareHouseDir.delete()
+ }
+
test("Simple commands") {
val dataFilePath =
Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
index 6e2dcfc..78535b0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
@@ -45,7 +45,6 @@ class HiveSharedStateSuite extends SparkFunSuite {
GLOBAL_TEMP_DATABASE.key -> tmpDb)
val state = new SharedState(sc, initialConfigs)
- assert(state.warehousePath !== invalidPath, "warehouse path can't determine by session options")
assert(sc.conf.get(WAREHOUSE_PATH.key) !== invalidPath,
"warehouse conf in session options can't affect application wide spark conf")
assert(sc.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) !== invalidPath,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 31ff62e..8b97489 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -787,7 +787,7 @@ object SPARK_18360 {
.enableHiveSupport().getOrCreate()
val defaultDbLocation = spark.catalog.getDatabase("default").locationUri
- assert(new Path(defaultDbLocation) == new Path(spark.sharedState.warehousePath))
+ assert(new Path(defaultDbLocation) == new Path(spark.conf.get(WAREHOUSE_PATH)))
val hiveClient =
spark.sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client