You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/03/12 10:18:47 UTC

[spark] branch branch-3.0 updated: [SPARK-31066][SQL][TEST-HIVE1.2] Disable useless and uncleaned hive SessionState initialization parts

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5d61626  [SPARK-31066][SQL][TEST-HIVE1.2] Disable useless and uncleaned hive SessionState initialization parts
5d61626 is described below

commit 5d616262954f0b163e74c7a5527e5b3dfa913f52
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Thu Mar 12 18:13:52 2020 +0800

    [SPARK-31066][SQL][TEST-HIVE1.2] Disable useless and uncleaned hive SessionState initialization parts
    
    ### What changes were proposed in this pull request?
    
    As is common practice, and as the Spark documentation suggests, users often copy their `hive-site.xml` from a Hive deployment directly into Spark. Such a config file may contain settings that are not appropriate for Spark and can cause side effects.
    
    For example, `hive.session.history.enabled` makes Hive create a job log file that is useless for Spark and is not deleted on JVM exit.
    
    this pr
     1) disable `hive.session.history.enabled` explicitly to disable creating `hive_job_log` file, e.g.
    ```
    Hive history file=/var/folders/01/h81cs4sn3dq2dd_k4j6fhrmc0000gn/T//kentyao/hive_job_log_79c63b29-95a4-4935-a9eb-2d89844dfe4f_493861201.txt
    ```
    2) set `hive.execution.engine` to `mr` explicitly in case the config is `tez`, which would otherwise cause unnecessary problems like this:
    
    ```
    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/tez/dag/api/SessionNotRunning
    	at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:529)
    ```
    
    ### Why are the changes needed?
    
    Reduce the overhead of internal complexity and the amount of Hive knowledge users need in order to run Spark.
    
    ### Does this PR introduce any user-facing change?
    
    Yes, the `hive_job_log` file will not be created even if `hive.session.history.enabled` is enabled, and Spark will not try to initialize Tez-related components.
    ### How was this patch tested?
    
    add ut and verify manually
    
    Closes #27827 from yaooqinn/SPARK-31066.
    
    Authored-by: Kent Yao <ya...@hotmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 18f27308749e695f9942768d7ba85cef9fceb174)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/hive/thriftserver/SparkSQLCLIDriver.scala  |  8 +--
 .../spark/sql/hive/client/HiveClientImpl.scala     | 82 +++++++++++++---------
 .../spark/sql/hive/client/VersionsSuite.scala      | 12 ++++
 3 files changed, 64 insertions(+), 38 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 19f7ea8..6b76927 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -44,6 +44,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.sql.hive.security.HiveDelegationTokenProvider
 import org.apache.spark.util.ShutdownHookManager
 
@@ -88,12 +89,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
     val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
     val extraConfigs = HiveUtils.formatTimeVarsForHiveClient(hadoopConf)
 
-    val cliConf = new HiveConf(classOf[SessionState])
-    (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue)
-      ++ sparkConf.getAll.toMap ++ extraConfigs).foreach {
-      case (k, v) =>
-        cliConf.set(k, v)
-    }
+    val cliConf = HiveClientImpl.newHiveConf(sparkConf, hadoopConf, extraConfigs)
 
     val sessionState = new CliSessionState(cliConf)
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index f2c516e..4a3e813 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -58,7 +58,6 @@ import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA, DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX}
 import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.hive.client.HiveClientImpl._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{CircularBuffer, Utils}
