Posted to commits@spark.apache.org by we...@apache.org on 2017/09/19 11:35:46 UTC
spark git commit: [SPARK-21428][SQL][FOLLOWUP] CliSessionState should point to the actual metastore not a dummy one
Repository: spark
Updated Branches:
refs/heads/master 1bc17a6b8 -> 581200af7
[SPARK-21428][SQL][FOLLOWUP] CliSessionState should point to the actual metastore not a dummy one
## What changes were proposed in this pull request?
While running bin/spark-sql, we reuse the CliSessionState, but the Hive configuration generated for it points to a dummy metastore when it should point to the real one. In addition, the warehouse location is determined later in SharedState, so the HiveClient should also pick up that configuration change in this case.
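To make the intended flow concrete, here is a minimal standalone sketch of how the CLI HiveConf is now seeded from the real Hadoop/Spark configuration instead of a temporary metastore configuration. The object and method names are illustrative and not part of the patch; the merge logic mirrors the SparkSQLCLIDriver diff below:

    import scala.collection.JavaConverters._

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hive.conf.HiveConf
    import org.apache.hadoop.hive.ql.session.SessionState

    import org.apache.spark.SparkConf

    object CliConfSketch {
      // Seed the CLI's HiveConf from the resolved Hadoop configuration, the Spark
      // configuration, and any extra client configs, so it points at the real
      // metastore rather than a dummy one.
      def buildCliHiveConf(
          sparkConf: SparkConf,
          hadoopConf: Configuration,
          extraConfigs: Map[String, String]): HiveConf = {
        val cliConf = new HiveConf(classOf[SessionState])
        val merged =
          hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue).toMap ++
            sparkConf.getAll.toMap ++ extraConfigs
        merged.foreach { case (k, v) => cliConf.set(k, v) }
        cliConf
      }
    }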
## How was this patch tested?
Existing unit tests.
cc cloud-fan jiangxb1987
Author: Kent Yao <ya...@hotmail.com>
Closes #19068 from yaooqinn/SPARK-21428-FOLLOWUP.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/581200af
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/581200af
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/581200af
Branch: refs/heads/master
Commit: 581200af717bcefd11c9930ac063fe53c6fd2fde
Parents: 1bc17a6
Author: Kent Yao <ya...@hotmail.com>
Authored: Tue Sep 19 19:35:36 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Sep 19 19:35:36 2017 +0800
----------------------------------------------------------------------
.../sql/hive/thriftserver/SparkSQLCLIDriver.scala | 14 +++++++++++---
.../scala/org/apache/spark/sql/hive/HiveUtils.scala | 6 +++---
.../spark/sql/hive/client/HiveClientImpl.scala | 15 +++++++++++++--
.../spark/sql/hive/client/HiveVersionSuite.scala | 2 +-
.../apache/spark/sql/hive/client/VersionsSuite.scala | 2 +-
5 files changed, 29 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/581200af/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 761e832..832a15d 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -37,6 +37,8 @@ import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.log4j.{Level, Logger}
import org.apache.thrift.transport.TSocket
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.hive.HiveUtils
@@ -81,11 +83,17 @@ private[hive] object SparkSQLCLIDriver extends Logging {
System.exit(1)
}
+ val sparkConf = new SparkConf(loadDefaults = true)
+ val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
+ val extraConfigs = HiveUtils.formatTimeVarsForHiveClient(hadoopConf)
+
val cliConf = new HiveConf(classOf[SessionState])
- // Override the location of the metastore since this is only used for local execution.
- HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
- case (key, value) => cliConf.set(key, value)
+ (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue)
+ ++ sparkConf.getAll.toMap ++ extraConfigs).foreach {
+ case (k, v) =>
+ cliConf.set(k, v)
}
+
val sessionState = new CliSessionState(cliConf)
sessionState.in = System.in
http://git-wip-us.apache.org/repos/asf/spark/blob/581200af/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 561c127..80b9a3d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -176,9 +176,9 @@ private[spark] object HiveUtils extends Logging {
}
/**
- * Configurations needed to create a [[HiveClient]].
+ * Change time configurations needed to create a [[HiveClient]] into unified [[Long]] format.
*/
- private[hive] def hiveClientConfigurations(hadoopConf: Configuration): Map[String, String] = {
+ private[hive] def formatTimeVarsForHiveClient(hadoopConf: Configuration): Map[String, String] = {
// Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch
// of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.). This breaks backwards-
compatibility when users are trying to connect to a Hive metastore of a lower version,
@@ -280,7 +280,7 @@ private[spark] object HiveUtils extends Logging {
protected[hive] def newClientForMetadata(
conf: SparkConf,
hadoopConf: Configuration): HiveClient = {
- val configurations = hiveClientConfigurations(hadoopConf)
+ val configurations = formatTimeVarsForHiveClient(hadoopConf)
newClientForMetadata(conf, hadoopConf, configurations)
}
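For context on the rename, a rough sketch of the kind of normalization formatTimeVarsForHiveClient performs: time-valued Hive settings are rewritten as plain millisecond values so that pre-0.14 metastore clients can parse them. The object and method names and the two ConfVars chosen here are illustrative, not the actual list Spark uses:

    import java.util.concurrent.TimeUnit

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars

    object TimeVarSketch {
      // Illustrative only: rewrite a couple of time-valued Hive settings as plain
      // millisecond longs ("30s" -> "30000"), since older Hive clients cannot
      // parse suffixed values. Unset keys fall back to 0 here; the real helper
      // falls back to Hive's own defaults.
      def normalizeTimeVars(hadoopConf: Configuration): Map[String, String] = {
        Seq(ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY,
            ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT).map { confVar =>
          val millis =
            hadoopConf.getTimeDuration(confVar.varname, 0L, TimeUnit.MILLISECONDS)
          confVar.varname -> millis.toString
        }.toMap
      }
    }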
http://git-wip-us.apache.org/repos/asf/spark/blob/581200af/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 426db6a..c4e48c9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Order}
import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor}
@@ -132,14 +133,24 @@ private[hive] class HiveClientImpl(
// in hive jars, which will turn off isolation, if SessionState.detachSession is
// called to remove the current state after that, hive client created later will initialize
// its own state by newState()
- Option(SessionState.get).getOrElse(newState())
+ val ret = SessionState.get
+ if (ret != null) {
+ // hive.metastore.warehouse.dir is determined in SharedState after the CliSessionState
+ // instance is constructed, so we need to follow that change here.
+ Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname)).foreach { dir =>
+ ret.getConf.setVar(ConfVars.METASTOREWAREHOUSE, dir)
+ }
+ ret
+ } else {
+ newState()
+ }
}
}
// Log the default warehouse location.
logInfo(
s"Warehouse location for Hive client " +
- s"(version ${version.fullVersion}) is ${conf.get("hive.metastore.warehouse.dir")}")
+ s"(version ${version.fullVersion}) is ${conf.getVar(ConfVars.METASTOREWAREHOUSE)}")
private def newState(): SessionState = {
val hiveConf = new HiveConf(classOf[SessionState])
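To summarize the change above outside the diff, a minimal sketch (with an illustrative object and method name) of reusing an existing SessionState while refreshing its warehouse location from the resolved Hadoop configuration:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars
    import org.apache.hadoop.hive.ql.session.SessionState

    object WarehouseSyncSketch {
      // Illustrative only: if a SessionState already exists (e.g. the
      // CliSessionState created by spark-sql), reuse it but copy the warehouse
      // location that SharedState resolved into hadoopConf back into its HiveConf.
      def reuseWithWarehouse(hadoopConf: Configuration): Option[SessionState] = {
        Option(SessionState.get).map { state =>
          Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname)).foreach { dir =>
            state.getConf.setVar(ConfVars.METASTOREWAREHOUSE, dir)
          }
          state
        }
      }
    }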
http://git-wip-us.apache.org/repos/asf/spark/blob/581200af/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
index ed475a0..951ebfa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
@@ -36,7 +36,7 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu
hadoopConf.set("hive.metastore.schema.verification", "false")
}
HiveClientBuilder
- .buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))
+ .buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
}
override def suiteName: String = s"${super.suiteName}($version)"
http://git-wip-us.apache.org/repos/asf/spark/blob/581200af/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 1d9c8da..edb9a9f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -127,7 +127,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
hadoopConf.set("hive.metastore.schema.verification", "false")
}
- client = buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))
+ client = buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
if (versionSpark != null) versionSpark.reset()
versionSpark = TestHiveVersion(client)
assert(versionSpark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org