You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/08/06 00:30:53 UTC
spark git commit: [SPARK-21637][SPARK-21451][SQL] get
`spark.hadoop.*` properties from sysProps to hiveconf
Repository: spark
Updated Branches:
refs/heads/master dcac1d57f -> 41568e9a0
[SPARK-21637][SPARK-21451][SQL] get `spark.hadoop.*` properties from sysProps to hiveconf
## What changes were proposed in this pull request?
When we use `bin/spark-sql` command configuring `--conf spark.hadoop.foo=bar`, the `SparkSQLCliDriver` initializes an instance of hiveconf, it does not add `foo->bar` to it.
this pr gets `spark.hadoop.*` properties from sysProps to this hiveconf
## How was this patch tested?
UT
Author: hzyaoqin <hz...@corp.netease.com>
Author: Kent Yao <ya...@hotmail.com>
Closes #18668 from yaooqinn/SPARK-21451.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/41568e9a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/41568e9a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/41568e9a
Branch: refs/heads/master
Commit: 41568e9a0fc4f1373171c6f8dc33c87d9affde70
Parents: dcac1d5
Author: hzyaoqin <hz...@corp.netease.com>
Authored: Sat Aug 5 17:30:47 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Sat Aug 5 17:30:47 2017 -0700
----------------------------------------------------------------------
.../apache/spark/deploy/SparkHadoopUtil.scala | 33 +++++++++++++++----
docs/configuration.md | 34 +++++++++++++++++++-
.../hive/thriftserver/SparkSQLCLIDriver.scala | 19 +++++++----
.../spark/sql/hive/thriftserver/CliSuite.scala | 13 ++++++++
.../org/apache/spark/sql/hive/HiveUtils.scala | 8 +++++
.../apache/spark/sql/hive/HiveUtilsSuite.scala | 9 ++++++
6 files changed, 102 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/41568e9a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index ce916b4..eeb6d10 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -22,8 +22,10 @@ import java.security.PrivilegedExceptionAction
import java.text.DateFormat
import java.util.{Arrays, Comparator, Date, Locale}
+import scala.collection.immutable.Map
import scala.collection.JavaConverters._
import scala.collection.mutable
+import scala.collection.mutable.HashMap
import scala.util.control.NonFatal
import com.google.common.primitives.Longs
@@ -74,7 +76,6 @@ class SparkHadoopUtil extends Logging {
}
}
-
/**
* Appends S3-specific, spark.hadoop.*, and spark.buffer.size configurations to a Hadoop
* configuration.
@@ -99,18 +100,36 @@ class SparkHadoopUtil extends Logging {
hadoopConf.set("fs.s3a.session.token", sessionToken)
}
}
- // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
- conf.getAll.foreach { case (key, value) =>
- if (key.startsWith("spark.hadoop.")) {
- hadoopConf.set(key.substring("spark.hadoop.".length), value)
- }
- }
+ appendSparkHadoopConfigs(conf, hadoopConf)
val bufferSize = conf.get("spark.buffer.size", "65536")
hadoopConf.set("io.file.buffer.size", bufferSize)
}
}
/**
+ * Appends spark.hadoop.* configurations from a [[SparkConf]] to a Hadoop
+ * configuration without the spark.hadoop. prefix.
+ */
+ def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
+ // Copy any "spark.hadoop.foo=bar" spark properties into conf as "foo=bar"
+ for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) {
+ hadoopConf.set(key.substring("spark.hadoop.".length), value)
+ }
+ }
+
+ /**
+ * Appends spark.hadoop.* configurations from a Map to another without the spark.hadoop. prefix.
+ */
+ def appendSparkHadoopConfigs(
+ srcMap: Map[String, String],
+ destMap: HashMap[String, String]): Unit = {
+ // Copy any "spark.hadoop.foo=bar" system properties into destMap as "foo=bar"
+ for ((key, value) <- srcMap if key.startsWith("spark.hadoop.")) {
+ destMap.put(key.substring("spark.hadoop.".length), value)
+ }
+ }
+
+ /**
* Return an appropriate (subclass) of Configuration. Creating config can initialize some Hadoop
* subsystems.
*/
http://git-wip-us.apache.org/repos/asf/spark/blob/41568e9a/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 011d583..e7c0306 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2357,5 +2357,37 @@ The location of these configuration files varies across Hadoop versions, but
a common location is inside of `/etc/hadoop/conf`. Some tools create
configurations on-the-fly, but offer a mechanism to download copies of them.
-To make these files visible to Spark, set `HADOOP_CONF_DIR` in `$SPARK_HOME/spark-env.sh`
+To make these files visible to Spark, set `HADOOP_CONF_DIR` in `$SPARK_HOME/conf/spark-env.sh`
to a location containing the configuration files.
+
+# Custom Hadoop/Hive Configuration
+
+If your Spark application is interacting with Hadoop, Hive, or both, there are probably Hadoop/Hive
+configuration files in Spark's classpath.
+
+Multiple running applications might require different Hadoop/Hive client side configurations.
+You can copy and modify `hdfs-site.xml`, `core-site.xml`, `yarn-site.xml`, `hive-site.xml` in
+Spark's classpath for each application. In a Spark cluster running on YARN, these configuration
+files are set cluster-wide, and cannot safely be changed by the application.
+
+The better choice is to use Spark Hadoop properties in the form of `spark.hadoop.*`.
+They can be considered the same as normal Spark properties, which can be set in `$SPARK_HOME/conf/spark-defaults.conf`.
+
+In some cases, you may want to avoid hard-coding certain configurations in a `SparkConf`. For
+instance, Spark allows you to simply create an empty conf and set Spark and/or Spark Hadoop properties.
+
+{% highlight scala %}
+val conf = new SparkConf().set("spark.hadoop.abc.def","xyz")
+val sc = new SparkContext(conf)
+{% endhighlight %}
+
+Also, you can modify or add configurations at runtime:
+{% highlight bash %}
+./bin/spark-submit \
+ --name "My app" \
+ --master local[4] \
+ --conf spark.eventLog.enabled=false \
+ --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
+ --conf spark.hadoop.abc.def=xyz \
+ myApp.jar
+{% endhighlight %}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/spark/blob/41568e9a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 38c4589..761e832 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -50,6 +50,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
private val prompt = "spark-sql"
private val continuedPrompt = "".padTo(prompt.length, ' ')
private var transport: TSocket = _
+ private final val SPARK_HADOOP_PROP_PREFIX = "spark.hadoop."
installSignalHandler()
@@ -134,6 +135,16 @@ private[hive] object SparkSQLCLIDriver extends Logging {
// Hive 1.2 + not supported in CLI
throw new RuntimeException("Remote operations not supported")
}
+ // Respect the configurations set by --hiveconf from the command line
+ // (based on Hive's CliDriver).
+ val hiveConfFromCmd = sessionState.getOverriddenConfigurations.entrySet().asScala
+ val newHiveConf = hiveConfFromCmd.map { kv =>
+ // If the same property is configured by spark.hadoop.xxx, we ignore it and
+ // obey settings from spark properties
+ val k = kv.getKey
+ val v = sys.props.getOrElseUpdate(SPARK_HADOOP_PROP_PREFIX + k, kv.getValue)
+ (k, v)
+ }
val cli = new SparkSQLCLIDriver
cli.setHiveVariables(oproc.getHiveVariables)
@@ -157,12 +168,8 @@ private[hive] object SparkSQLCLIDriver extends Logging {
// Execute -i init files (always in silent mode)
cli.processInitFiles(sessionState)
- // Respect the configurations set by --hiveconf from the command line
- // (based on Hive's CliDriver).
- val it = sessionState.getOverriddenConfigurations.entrySet().iterator()
- while (it.hasNext) {
- val kv = it.next()
- SparkSQLEnv.sqlContext.setConf(kv.getKey, kv.getValue)
+ newHiveConf.foreach { kv =>
+ SparkSQLEnv.sqlContext.setConf(kv._1, kv._2)
}
if (sessionState.execString != null) {
http://git-wip-us.apache.org/repos/asf/spark/blob/41568e9a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index d3cec11..933fd73 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -283,4 +283,17 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
"SET conf3;" -> "conftest"
)
}
+
+ test("SPARK-21451: spark.sql.warehouse.dir should respect options in --hiveconf") {
+ runCliWithin(1.minute)("set spark.sql.warehouse.dir;" -> warehousePath.getAbsolutePath)
+ }
+
+ test("SPARK-21451: Apply spark.hadoop.* configurations") {
+ val tmpDir = Utils.createTempDir(namePrefix = "SPARK-21451")
+ runCliWithin(
+ 1.minute,
+ Seq(s"--conf", s"spark.hadoop.${ConfVars.METASTOREWAREHOUSE}=$tmpDir"))(
+ "set spark.sql.warehouse.dir;" -> tmpDir.getAbsolutePath)
+ tmpDir.delete()
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/41568e9a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index be6339f..b32b6fb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
import org.apache.hadoop.util.VersionInfo
import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -404,6 +405,13 @@ private[spark] object HiveUtils extends Logging {
propMap.put(ConfVars.METASTORE_EVENT_LISTENERS.varname, "")
propMap.put(ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname, "")
+ // SPARK-21451: Spark will gather all `spark.hadoop.*` properties from a `SparkConf` to a
+ // Hadoop Configuration internally, as long as it happens after SparkContext initialized.
+ // Some instances such as `CliSessionState` used in `SparkSQLCliDriver` may also rely on these
+ // Configuration. But it happens before SparkContext initialized, we need to take them from
+ // system properties in the form of regular hadoop configurations.
+ SparkHadoopUtil.get.appendSparkHadoopConfigs(sys.props.toMap, propMap)
+
propMap.toMap
}
http://git-wip-us.apache.org/repos/asf/spark/blob/41568e9a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala
index 667a7dd..2ebb1de 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala
@@ -33,4 +33,13 @@ class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
assert(conf(ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname) === "")
}
}
+
+ test("newTemporaryConfiguration respect spark.hadoop.foo=bar in SparkConf") {
+ sys.props.put("spark.hadoop.foo", "bar")
+ Seq(true, false) foreach { useInMemoryDerby =>
+ val hiveConf = HiveUtils.newTemporaryConfiguration(useInMemoryDerby)
+ assert(!hiveConf.contains("spark.hadoop.foo"))
+ assert(hiveConf("foo") === "bar")
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org