@@ -99,6 +98,8 @@ private[hive] class HiveClientImpl(
   extends HiveClient
   with Logging {
 
+  import HiveClientImpl._
+
   // Circular buffer to hold what hive prints to STDOUT and ERR.  Only printed when failures occur.
   private val outputBuffer = new CircularBuffer()
 
@@ -159,36 +160,7 @@ private[hive] class HiveClientImpl(
       s"(version ${version.fullVersion}) is ${conf.getVar(ConfVars.METASTOREWAREHOUSE)}")
 
   private def newState(): SessionState = {
-    val hiveConf = new HiveConf(classOf[SessionState])
-    // HiveConf is a Hadoop Configuration, which has a field of classLoader and
-    // the initial value will be the current thread's context class loader
-    // (i.e. initClassLoader at here).
-    // We call hiveConf.setClassLoader(initClassLoader) at here to make
-    // this action explicit.
-    hiveConf.setClassLoader(initClassLoader)
-
-    // 1: Take all from the hadoopConf to this hiveConf.
-    // This hadoopConf contains user settings in Hadoop's core-site.xml file
-    // and Hive's hive-site.xml file. Note, we load hive-site.xml file manually in
-    // SharedState and put settings in this hadoopConf instead of relying on HiveConf
-    // to load user settings. Otherwise, HiveConf's initialize method will override
-    // settings in the hadoopConf. This issue only shows up when spark.sql.hive.metastore.jars
-    // is not set to builtin. When spark.sql.hive.metastore.jars is builtin, the classpath
-    // has hive-site.xml. So, HiveConf will use that to override its default values.
-    // 2: we set all spark confs to this hiveConf.
-    // 3: we set all entries in config to this hiveConf.
-    val confMap = (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue) ++
-      sparkConf.getAll.toMap ++ extraConfig).toMap
-    confMap.foreach { case (k, v) => hiveConf.set(k, v) }
-    SQLConf.get.redactOptions(confMap).foreach { case (k, v) =>
-      logDebug(
-        s"""
-           |Applying Hadoop/Hive/Spark and extra properties to Hive Conf:
-           |$k=$v
-         """.stripMargin)
-    }
-    // Disable CBO because we removed the Calcite dependency.
-    hiveConf.setBoolean("hive.cbo.enable", false)
+    val hiveConf = newHiveConf(sparkConf, hadoopConf, extraConfig, Some(initClassLoader))
     val state = new SessionState(hiveConf)
     if (clientLoader.cachedHive != null) {
       Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
@@ -990,7 +962,7 @@ private[hive] class HiveClientImpl(
   }
 }
 
-private[hive] object HiveClientImpl {
+private[hive] object HiveClientImpl extends Logging {
   /** Converts the native StructField to Hive's FieldSchema. */
   def toHiveColumn(c: StructField): FieldSchema = {
     val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
@@ -1232,4 +1204,50 @@ private[hive] object HiveClientImpl {
     StatsSetupConst.RAW_DATA_SIZE,
     StatsSetupConst.TOTAL_SIZE
   )
+
+  def newHiveConf(
+      sparkConf: SparkConf,
+      hadoopConf: JIterable[JMap.Entry[String, String]],
+      extraConfig: Map[String, String],
+      classLoader: Option[ClassLoader] = None): HiveConf = {
+    val hiveConf = new HiveConf(classOf[SessionState])
+    // HiveConf is a Hadoop Configuration, which has a classLoader field whose
+    // initial value is the current thread's context class loader.
+    // When a classLoader is supplied, set it explicitly so that hiveConf uses the class loader
+    // the caller wants; when it is None, the initial value is left untouched.
+    classLoader.foreach(hiveConf.setClassLoader)
+    // 1: Take all from the hadoopConf to this hiveConf.
+    // This hadoopConf contains user settings in Hadoop's core-site.xml file
+    // and Hive's hive-site.xml file. Note, we load hive-site.xml file manually in
+    // SharedState and put settings in this hadoopConf instead of relying on HiveConf
+    // to load user settings. Otherwise, HiveConf's initialize method will override
+    // settings in the hadoopConf. This issue only shows up when spark.sql.hive.metastore.jars
+    // is not set to builtin. When spark.sql.hive.metastore.jars is builtin, the classpath
+    // has hive-site.xml. So, HiveConf will use that to override its default values.
+    // 2: we set all spark confs to this hiveConf, overriding same-key entries from 1.
+    // 3: we set all entries in extraConfig to this hiveConf, overriding 1 and 2.
+    val confMap = (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue) ++
+      sparkConf.getAll.toMap ++ extraConfig).toMap
+    confMap.foreach { case (k, v) => hiveConf.set(k, v) }
+    SQLConf.get.redactOptions(confMap).foreach { case (k, v) =>
+      logDebug(s"Applying Hadoop/Hive/Spark and extra properties to Hive Conf:$k=$v")
+    }
+    // Disable CBO because we removed the Calcite dependency.
+    hiveConf.setBoolean("hive.cbo.enable", false)
+    // If this is true, SessionState.start will create a file to log hive jobs, which will not be
+    // deleted on exit and is useless for spark, so force it back to false.
+    if (hiveConf.getBoolean("hive.session.history.enabled", false)) {
+      logWarning("Detected HiveConf hive.session.history.enabled is true and will be reset to" +
+        " false to disable useless hive logic")
+      hiveConf.setBoolean("hive.session.history.enabled", false)
+    }
+    // If this is tez engine, SessionState.start might bring extra logic to initialize tez stuff,
+    // which is useless for spark, so fall back to the 'mr' engine.
+    if (hiveConf.get("hive.execution.engine") == "tez") {
+      logWarning("Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr'" +
+        " to disable useless hive logic")
+      hiveConf.set("hive.execution.engine", "mr")
+    }
+    hiveConf
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 4760af7..7471142 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -82,6 +82,18 @@ class VersionsSuite extends SparkFunSuite with Logging {
     assert("success" === client.getConf("test", null))
   }
 
+  test("override useless and side-effect hive configurations ") {
+    val hadoopConf = new Configuration()
+    // Set each flag to the value that spark is expected to reset when building the client
+    hadoopConf.setBoolean("hive.cbo.enable", true)
+    hadoopConf.setBoolean("hive.session.history.enabled", true)
+    hadoopConf.set("hive.execution.engine", "tez")
+    val client = buildClient(HiveUtils.builtinHiveVersion, hadoopConf)
+    assert(!client.getConf("hive.cbo.enable", "true").toBoolean)
+    assert(!client.getConf("hive.session.history.enabled", "true").toBoolean)
+    assert(client.getConf("hive.execution.engine", "tez") === "mr")
+  }
+
   private def getNestedMessages(e: Throwable): String = {
     var causes = ""
     var lastException = e


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org