Posted to commits@spark.apache.org by do...@apache.org on 2021/02/05 21:21:09 UTC

[spark] branch branch-3.0 updated: [SPARK-34346][CORE][SQL][3.0] io.file.buffer.size set by spark.buffer.size may be accidentally overridden by loading hive-site.xml, causing a perf regression

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b223135  [SPARK-34346][CORE][SQL][3.0] io.file.buffer.size set by spark.buffer.size may be accidentally overridden by loading hive-site.xml, causing a perf regression
b223135 is described below

commit b2231354413aa6f9af06b30d7aba3c0b9372bd87
Author: Kent Yao <ya...@apache.org>
AuthorDate: Fri Feb 5 13:20:28 2021 -0800

    [SPARK-34346][CORE][SQL][3.0] io.file.buffer.size set by spark.buffer.size may be accidentally overridden by loading hive-site.xml, causing a perf regression
    
    Backport #31460 to 3.0.
    
    ### What changes were proposed in this pull request?
    In many real-world cases, when interacting with the Hive catalog through Spark SQL, users reuse the `hive-site.xml` from their Hive jobs and copy it into `SPARK_HOME`/conf without modification. When Spark generates the Hadoop configuration, it uses `spark.buffer.size` (default 65536) to override `io.file.buffer.size` (Hadoop default 4096). But when `hive-site.xml` is loaded afterwards, that override is lost and `io.file.buffer.size` is reset again to the value from `hive-site.xml`. Two problems follow:
    
    1. The configuration priority for setting Hadoop and Hive configs here is wrong; the order should be `spark > spark.hive > spark.hadoop > hive > hadoop`.
    
    2. This breaks the `spark.buffer.size` config's behavior for tuning IO performance with HDFS whenever an `io.file.buffer.size` entry exists in `hive-site.xml`, as the sketch below illustrates.
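    
    A minimal sketch (not Spark's actual code; the object name is made up) of the Hadoop `Configuration` semantics this fix relies on: values applied with `set()` win over values coming from resource files, whereas copying file entries one by one with `set()` clobbers earlier programmatic settings.
    
        import org.apache.hadoop.conf.Configuration
    
        object ConfPrioritySketch {
          def main(args: Array[String]): Unit = {
            // Buggy ordering: the Spark-derived value is set first, then the
            // hive-site.xml entry is copied in with set(), silently clobbering it.
            val buggy = new Configuration(false)
            buggy.set("io.file.buffer.size", "65536")  // from spark.buffer.size
            buggy.set("io.file.buffer.size", "201811") // copied from hive-site.xml
            assert(buggy.get("io.file.buffer.size") == "201811") // Spark value lost
    
            // Fixed ordering: hive-site.xml is added as a low-priority resource
            // first; later set() calls always win over resource-provided values.
            val fixed = new Configuration(false)
            val hiveSite = Thread.currentThread().getContextClassLoader
              .getResource("hive-site.xml")            // may define 201811
            if (hiveSite != null) fixed.addResource(hiveSite)
            fixed.set("io.file.buffer.size", "65536")  // from spark.buffer.size
            assert(fixed.get("io.file.buffer.size") == "65536") // Spark value wins
          }
        }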
    
    ### Why are the changes needed?
    
    Fixes the configuration priority behavior, and fixes the performance regression caused by that behavior change.
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. This PR restores behavior that was silently changed: `spark.buffer.size` takes effect again even when `hive-site.xml` sets `io.file.buffer.size`.
    ### How was this patch tested?
    
    New tests.
    
    Closes #31492 from yaooqinn/SPARK-34346-30.
    
    Authored-by: Kent Yao <ya...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/deploy/SparkHadoopUtil.scala  | 11 ++++++-
 core/src/test/resources/core-site.xml              | 24 +++++++++++++++
 core/src/test/resources/hive-site.xml              | 34 ++++++++++++++++++++++
 .../scala/org/apache/spark/SparkContextSuite.scala | 33 +++++++++++++++++++++
 .../apache/spark/sql/internal/SharedState.scala    | 31 ++++++++------------
 .../spark/sql/internal/SharedStateSuite.scala      | 11 -------
 .../sql/hive/thriftserver/SparkSQLCLIDriver.scala  |  2 +-
 7 files changed, 114 insertions(+), 32 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 6f799a5..d872c3b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -392,7 +392,7 @@ private[spark] class SparkHadoopUtil extends Logging {
 
 }
 
-private[spark] object SparkHadoopUtil {
+private[spark] object SparkHadoopUtil extends Logging {
 
   private lazy val instance = new SparkHadoopUtil
 
@@ -450,6 +450,7 @@ private[spark] object SparkHadoopUtil {
           hadoopConf.set("fs.s3a.session.token", sessionToken)
         }
       }
+      loadHiveConfFile(conf, hadoopConf)
       appendSparkHadoopConfigs(conf, hadoopConf)
       appendSparkHiveConfigs(conf, hadoopConf)
       val bufferSize = conf.get(BUFFER_SIZE).toString
@@ -457,6 +458,14 @@ private[spark] object SparkHadoopUtil {
     }
   }
 
+  private def loadHiveConfFile(conf: SparkConf, hadoopConf: Configuration): Unit = {
+    val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
+    if (configFile != null) {
+      logInfo(s"Loading hive config file: $configFile")
+      hadoopConf.addResource(configFile)
+    }
+  }
+
   private def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
     // Copy any "spark.hadoop.foo=bar" spark properties into conf as "foo=bar"
     for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) {
diff --git a/core/src/test/resources/core-site.xml b/core/src/test/resources/core-site.xml
new file mode 100644
index 0000000..84eddf8
--- /dev/null
+++ b/core/src/test/resources/core-site.xml
@@ -0,0 +1,24 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <property>
+        <name>hadoop.tmp.dir</name>
+        <value>/tmp/hive_zero</value>
+        <description>default is /tmp/hadoop-${user.name} and will be overridden</description>
+    </property>
+</configuration>
diff --git a/core/src/test/resources/hive-site.xml b/core/src/test/resources/hive-site.xml
new file mode 100644
index 0000000..d7117c3
--- /dev/null
+++ b/core/src/test/resources/hive-site.xml
@@ -0,0 +1,34 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <property>
+        <name>hive.in.test</name>
+        <value>true</value>
+        <description>Internal marker for test.</description>
+    </property>
+    <property>
+        <name>hadoop.tmp.dir</name>
+        <value>/tmp/hive_one</value>
+        <description>default is /tmp/hadoop-${user.name} and will be overridden</description>
+    </property>
+
+    <property>
+        <name>io.file.buffer.size</name>
+        <value>201811</value>
+    </property>
+</configuration>
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 7b2431a..e0a5f52 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -974,6 +974,39 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       new SparkContext(new SparkConf().setAppName("test").setMaster("local")).stop()
     }
   }
+
+  test("SPARK-34346: hadoop configuration priority for spark/hive/hadoop configs") {
+    val testKey = "hadoop.tmp.dir"
+    val bufferKey = "io.file.buffer.size"
+    val hadoopConf0 = new Configuration()
+
+    val hiveConfFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
+    assert(hiveConfFile != null)
+    hadoopConf0.addResource(hiveConfFile)
+    assert(hadoopConf0.get(testKey) === "/tmp/hive_one")
+    assert(hadoopConf0.get(bufferKey) === "201811")
+
+    val sparkConf = new SparkConf()
+      .setAppName("test")
+      .setMaster("local")
+      .set(BUFFER_SIZE, 65536)
+    sc = new SparkContext(sparkConf)
+    assert(sc.hadoopConfiguration.get(testKey) === "/tmp/hive_one",
+      "hive configs have higher priority than hadoop ones ")
+    assert(sc.hadoopConfiguration.get(bufferKey).toInt === 65536,
+      "spark configs have higher priority than hive ones")
+
+    resetSparkContext()
+
+    sparkConf
+      .set("spark.hadoop.hadoop.tmp.dir", "/tmp/hive_two")
+      .set(s"spark.hadoop.$bufferKey", "20181117")
+    sc = new SparkContext(sparkConf)
+    assert(sc.hadoopConfiguration.get(testKey) === "/tmp/hive_two",
+      "spark.hadoop configs have higher priority than hive/hadoop ones")
+    assert(sc.hadoopConfiguration.get(bufferKey).toInt === 65536,
+      "spark configs have higher priority than spark.hadoop configs")
+  }
 }
 
 object SparkContextSuite {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 1922a58..99d2482 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -58,7 +58,7 @@ private[sql] class SharedState(
   private val (conf, hadoopConf) = {
     // Load hive-site.xml into hadoopConf and determine the warehouse path which will be set into
     // both spark conf and hadoop conf avoiding be affected by any SparkSession level options
-    SharedState.loadHiveConfFile(sparkContext.conf, sparkContext.hadoopConfiguration)
+    SharedState.resolveWarehousePath(sparkContext.conf, sparkContext.hadoopConfiguration)
     val confClone = sparkContext.conf.clone()
     val hadoopConfClone = new Configuration(sparkContext.hadoopConfiguration)
     // If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing
@@ -222,28 +222,21 @@ object SharedState extends Logging {
   }
 
   /**
-   * Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
-   * the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
+   * Determine the warehouse path using the key `spark.sql.warehouse.dir` in the [[SparkConf]], and
+   * `hive.metastore.warehouse.dir` in hadoop [[Configuration]].
+   * The priority order is:
+   * s.s.w.d in spark conf (user specified)
+   *   > h.m.w.d in hadoop conf (user specified)
+   *   > s.s.w.d in spark conf (default)
+   *
+   * Once resolved, the final value is reachable application-wide via the sparkConf and
+   * hadoopConf of the [[SparkContext]].
+   *
    */
-  def loadHiveConfFile(
+  def resolveWarehousePath(
       sparkConf: SparkConf,
       hadoopConf: Configuration): Unit = {
-    def containsInSparkConf(key: String): Boolean = {
-      sparkConf.contains(key) || sparkConf.contains("spark.hadoop." + key) ||
-        (key.startsWith("hive") && sparkConf.contains("spark." + key))
-    }
-
     val hiveWarehouseKey = "hive.metastore.warehouse.dir"
-    val configFile = Utils.getContextOrSparkClassLoader.getResourceAsStream("hive-site.xml")
-    if (configFile != null) {
-      logInfo(s"loading hive config file: $configFile")
-      val hadoopConfTemp = new Configuration()
-      hadoopConfTemp.clear()
-      hadoopConfTemp.addResource(configFile)
-      for (entry <- hadoopConfTemp.asScala if !containsInSparkConf(entry.getKey)) {
-        hadoopConf.set(entry.getKey, entry.getValue)
-      }
-    }
     // hive.metastore.warehouse.dir only stay in hadoopConf
     sparkConf.remove(hiveWarehouseKey)
     // Set the Hive metastore warehouse path to the one we use
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala
index 4d33fc1..81bf153 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala
@@ -52,15 +52,4 @@ class SharedStateSuite extends SharedSparkSession {
     assert(conf.isInstanceOf[Configuration])
     assert(conf.asInstanceOf[Configuration].get("fs.defaultFS") == "file:///")
   }
-
-  test("SPARK-33740: hadoop configs in hive-site.xml can overrides pre-existing hadoop ones") {
-    val conf = new SparkConf()
-    val hadoopConf = new Configuration()
-    SharedState.loadHiveConfFile(conf, hadoopConf)
-    assert(hadoopConf.get("hadoop.tmp.dir") === "/tmp/hive_one")
-    hadoopConf.clear()
-    SharedState.loadHiveConfFile(
-      conf.set("spark.hadoop.hadoop.tmp.dir", "noop"), hadoopConf)
-    assert(hadoopConf.get("hadoop.tmp.dir") === null)
-  }
 }
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 581aa68..ada36c1 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -131,7 +131,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
       UserGroupInformation.getCurrentUser.addCredentials(credentials)
     }
 
-    SharedState.loadHiveConfFile(sparkConf, conf)
+    SharedState.resolveWarehousePath(sparkConf, conf)
     SessionState.start(sessionState)
 
     // Clean up after we exit

