You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/03/17 15:29:41 UTC

[spark] branch branch-3.0 updated: [SPARK-31170][SQL] Spark SQL Cli should respect hive-site.xml and spark.sql.warehouse.dir

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 321341a  [SPARK-31170][SQL] Spark SQL Cli should respect hive-site.xml and spark.sql.warehouse.dir
321341a is described below

commit 321341a4c3104380035350631c82a4b385f117e4
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Tue Mar 17 23:03:18 2020 +0800

    [SPARK-31170][SQL] Spark SQL Cli should respect hive-site.xml and spark.sql.warehouse.dir
    
    ### What changes were proposed in this pull request?
    
    In the Spark SQL CLI, we create a Hive `CliSessionState`, which does not load `hive-site.xml`. As a result, the configurations in `hive-site.xml` do not take effect as they do in other Spark-Hive integration apps.
    
    Also, the warehouse directory is not picked correctly. If the `default` database does not exist, the `CliSessionState` will create one the first time it talks to the metastore. The `Location` of the default DB will be neither the value of `spark.sql.warehouse.dir` nor the user-specified value of `hive.metastore.warehouse.dir`, but the default value of `hive.metastore.warehouse.dir`, which will always be `/user/hive/warehouse`.
    
    ### Why are the changes needed?
    
    Fix a bug so that the Spark SQL CLI picks up the right configurations.
    
    ### Does this PR introduce any user-facing change?
    
    yes, a non-existent default database will be created in the location specified by the user via `spark.sql.warehouse.dir` or `hive.metastore.warehouse.dir`, or in the default value of `spark.sql.warehouse.dir` if neither of them is specified.
    
    ### How was this patch tested?
    
    add cli ut
    
    Closes #27933 from yaooqinn/SPARK-31170.
    
    Authored-by: Kent Yao <ya...@hotmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 5bc0d76591b46f0c1c9ec283ee8e1c5da76e67d6)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/sql/internal/SharedState.scala    | 80 +++++++++++-----------
 .../sql/hive/thriftserver/SparkSQLCLIDriver.scala  |  2 +
 .../spark/sql/hive/thriftserver/CliSuite.scala     | 12 ++++
 .../spark/sql/hive/HiveSharedStateSuite.scala      |  1 -
 .../spark/sql/hive/HiveSparkSubmitSuite.scala      |  2 +-
 5 files changed, 55 insertions(+), 42 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 5347264..eb74e96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -41,7 +41,6 @@ import org.apache.spark.sql.streaming.ui.{StreamingQueryStatusListener, Streamin
 import org.apache.spark.status.ElementTrackingStore
 import org.apache.spark.util.Utils
 
-
 /**
  * A class that holds all state shared across sessions in a given [[SQLContext]].
  *
@@ -55,45 +54,10 @@ private[sql] class SharedState(
 
   SharedState.setFsUrlStreamHandlerFactory(sparkContext.conf)
 
-  // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
-  // the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
-  val warehousePath: String = {
-    val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
-    if (configFile != null) {
-      logInfo(s"loading hive config file: $configFile")
-      sparkContext.hadoopConfiguration.addResource(configFile)
-    }
-
-    // hive.metastore.warehouse.dir only stay in hadoopConf
-    sparkContext.conf.remove("hive.metastore.warehouse.dir")
-    // Set the Hive metastore warehouse path to the one we use
-    val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir")
-    if (hiveWarehouseDir != null && !sparkContext.conf.contains(WAREHOUSE_PATH.key)) {
-      // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
-      // we will respect the value of hive.metastore.warehouse.dir.
-      sparkContext.conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
-      logInfo(s"${WAREHOUSE_PATH.key} is not set, but hive.metastore.warehouse.dir " +
-        s"is set. Setting ${WAREHOUSE_PATH.key} to the value of " +
-        s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').")
-      hiveWarehouseDir
-    } else {
-      // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
-      // the value of spark.sql.warehouse.dir.
-      // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set,
-      // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
-      val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH)
-      logInfo(s"Setting hive.metastore.warehouse.dir ('$hiveWarehouseDir') to the value of " +
-        s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
-      sparkContext.hadoopConfiguration.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
-      sparkWarehouseDir
-    }
-  }
-  logInfo(s"Warehouse path is '$warehousePath'.")
-
-  // These 2 variables should be initiated after `warehousePath`, because in the first place we need
-  // to load hive-site.xml into hadoopConf and determine the warehouse path which will be set into
-  // both spark conf and hadoop conf avoiding be affected by any SparkSession level options
   private val (conf, hadoopConf) = {
+    // Load hive-site.xml into hadoopConf and determine the warehouse path which will be set into
+    // both spark conf and hadoop conf avoiding be affected by any SparkSession level options
+    SharedState.loadHiveConfFile(sparkContext.conf, sparkContext.hadoopConfiguration)
     val confClone = sparkContext.conf.clone()
     val hadoopConfClone = new Configuration(sparkContext.hadoopConfiguration)
     // If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing
@@ -166,7 +130,7 @@ private[sql] class SharedState(
     val defaultDbDefinition = CatalogDatabase(
       SessionCatalog.DEFAULT_DATABASE,
       "default database",
-      CatalogUtils.stringToURI(warehousePath),
+      CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)),
       Map())
     // Create default database if it doesn't exist
     if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
@@ -258,4 +222,40 @@ object SharedState extends Logging {
         throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
     }
   }
+
+  /**
+   * Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
+   * the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
+   */
+  def loadHiveConfFile(sparkConf: SparkConf, hadoopConf: Configuration): Unit = {
+    val hiveWarehouseKey = "hive.metastore.warehouse.dir"
+    val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
+    if (configFile != null) {
+      logInfo(s"loading hive config file: $configFile")
+      hadoopConf.addResource(configFile)
+    }
+    // hive.metastore.warehouse.dir only stay in hadoopConf
+    sparkConf.remove(hiveWarehouseKey)
+    // Set the Hive metastore warehouse path to the one we use
+    val hiveWarehouseDir = hadoopConf.get(hiveWarehouseKey)
+    val warehousePath = if (hiveWarehouseDir != null && !sparkConf.contains(WAREHOUSE_PATH.key)) {
+      // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
+      // we will respect the value of hive.metastore.warehouse.dir.
+      sparkConf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
+      logInfo(s"${WAREHOUSE_PATH.key} is not set, but $hiveWarehouseKey is set. Setting" +
+        s" ${WAREHOUSE_PATH.key} to the value of $hiveWarehouseKey ('$hiveWarehouseDir').")
+      hiveWarehouseDir
+    } else {
+      // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
+      // the value of spark.sql.warehouse.dir.
+      // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set
+      // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
+      val sparkWarehouseDir = sparkConf.get(WAREHOUSE_PATH)
+      logInfo(s"Setting $hiveWarehouseKey ('$hiveWarehouseDir') to the value of " +
+        s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
+      hadoopConf.set(hiveWarehouseKey, sparkWarehouseDir)
+      sparkWarehouseDir
+    }
+    logInfo(s"Warehouse path is '$warehousePath'.")
+  }
 }
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 6b76927..3ddf4ec 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -46,6 +46,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.sql.hive.security.HiveDelegationTokenProvider
+import org.apache.spark.sql.internal.SharedState
 import org.apache.spark.util.ShutdownHookManager
 
 /**
@@ -87,6 +88,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
 
     val sparkConf = new SparkConf(loadDefaults = true)
     val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
+    SharedState.loadHiveConfFile(sparkConf, hadoopConf)
     val extraConfigs = HiveUtils.formatTimeVarsForHiveClient(hadoopConf)
 
     val cliConf = HiveClientImpl.newHiveConf(sparkConf, hadoopConf, extraConfigs)
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 43aafc3..ed77663 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -32,6 +32,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.hive.test.HiveTestJars
+import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.util.{ThreadUtils, Utils}
 
@@ -159,6 +160,17 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
     }
   }
 
+  test("Pick spark.sql.warehouse.dir first for Spark Cli if set") {
+    val sparkWareHouseDir = Utils.createTempDir()
+    new File(warehousePath, "metastore_db").delete()
+    runCliWithin(
+      1.minute,
+      Seq("--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=$sparkWareHouseDir"))(
+      "desc database default;" -> sparkWareHouseDir.getAbsolutePath
+    )
+    sparkWareHouseDir.delete()
+  }
+
   test("Simple commands") {
     val dataFilePath =
       Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
index 6e2dcfc..78535b0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
@@ -45,7 +45,6 @@ class HiveSharedStateSuite extends SparkFunSuite {
       GLOBAL_TEMP_DATABASE.key -> tmpDb)
 
     val state = new SharedState(sc, initialConfigs)
-    assert(state.warehousePath !== invalidPath, "warehouse path can't determine by session options")
     assert(sc.conf.get(WAREHOUSE_PATH.key) !== invalidPath,
       "warehouse conf in session options can't affect application wide spark conf")
     assert(sc.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) !== invalidPath,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 31ff62e..8b97489 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -787,7 +787,7 @@ object SPARK_18360 {
       .enableHiveSupport().getOrCreate()
 
     val defaultDbLocation = spark.catalog.getDatabase("default").locationUri
-    assert(new Path(defaultDbLocation) == new Path(spark.sharedState.warehousePath))
+    assert(new Path(defaultDbLocation) == new Path(spark.conf.get(WAREHOUSE_PATH)))
 
     val hiveClient =
       spark.sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org