You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/08/06 00:30:53 UTC

spark git commit: [SPARK-21637][SPARK-21451][SQL] get `spark.hadoop.*` properties from sysProps to hiveconf

Repository: spark
Updated Branches:
  refs/heads/master dcac1d57f -> 41568e9a0


[SPARK-21637][SPARK-21451][SQL] get `spark.hadoop.*` properties from sysProps to hiveconf

## What changes were proposed in this pull request?
When we use `bin/spark-sql` command configuring `--conf spark.hadoop.foo=bar`, the `SparkSQLCliDriver` initializes an instance of  hiveconf, it does not add `foo->bar` to it.
this pr gets `spark.hadoop.*` properties from sysProps to this hiveconf

## How was this patch tested?
UT

Author: hzyaoqin <hz...@corp.netease.com>
Author: Kent Yao <ya...@hotmail.com>

Closes #18668 from yaooqinn/SPARK-21451.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/41568e9a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/41568e9a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/41568e9a

Branch: refs/heads/master
Commit: 41568e9a0fc4f1373171c6f8dc33c87d9affde70
Parents: dcac1d5
Author: hzyaoqin <hz...@corp.netease.com>
Authored: Sat Aug 5 17:30:47 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Sat Aug 5 17:30:47 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 33 +++++++++++++++----
 docs/configuration.md                           | 34 +++++++++++++++++++-
 .../hive/thriftserver/SparkSQLCLIDriver.scala   | 19 +++++++----
 .../spark/sql/hive/thriftserver/CliSuite.scala  | 13 ++++++++
 .../org/apache/spark/sql/hive/HiveUtils.scala   |  8 +++++
 .../apache/spark/sql/hive/HiveUtilsSuite.scala  |  9 ++++++
 6 files changed, 102 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/41568e9a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index ce916b4..eeb6d10 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -22,8 +22,10 @@ import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
 import java.util.{Arrays, Comparator, Date, Locale}
 
+import scala.collection.immutable.Map
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.mutable.HashMap
 import scala.util.control.NonFatal
 
 import com.google.common.primitives.Longs
@@ -74,7 +76,6 @@ class SparkHadoopUtil extends Logging {
     }
   }
 
