Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/02 06:30:03 UTC
[25/33] git commit: Updated docs for SparkConf and handled review comments
Updated docs for SparkConf and handled review comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0fa58097
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0fa58097
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0fa58097
Branch: refs/heads/master
Commit: 0fa5809768cf60ec62b4277f04e23a44dc1582e2
Parents: 994f080
Author: Matei Zaharia <ma...@databricks.com>
Authored: Mon Dec 30 22:17:28 2013 -0500
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Mon Dec 30 22:17:28 2013 -0500
----------------------------------------------------------------------
.../scala/org/apache/spark/Partitioner.scala | 2 +-
.../main/scala/org/apache/spark/SparkConf.scala | 31 +++++----
.../scala/org/apache/spark/SparkContext.scala | 18 +++--
.../main/scala/org/apache/spark/SparkEnv.scala | 13 ++--
.../spark/api/java/JavaSparkContext.scala | 6 ++
.../spark/deploy/FaultToleranceTest.scala | 4 +-
.../org/apache/spark/executor/Executor.scala | 2 +-
.../scala/org/apache/spark/util/Utils.scala | 10 +--
core/src/test/resources/spark.conf | 2 +
docs/_config.yml | 2 +-
docs/configuration.md | 71 +++++++++++++++-----
docs/css/bootstrap.min.css | 2 +-
docs/job-scheduling.md | 21 +++---
docs/monitoring.md | 3 +-
docs/python-programming-guide.md | 15 +++--
docs/quick-start.md | 52 ++++++++++----
docs/running-on-mesos.md | 19 +++---
docs/scala-programming-guide.md | 4 +-
docs/spark-standalone.md | 15 +++--
docs/streaming-programming-guide.md | 4 +-
docs/tuning.md | 21 +++---
python/pyspark/conf.py | 24 +++++--
python/pyspark/context.py | 24 +++----
23 files changed, 241 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 7cb545a..31b0773 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -52,7 +52,7 @@ object Partitioner {
for (r <- bySize if r.partitioner != None) {
return r.partitioner.get
}
- if (rdd.context.conf.getOrElse("spark.default.parallelism", null) != null) {
+ if (rdd.context.conf.contains("spark.default.parallelism")) {
return new HashPartitioner(rdd.context.defaultParallelism)
} else {
return new HashPartitioner(bySize.head.partitions.size)
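[Illustration, not part of the commit: with the new `contains` check, `spark.default.parallelism` only takes effect when it was set explicitly. A minimal sketch of that behavior, where the master, app name, and the value "8" are invented:]

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._   // for PairRDDFunctions (groupByKey)

    // Setting spark.default.parallelism explicitly makes defaultPartitioner
    // fall into the first branch and use the context's defaultParallelism.
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("PartitionerDemo")         // hypothetical app name
      .set("spark.default.parallelism", "8")
    val sc = new SparkContext(conf)

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
    val grouped = pairs.groupByKey()         // partitioner chosen via defaultPartitioner
    println(grouped.partitions.size)         // expected 8, from spark.default.parallelism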
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index ae52de4..96239cf 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -16,6 +16,12 @@ import com.typesafe.config.ConfigFactory
* For unit tests, you can also call `new SparkConf(false)` to skip loading external settings and
* get the same configuration no matter what is on the classpath.
*
+ * All setter methods in this class support chaining. For example, you can write
+ * `new SparkConf().setMaster("local").setAppName("My app")`.
+ *
+ * Note that once a SparkConf object is passed to Spark, it is cloned and can no longer be modified
+ * by the user. Spark does not support modifying the configuration at runtime.
+ *
* @param loadDefaults whether to load values from the system properties and classpath
*/
class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
@@ -69,10 +75,7 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
/** Set JAR files to distribute to the cluster. (Java-friendly version.) */
def setJars(jars: Array[String]): SparkConf = {
- if (!jars.isEmpty) {
- settings("spark.jars") = jars.mkString(",")
- }
- this
+ setJars(jars.toSeq)
}
/**
@@ -102,15 +105,11 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
* (Java-friendly version.)
*/
def setExecutorEnv(variables: Array[(String, String)]): SparkConf = {
- for ((k, v) <- variables) {
- setExecutorEnv(k, v)
- }
- this
+ setExecutorEnv(variables.toSeq)
}
/**
- * Set the location where Spark is installed on worker nodes. This is only needed on Mesos if
- * you are not using `spark.executor.uri` to disseminate the Spark binary distribution.
+ * Set the location where Spark is installed on worker nodes.
*/
def setSparkHome(home: String): SparkConf = {
if (home != null) {
@@ -154,8 +153,8 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
/** Get all executor environment variables set on this SparkConf */
def getExecutorEnv: Seq[(String, String)] = {
val prefix = "spark.executorEnv."
- getAll.filter(pair => pair._1.startsWith(prefix))
- .map(pair => (pair._1.substring(prefix.length), pair._2))
+ getAll.filter{case (k, v) => k.startsWith(prefix)}
+ .map{case (k, v) => (k.substring(prefix.length), v)}
}
/** Does the configuration contain a given parameter? */
@@ -165,4 +164,12 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
override def clone: SparkConf = {
new SparkConf(false).setAll(settings)
}
+
+ /**
+ * Return a string listing all keys and values, one per line. This is useful to print the
+ * configuration out for debugging.
+ */
+ def toDebugString: String = {
+ settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
+ }
}
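[A usage sketch of the chaining contract documented above and of the new `toDebugString` method; the app name and memory setting are invented for illustration:]

    import org.apache.spark.SparkConf

    // Every setter returns `this`, so calls can be chained.
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("My app")
      .set("spark.executor.memory", "1g")

    // toDebugString lists one key=value pair per line, sorted by key:
    println(conf.toDebugString)
    // spark.app.name=My app
    // spark.executor.memory=1g
    // spark.master=local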
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 810ed18..8134ce7 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -55,14 +55,14 @@ import org.apache.spark.util._
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
*
- * @param conf_ a Spark Config object describing the application configuration. Any settings in
+ * @param config a Spark Config object describing the application configuration. Any settings in
this config override the default configs as well as system properties.
* @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. Can
* be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
* from a list of input files or InputFormats for the application.
*/
class SparkContext(
- conf_ : SparkConf,
+ config: SparkConf,
// This is used only by YARN for now, but should be relevant to other cluster types (Mesos, etc)
// too. This is typically generated from InputFormatInfo.computePreferredLocations. It contains
// a map from hostname to a list of input format splits on the host.
@@ -107,7 +107,13 @@ class SparkContext(
preferredNodeLocationData)
}
- val conf = conf_.clone()
+ private[spark] val conf = config.clone()
+
+ /**
+ * Return a copy of this SparkContext's configuration. The configuration ''cannot'' be
+ * changed at runtime.
+ */
+ def getConf: SparkConf = conf.clone()
if (!conf.contains("spark.master")) {
throw new SparkException("A master URL must be set in your configuration")
@@ -135,11 +141,11 @@ class SparkContext(
initLogging()
// Create the Spark execution environment (cache, map output tracker, etc)
- private[spark] val env = SparkEnv.createFromSystemProperties(
+ private[spark] val env = SparkEnv.create(
+ conf,
"<driver>",
conf.get("spark.driver.host"),
conf.get("spark.driver.port").toInt,
- conf,
isDriver = true,
isLocal = isLocal)
SparkEnv.set(env)
@@ -730,7 +736,7 @@ class SparkContext(
* (in that order of preference). If neither of these is set, return None.
*/
private[spark] def getSparkHome(): Option[String] = {
- if (conf.getOrElse("spark.home", null) != null) {
+ if (conf.contains("spark.home")) {
Some(conf.get("spark.home"))
} else if (System.getenv("SPARK_HOME") != null) {
Some(System.getenv("SPARK_HOME"))
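[Illustrative sketch of the cloned-configuration behavior introduced above: `getConf` hands back a copy, so mutating it leaves the running context untouched. Master, app name, and the spark.home value are invented:]

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("Clone demo"))
    val snapshot = sc.getConf                    // defensive copy, not the live conf
    snapshot.set("spark.home", "/tmp/spark")     // mutates only the copy
    println(sc.getConf.contains("spark.home"))   // false: the context is unaffected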
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 34fad3e..d06af8e 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -40,7 +40,7 @@ import com.google.common.collect.MapMaker
* objects needs to have the right SparkEnv set. You can get the current environment with
* SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
*/
-class SparkEnv (
+class SparkEnv private[spark] (
val executorId: String,
val actorSystem: ActorSystem,
val serializerManager: SerializerManager,
@@ -63,7 +63,7 @@ class SparkEnv (
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
- def stop() {
+ private[spark] def stop() {
pythonWorkers.foreach { case(key, worker) => worker.stop() }
httpFileServer.stop()
mapOutputTracker.stop()
@@ -79,6 +79,7 @@ class SparkEnv (
//actorSystem.awaitTermination()
}
+ private[spark]
def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
synchronized {
val key = (pythonExec, envVars)
@@ -111,11 +112,11 @@ object SparkEnv extends Logging {
env.get()
}
- def createFromSystemProperties(
+ private[spark] def create(
+ conf: SparkConf,
executorId: String,
hostname: String,
port: Int,
- conf: SparkConf,
isDriver: Boolean,
isLocal: Boolean): SparkEnv = {
@@ -129,7 +130,7 @@ object SparkEnv extends Logging {
}
// set only if unset until now.
- if (conf.getOrElse("spark.hostPort", null) == null) {
+ if (!conf.contains("spark.hostPort")) {
if (!isDriver){
// unexpected
Utils.logErrorWithStack("Unexpected NOT to have spark.hostPort set")
@@ -216,7 +217,7 @@ object SparkEnv extends Logging {
}
// Warn about deprecated spark.cache.class property
- if (conf.getOrElse("spark.cache.class", null) != null) {
+ if (conf.contains("spark.cache.class")) {
logWarning("The spark.cache.class property is no longer being used! Specify storage " +
"levels using the RDD.persist() method instead.")
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index e03cf9d..d6aeed7 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -418,6 +418,12 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
new JavaRDD(sc.checkpointFile(path))
}
+
+ /**
+ * Return a copy of this JavaSparkContext's configuration. The configuration ''cannot'' be
+ * changed at runtime.
+ */
+ def getConf: SparkConf = sc.getConf
}
object JavaSparkContext {
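[For completeness, a sketch of the Java-facing addition, written in Scala for consistency with the other examples here; the master and app name are invented. `JavaSparkContext.getConf` simply delegates to `SparkContext.getConf`, so Java callers get the same cloned snapshot:]

    import org.apache.spark.SparkConf
    import org.apache.spark.api.java.JavaSparkContext

    val jsc = new JavaSparkContext("local", "JavaConfDemo")
    val conf: SparkConf = jsc.getConf
    println(conf.get("spark.app.name"))          // "JavaConfDemo"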
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index 0aa8852..4dfb19e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -190,7 +190,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
/** Creates a SparkContext, which constructs a Client to interact with our cluster. */
def createClient() = {
if (sc != null) { sc.stop() }
- // Counter-hack: Because of a hack in SparkEnv#createFromSystemProperties() that changes this
+ // Counter-hack: Because of a hack in SparkEnv#create() that changes this
// property, we need to reset it.
System.setProperty("spark.driver.port", "0")
sc = new SparkContext(getMasterUrls(masters), "fault-tolerance", containerSparkHome)
@@ -417,4 +417,4 @@ private[spark] object Docker extends Logging {
"docker ps -l -q".!(ProcessLogger(line => id = line))
new DockerId(id)
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a6eabc4..2400154 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -109,7 +109,7 @@ private[spark] class Executor(
// Initialize Spark environment (using system properties read above)
private val env = {
if (!isLocal) {
- val _env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, conf,
+ val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
isDriver = false, isLocal = false)
SparkEnv.set(_env)
_env.metricsSystem.registerSource(executorSource)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b6b89cc..ca3320b 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -397,12 +397,11 @@ private[spark] object Utils extends Logging {
}
def localHostPort(conf: SparkConf): String = {
- val retval = conf.getOrElse("spark.hostPort", null)
+ val retval = conf.getOrElse("spark.hostPort", null)
if (retval == null) {
logErrorWithStack("spark.hostPort not set but invoking localHostPort")
return localHostName()
}
-
retval
}
@@ -414,9 +413,12 @@ private[spark] object Utils extends Logging {
assert(hostPort.indexOf(':') != -1, message)
}
- // Used by DEBUG code : remove when all testing done
def logErrorWithStack(msg: String) {
- try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
+ try {
+ throw new Exception
+ } catch {
+ case ex: Exception => logError(msg, ex)
+ }
}
// Typically, this will be of order of number of nodes in cluster
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/test/resources/spark.conf
----------------------------------------------------------------------
diff --git a/core/src/test/resources/spark.conf b/core/src/test/resources/spark.conf
index 6c99bdc..aa4e751 100644
--- a/core/src/test/resources/spark.conf
+++ b/core/src/test/resources/spark.conf
@@ -1,3 +1,5 @@
+# A simple spark.conf file used only in our unit tests
+
spark.test.intTestProperty = 1
spark.test {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/docs/_config.yml
----------------------------------------------------------------------
diff --git a/docs/_config.yml b/docs/_config.yml
index 02067f9..11d18f0 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -4,7 +4,7 @@ markdown: kramdown
# These allow the documentation to be updated with new releases
# of Spark, Scala, and Mesos.
SPARK_VERSION: 0.9.0-incubating-SNAPSHOT
-SPARK_VERSION_SHORT: 0.9.0-SNAPSHOT
+SPARK_VERSION_SHORT: 0.9.0
SCALA_VERSION: 2.10
MESOS_VERSION: 0.13.0
SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 677d182..567aba0 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -3,26 +3,37 @@ layout: global
title: Spark Configuration
---
-Spark provides three main locations to configure the system:
+Spark provides three locations to configure the system:
-* [Java system properties](#system-properties), which control internal configuration parameters and can be set
- either programmatically (by calling `System.setProperty` *before* creating a `SparkContext`) or through
- JVM arguments.
-* [Environment variables](#environment-variables) for configuring per-machine settings such as the IP address,
- which can be set in the `conf/spark-env.sh` script.
-* [Logging configuration](#configuring-logging), which is done through `log4j.properties`.
+* [Spark properties](#spark-properties) control most application parameters and can be set by passing
+ a [SparkConf](api/core/index.html#org.apache.spark.SparkConf) object to SparkContext, or through Java
+ system properties.
+* [Environment variables](#environment-variables) can be used to set per-machine settings, such as
+ the IP address, through the `conf/spark-env.sh` script on each node.
+* [Logging](#configuring-logging) can be configured through `log4j.properties`.
-# System Properties
+# Spark Properties
-To set a system property for configuring Spark, you need to either pass it with a -D flag to the JVM (for example `java -Dspark.cores.max=5 MyProgram`) or call `System.setProperty` in your code *before* creating your Spark context, as follows:
+Spark properties control most application settings and are configured separately for each application.
+The preferred way to set them is by passing a [SparkConf](api/core/index.html#org.apache.spark.SparkConf)
+object to your SparkContext constructor.
+Alternatively, Spark will also load them from Java system properties (for compatibility with old versions
+of Spark) and from a [`spark.conf` file](#configuration-files) on your classpath.
+
+SparkConf lets you configure most of the common properties to initialize a cluster (e.g., master URL and
+application name), as well as arbitrary key-value pairs through the `set()` method. For example, we could
+initialize an application as follows:
{% highlight scala %}
-System.setProperty("spark.cores.max", "5")
-val sc = new SparkContext(...)
+val conf = new SparkConf()
+ .setMaster("local")
+ .setAppName("My application")
+ .set("spark.executor.memory", "1g")
+val sc = new SparkContext(conf)
{% endhighlight %}
-Most of the configurable system properties control internal settings that have reasonable default values. However,
+Most of the properties control internal settings that have reasonable default values. However,
there are at least five properties that you will commonly want to control:
<table class="table">
@@ -385,11 +396,40 @@ Apart from these, the following properties are also available, and may be useful
</tr>
</table>
+## Viewing Spark Properties
+
+The application web UI at `http://<driver>:4040` lists Spark properties in the "Environment" tab.
+This is a useful place to check to make sure that your properties have been set correctly.
+
+## Configuration Files
+
+You can also configure Spark properties through a `spark.conf` file on your Java classpath.
+Because these properties are usually application-specific, we recommend putting this file *only* on your
+application's classpath, and not in a global Spark classpath.
+
+The `spark.conf` file uses Typesafe Config's [HOCON format](https://github.com/typesafehub/config#json-superset),
+which is a superset of Java properties files and JSON. For example, the following is a simple config file:
+
+{% highlight awk %}
+# Comments are allowed
+spark.executor.memory = 512m
+spark.serializer = org.apache.spark.serializer.KryoSerializer
+{% endhighlight %}
+
+The format also allows hierarchical nesting, as follows:
+
+{% highlight awk %}
+spark.akka {
+ threads = 8
+ timeout = 200
+}
+{% endhighlight %}
+
# Environment Variables
-Certain Spark settings can also be configured through environment variables, which are read from the `conf/spark-env.sh`
+Certain Spark settings can be configured through environment variables, which are read from the `conf/spark-env.sh`
script in the directory where Spark is installed (or `conf/spark-env.cmd` on Windows). These variables are meant to be for machine-specific settings, such
-as library search paths. While Java system properties can also be set here, for application settings, we recommend setting
+as library search paths. While Spark properties can also be set there through `SPARK_JAVA_OPTS`, for per-application settings, we recommend setting
these properties within the application instead of in `spark-env.sh` so that different applications can use different
settings.
@@ -406,7 +446,8 @@ The following variables can be set in `spark-env.sh`:
Note that applications can also add dependencies for themselves through `SparkContext.addJar` -- we recommend
doing that when possible.
* `SPARK_JAVA_OPTS`, to add JVM options. This includes Java options like garbage collector settings and any system
- properties that you'd like to pass with `-D` (e.g., `-Dspark.local.dir=/disk1,/disk2`).
+ properties that you'd like to pass with `-D`. One use case is to set some Spark properties differently on this
+ machine, e.g., `-Dspark.local.dir=/disk1,/disk2`.
* Options for the Spark [standalone cluster scripts](spark-standalone.html#cluster-launch-scripts), such as number of cores
to use on each machine and maximum memory.
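[A closing sketch, assuming the nested spark.conf example from the configuration.md hunk above is on the application's classpath: HOCON nesting flattens to dotted keys once SparkConf loads its defaults.]

    import org.apache.spark.SparkConf

    val conf = new SparkConf(true)               // loadDefaults = true reads spark.conf
    println(conf.get("spark.akka.threads"))      // "8"
    println(conf.get("spark.akka.timeout"))      // "200"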