You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/03/12 10:14:41 UTC
[spark] branch master updated: [SPARK-31066][SQL][TEST-HIVE1.2]
Disable useless and uncleaned hive SessionState initialization parts
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 18f2730 [SPARK-31066][SQL][TEST-HIVE1.2] Disable useless and uncleaned hive SessionState initialization parts
18f2730 is described below
commit 18f27308749e695f9942768d7ba85cef9fceb174
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Thu Mar 12 18:13:52 2020 +0800
[SPARK-31066][SQL][TEST-HIVE1.2] Disable useless and uncleaned hive SessionState initialization parts
### What changes were proposed in this pull request?
As a common usage and according to the spark doc, users may often just copy their `hive-site.xml` to Spark directly from hive projects. Sometimes, the config file is not that clean for spark and may cause some side effects.
for example, `hive.session.history.enabled` will create a log for the hive jobs but useless for spark and also it will not be deleted on JVM exit.
this pr
1) disable `hive.session.history.enabled` explicitly to disable creating `hive_job_log` file, e.g.
```
Hive history file=/var/folders/01/h81cs4sn3dq2dd_k4j6fhrmc0000gn/T//kentyao/hive_job_log_79c63b29-95a4-4935-a9eb-2d89844dfe4f_493861201.txt
```
2) set `hive.execution.engine` to `spark` explicitly in case the config is `tez` and cause unnecessary problems like this:
```
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/tez/dag/api/SessionNotRunning
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:529)
```
### Why are the changes needed?
reduce internal complexity overhead and the Hive-specific cognitive load on users running Spark
### Does this PR introduce any user-facing change?
yes, the `hive_job_log` file will not be created even if enabled, and Spark will not try to initialize tez-related machinery
### How was this patch tested?
add ut and verify manually
Closes #27827 from yaooqinn/SPARK-31066.
Authored-by: Kent Yao <ya...@hotmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/hive/thriftserver/SparkSQLCLIDriver.scala | 8 +--
.../spark/sql/hive/client/HiveClientImpl.scala | 82 +++++++++++++---------
.../spark/sql/hive/client/VersionsSuite.scala | 12 ++++
3 files changed, 64 insertions(+), 38 deletions(-)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 19f7ea8..6b76927 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -44,6 +44,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.hive.security.HiveDelegationTokenProvider
import org.apache.spark.util.ShutdownHookManager
@@ -88,12 +89,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
val extraConfigs = HiveUtils.formatTimeVarsForHiveClient(hadoopConf)
- val cliConf = new HiveConf(classOf[SessionState])
- (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue)
- ++ sparkConf.getAll.toMap ++ extraConfigs).foreach {
- case (k, v) =>
- cliConf.set(k, v)
- }
+ val cliConf = HiveClientImpl.newHiveConf(sparkConf, hadoopConf, extraConfigs)
val sessionState = new CliSessionState(cliConf)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index f2c516e..4a3e813 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -58,7 +58,6 @@ import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA, DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX}
import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.hive.client.HiveClientImpl._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.{CircularBuffer, Utils}
@@ -99,6 +98,8 @@ private[hive] class HiveClientImpl(
extends HiveClient
with Logging {
+ import HiveClientImpl._
+
// Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
private val outputBuffer = new CircularBuffer()
@@ -159,36 +160,7 @@ private[hive] class HiveClientImpl(
s"(version ${version.fullVersion}) is ${conf.getVar(ConfVars.METASTOREWAREHOUSE)}")
private def newState(): SessionState = {
- val hiveConf = new HiveConf(classOf[SessionState])
- // HiveConf is a Hadoop Configuration, which has a field of classLoader and
- // the initial value will be the current thread's context class loader
- // (i.e. initClassLoader at here).
- // We call hiveConf.setClassLoader(initClassLoader) at here to make
- // this action explicit.
- hiveConf.setClassLoader(initClassLoader)
-
- // 1: Take all from the hadoopConf to this hiveConf.
- // This hadoopConf contains user settings in Hadoop's core-site.xml file
- // and Hive's hive-site.xml file. Note, we load hive-site.xml file manually in
- // SharedState and put settings in this hadoopConf instead of relying on HiveConf
- // to load user settings. Otherwise, HiveConf's initialize method will override
- // settings in the hadoopConf. This issue only shows up when spark.sql.hive.metastore.jars
- // is not set to builtin. When spark.sql.hive.metastore.jars is builtin, the classpath
- // has hive-site.xml. So, HiveConf will use that to override its default values.
- // 2: we set all spark confs to this hiveConf.
- // 3: we set all entries in config to this hiveConf.
- val confMap = (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue) ++
- sparkConf.getAll.toMap ++ extraConfig).toMap
- confMap.foreach { case (k, v) => hiveConf.set(k, v) }
- SQLConf.get.redactOptions(confMap).foreach { case (k, v) =>
- logDebug(
- s"""
- |Applying Hadoop/Hive/Spark and extra properties to Hive Conf:
- |$k=$v
- """.stripMargin)
- }
- // Disable CBO because we removed the Calcite dependency.
- hiveConf.setBoolean("hive.cbo.enable", false)
+ val hiveConf = newHiveConf(sparkConf, hadoopConf, extraConfig, Some(initClassLoader))
val state = new SessionState(hiveConf)
if (clientLoader.cachedHive != null) {
Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
@@ -990,7 +962,7 @@ private[hive] class HiveClientImpl(
}
}
-private[hive] object HiveClientImpl {
+private[hive] object HiveClientImpl extends Logging {
/** Converts the native StructField to Hive's FieldSchema. */
def toHiveColumn(c: StructField): FieldSchema = {
val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
@@ -1232,4 +1204,50 @@ private[hive] object HiveClientImpl {
StatsSetupConst.RAW_DATA_SIZE,
StatsSetupConst.TOTAL_SIZE
)
+
+ def newHiveConf(
+ sparkConf: SparkConf,
+ hadoopConf: JIterable[JMap.Entry[String, String]],
+ extraConfig: Map[String, String],
+ classLoader: Option[ClassLoader] = None): HiveConf = {
+ val hiveConf = new HiveConf(classOf[SessionState])
+ // HiveConf is a Hadoop Configuration, which has a field of classLoader and
+ // the initial value will be the current thread's context class loader.
+ // We call hiveConf.setClassLoader(initClassLoader) at here to ensure it use the classloader
+ // we want.
+ classLoader.foreach(hiveConf.setClassLoader)
+ // 1: Take all from the hadoopConf to this hiveConf.
+ // This hadoopConf contains user settings in Hadoop's core-site.xml file
+ // and Hive's hive-site.xml file. Note, we load hive-site.xml file manually in
+ // SharedState and put settings in this hadoopConf instead of relying on HiveConf
+ // to load user settings. Otherwise, HiveConf's initialize method will override
+ // settings in the hadoopConf. This issue only shows up when spark.sql.hive.metastore.jars
+ // is not set to builtin. When spark.sql.hive.metastore.jars is builtin, the classpath
+ // has hive-site.xml. So, HiveConf will use that to override its default values.
+ // 2: we set all spark confs to this hiveConf.
+ // 3: we set all entries in config to this hiveConf.
+ val confMap = (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue) ++
+ sparkConf.getAll.toMap ++ extraConfig).toMap
+ confMap.foreach { case (k, v) => hiveConf.set(k, v) }
+ SQLConf.get.redactOptions(confMap).foreach { case (k, v) =>
+ logDebug(s"Applying Hadoop/Hive/Spark and extra properties to Hive Conf:$k=$v")
+ }
+ // Disable CBO because we removed the Calcite dependency.
+ hiveConf.setBoolean("hive.cbo.enable", false)
+ // If this is true, SessionState.start will create a file to log hive job which will not be
+ // deleted on exit and is useless for spark
+ if (hiveConf.getBoolean("hive.session.history.enabled", false)) {
+ logWarning("Detected HiveConf hive.session.history.enabled is true and will be reset to" +
+ " false to disable useless hive logic")
+ hiveConf.setBoolean("hive.session.history.enabled", false)
+ }
+ // If this is tez engine, SessionState.start might bring extra logic to initialize tez stuff,
+ // which is useless for spark.
+ if (hiveConf.get("hive.execution.engine") == "tez") {
+ logWarning("Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr'" +
+ " to disable useless hive logic")
+ hiveConf.set("hive.execution.engine", "mr")
+ }
+ hiveConf
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 4760af7..7471142 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -82,6 +82,18 @@ class VersionsSuite extends SparkFunSuite with Logging {
assert("success" === client.getConf("test", null))
}
+ test("override useless and side-effect hive configurations ") {
+ val hadoopConf = new Configuration()
+ // These hive flags should be reset by spark
+ hadoopConf.setBoolean("hive.cbo.enable", true)
+ hadoopConf.setBoolean("hive.session.history.enabled", true)
+ hadoopConf.set("hive.execution.engine", "tez")
+ val client = buildClient(HiveUtils.builtinHiveVersion, hadoopConf)
+ assert(!client.getConf("hive.cbo.enable", "true").toBoolean)
+ assert(!client.getConf("hive.session.history.enabled", "true").toBoolean)
+ assert(client.getConf("hive.execution.engine", "tez") === "mr")
+ }
+
private def getNestedMessages(e: Throwable): String = {
var causes = ""
var lastException = e
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org