-
   /**
    * Appends S3-specific, spark.hadoop.*, and spark.buffer.size configurations to a Hadoop
    * configuration.
@@ -99,18 +100,36 @@ class SparkHadoopUtil extends Logging {
           hadoopConf.set("fs.s3a.session.token", sessionToken)
         }
       }
-      // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
-      conf.getAll.foreach { case (key, value) =>
-        if (key.startsWith("spark.hadoop.")) {
-          hadoopConf.set(key.substring("spark.hadoop.".length), value)
-        }
-      }
+      appendSparkHadoopConfigs(conf, hadoopConf)
       val bufferSize = conf.get("spark.buffer.size", "65536")
       hadoopConf.set("io.file.buffer.size", bufferSize)
     }
   }
 
   /**
+   * Appends spark.hadoop.* configurations from a [[SparkConf]] to a Hadoop
+   * configuration without the spark.hadoop. prefix.
+   */
+  def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
+    // Copy any "spark.hadoop.foo=bar" spark properties into conf as "foo=bar"
+    for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) {
+      hadoopConf.set(key.substring("spark.hadoop.".length), value)
+    }
+  }
+
+  /**
+   * Appends spark.hadoop.* configurations from a Map to another without the spark.hadoop. prefix.
+   */
+  def appendSparkHadoopConfigs(
+      srcMap: Map[String, String],
+      destMap: HashMap[String, String]): Unit = {
+    // Copy any "spark.hadoop.foo=bar" system properties into destMap as "foo=bar"
+    for ((key, value) <- srcMap if key.startsWith("spark.hadoop.")) {
+      destMap.put(key.substring("spark.hadoop.".length), value)
+    }
+  }
+
+  /**
    * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
    * subsystems.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/41568e9a/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 011d583..e7c0306 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2357,5 +2357,37 @@ The location of these configuration files varies across Hadoop versions, but
 a common location is inside of `/etc/hadoop/conf`. Some tools create
 configurations on-the-fly, but offer a mechanisms to download copies of them.
 
-To make these files visible to Spark, set `HADOOP_CONF_DIR` in `$SPARK_HOME/spark-env.sh`
+To make these files visible to Spark, set `HADOOP_CONF_DIR` in `$SPARK_HOME/conf/spark-env.sh`
 to a location containing the configuration files.
+
+# Custom Hadoop/Hive Configuration
+
+If your Spark application is interacting with Hadoop, Hive, or both, there are probably Hadoop/Hive
+configuration files in Spark's classpath.
+
+Multiple running applications might require different Hadoop/Hive client side configurations.
+You can copy and modify `hdfs-site.xml`, `core-site.xml`, `yarn-site.xml`, `hive-site.xml` in
+Spark's classpath for each application. In a Spark cluster running on YARN, these configuration
+files are set cluster-wide, and cannot safely be changed by the application.
+
+The better choice is to use spark hadoop properties in the form of `spark.hadoop.*`. 
+They can be considered as same as normal spark properties which can be set in `$SPARK_HOME/conf/spark-defalut.conf`
+
+In some cases, you may want to avoid hard-coding certain configurations in a `SparkConf`. For
+instance, Spark allows you to simply create an empty conf and set spark/spark hadoop properties.
+
+{% highlight scala %}
+val conf = new SparkConf().set("spark.hadoop.abc.def","xyz")
+val sc = new SparkContext(conf)
+{% endhighlight %}
+
+Also, you can modify or add configurations at runtime:
+{% highlight bash %}
+./bin/spark-submit \ 
+  --name "My app" \ 
+  --master local[4] \  
+  --conf spark.eventLog.enabled=false \ 
+  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \ 
+  --conf spark.hadoop.abc.def=xyz \ 
+  myApp.jar
+{% endhighlight %}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/41568e9a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 38c4589..761e832 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -50,6 +50,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
   private val prompt = "spark-sql"
   private val continuedPrompt = "".padTo(prompt.length, ' ')
   private var transport: TSocket = _
+  private final val SPARK_HADOOP_PROP_PREFIX = "spark.hadoop."
 
   installSignalHandler()
 
@@ -134,6 +135,16 @@ private[hive] object SparkSQLCLIDriver extends Logging {
       // Hive 1.2 + not supported in CLI
       throw new RuntimeException("Remote operations not supported")
     }
+    // Respect the configurations set by --hiveconf from the command line
+    // (based on Hive's CliDriver).
+    val hiveConfFromCmd = sessionState.getOverriddenConfigurations.entrySet().asScala
+    val newHiveConf = hiveConfFromCmd.map { kv =>
+      // If the same property is configured by spark.hadoop.xxx, we ignore it and
+      // obey settings from spark properties
+      val k = kv.getKey
+      val v = sys.props.getOrElseUpdate(SPARK_HADOOP_PROP_PREFIX + k, kv.getValue)
+      (k, v)
+    }
 
     val cli = new SparkSQLCLIDriver
     cli.setHiveVariables(oproc.getHiveVariables)
@@ -157,12 +168,8 @@ private[hive] object SparkSQLCLIDriver extends Logging {
     // Execute -i init files (always in silent mode)
     cli.processInitFiles(sessionState)
 
-    // Respect the configurations set by --hiveconf from the command line
-    // (based on Hive's CliDriver).
-    val it = sessionState.getOverriddenConfigurations.entrySet().iterator()
-    while (it.hasNext) {
-      val kv = it.next()
-      SparkSQLEnv.sqlContext.setConf(kv.getKey, kv.getValue)
+    newHiveConf.foreach { kv =>
+      SparkSQLEnv.sqlContext.setConf(kv._1, kv._2)
     }
 
     if (sessionState.execString != null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/41568e9a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index d3cec11..933fd73 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -283,4 +283,17 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
       "SET conf3;" -> "conftest"
     )
   }
+
+  test("SPARK-21451: spark.sql.warehouse.dir should respect options in --hiveconf") {
+    runCliWithin(1.minute)("set spark.sql.warehouse.dir;" -> warehousePath.getAbsolutePath)
+  }
+
+  test("SPARK-21451: Apply spark.hadoop.* configurations") {
+    val tmpDir = Utils.createTempDir(namePrefix = "SPARK-21451")
+    runCliWithin(
+      1.minute,
+      Seq(s"--conf", s"spark.hadoop.${ConfVars.METASTOREWAREHOUSE}=$tmpDir"))(
+      "set spark.sql.warehouse.dir;" -> tmpDir.getAbsolutePath)
+    tmpDir.delete()
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/41568e9a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index be6339f..b32b6fb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
 import org.apache.hadoop.util.VersionInfo
 
 import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -404,6 +405,13 @@ private[spark] object HiveUtils extends Logging {
     propMap.put(ConfVars.METASTORE_EVENT_LISTENERS.varname, "")
     propMap.put(ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname, "")
 
+    // SPARK-21451: Spark will gather all `spark.hadoop.*` properties from a `SparkConf` to a
+    // Hadoop Configuration internally, as long as it happens after SparkContext initialized.
+    // Some instances such as `CliSessionState` used in `SparkSQLCliDriver` may also rely on these
+    // Configuration. But it happens before SparkContext initialized, we need to take them from
+    // system properties in the form of regular hadoop configurations.
+    SparkHadoopUtil.get.appendSparkHadoopConfigs(sys.props.toMap, propMap)
+
     propMap.toMap
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/41568e9a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala
index 667a7dd..2ebb1de 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala
@@ -33,4 +33,13 @@ class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
       assert(conf(ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname) === "")
     }
   }
+
+  test("newTemporaryConfiguration respect spark.hadoop.foo=bar in SparkConf") {
+    sys.props.put("spark.hadoop.foo", "bar")
+    Seq(true, false) foreach { useInMemoryDerby =>
+      val hiveConf = HiveUtils.newTemporaryConfiguration(useInMemoryDerby)
+      assert(!hiveConf.contains("spark.hadoop.foo"))
+      assert(hiveConf("foo") === "bar")
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org