You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/09/19 11:35:46 UTC

spark git commit: [SPARK-21428][SQL][FOLLOWUP] CliSessionState should point to the actual metastore not a dummy one

Repository: spark
Updated Branches:
  refs/heads/master 1bc17a6b8 -> 581200af7


[SPARK-21428][SQL][FOLLOWUP] CliSessionState should point to the actual metastore not a dummy one

## What changes were proposed in this pull request?

While running bin/spark-sql, we will reuse cliSessionState, but the Hive configurations generated here just points to a dummy meta store which actually should be the real one. And the warehouse is determined later in SharedState, HiveClient should respect this config changing in this case too.

## How was this patch tested?
existing ut

cc cloud-fan jiangxb1987

Author: Kent Yao <ya...@hotmail.com>

Closes #19068 from yaooqinn/SPARK-21428-FOLLOWUP.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/581200af
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/581200af
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/581200af

Branch: refs/heads/master
Commit: 581200af717bcefd11c9930ac063fe53c6fd2fde
Parents: 1bc17a6
Author: Kent Yao <ya...@hotmail.com>
Authored: Tue Sep 19 19:35:36 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Sep 19 19:35:36 2017 +0800

----------------------------------------------------------------------
 .../sql/hive/thriftserver/SparkSQLCLIDriver.scala    | 14 +++++++++++---
 .../scala/org/apache/spark/sql/hive/HiveUtils.scala  |  6 +++---
 .../spark/sql/hive/client/HiveClientImpl.scala       | 15 +++++++++++++--
 .../spark/sql/hive/client/HiveVersionSuite.scala     |  2 +-
 .../apache/spark/sql/hive/client/VersionsSuite.scala |  2 +-
 5 files changed, 29 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/581200af/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 761e832..832a15d 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -37,6 +37,8 @@ import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.log4j.{Level, Logger}
 import org.apache.thrift.transport.TSocket
 
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.hive.HiveUtils
@@ -81,11 +83,17 @@ private[hive] object SparkSQLCLIDriver extends Logging {
       System.exit(1)
     }
 
+    val sparkConf = new SparkConf(loadDefaults = true)
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
+    val extraConfigs = HiveUtils.formatTimeVarsForHiveClient(hadoopConf)
+
     val cliConf = new HiveConf(classOf[SessionState])
-    // Override the location of the metastore since this is only used for local execution.
-    HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
-      case (key, value) => cliConf.set(key, value)
+    (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue)
+      ++ sparkConf.getAll.toMap ++ extraConfigs).foreach {
+      case (k, v) =>
+        cliConf.set(k, v)
     }
+
     val sessionState = new CliSessionState(cliConf)
 
     sessionState.in = System.in

http://git-wip-us.apache.org/repos/asf/spark/blob/581200af/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 561c127..80b9a3d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -176,9 +176,9 @@ private[spark] object HiveUtils extends Logging {
   }
 
   /**
-   * Configurations needed to create a [[HiveClient]].
+   * Change time configurations needed to create a [[HiveClient]] into unified [[Long]] format.
    */
-  private[hive] def hiveClientConfigurations(hadoopConf: Configuration): Map[String, String] = {
+  private[hive] def formatTimeVarsForHiveClient(hadoopConf: Configuration): Map[String, String] = {
     // Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch
     // of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.).  This breaks backwards-
     // compatibility when users are trying to connecting to a Hive metastore of lower version,
@@ -280,7 +280,7 @@ private[spark] object HiveUtils extends Logging {
   protected[hive] def newClientForMetadata(
       conf: SparkConf,
       hadoopConf: Configuration): HiveClient = {
-    val configurations = hiveClientConfigurations(hadoopConf)
+    val configurations = formatTimeVarsForHiveClient(hadoopConf)
     newClientForMetadata(conf, hadoopConf, configurations)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/581200af/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 426db6a..c4e48c9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
 import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Order}
 import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor}
@@ -132,14 +133,24 @@ private[hive] class HiveClientImpl(
       // in hive jars, which will turn off isolation, if SessionSate.detachSession is
       // called to remove the current state after that, hive client created later will initialize
       // its own state by newState()
-      Option(SessionState.get).getOrElse(newState())
+      val ret = SessionState.get
+      if (ret != null) {
+        // hive.metastore.warehouse.dir is determined in SharedState after the CliSessionState
+        // instance constructed, we need to follow that change here.
+        Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname)).foreach { dir =>
+          ret.getConf.setVar(ConfVars.METASTOREWAREHOUSE, dir)
+        }
+        ret
+      } else {
+        newState()
+      }
     }
   }
 
   // Log the default warehouse location.
   logInfo(
     s"Warehouse location for Hive client " +
-      s"(version ${version.fullVersion}) is ${conf.get("hive.metastore.warehouse.dir")}")
+      s"(version ${version.fullVersion}) is ${conf.getVar(ConfVars.METASTOREWAREHOUSE)}")
 
   private def newState(): SessionState = {
     val hiveConf = new HiveConf(classOf[SessionState])

http://git-wip-us.apache.org/repos/asf/spark/blob/581200af/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
index ed475a0..951ebfa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
@@ -36,7 +36,7 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu
       hadoopConf.set("hive.metastore.schema.verification", "false")
     }
     HiveClientBuilder
-      .buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))
+      .buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
   }
 
   override def suiteName: String = s"${super.suiteName}($version)"

http://git-wip-us.apache.org/repos/asf/spark/blob/581200af/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 1d9c8da..edb9a9f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -127,7 +127,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
         hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
         hadoopConf.set("hive.metastore.schema.verification", "false")
       }
-      client = buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))
+      client = buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
       if (versionSpark != null) versionSpark.reset()
       versionSpark = TestHiveVersion(client)
       assert(versionSpark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org