You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/02 06:30:03 UTC

[25/33] git commit: Updated docs for SparkConf and handled review comments

Updated docs for SparkConf and handled review comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0fa58097
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0fa58097
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0fa58097

Branch: refs/heads/master
Commit: 0fa5809768cf60ec62b4277f04e23a44dc1582e2
Parents: 994f080
Author: Matei Zaharia <ma...@databricks.com>
Authored: Mon Dec 30 22:17:28 2013 -0500
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Mon Dec 30 22:17:28 2013 -0500

----------------------------------------------------------------------
 .../scala/org/apache/spark/Partitioner.scala    |  2 +-
 .../main/scala/org/apache/spark/SparkConf.scala | 31 +++++----
 .../scala/org/apache/spark/SparkContext.scala   | 18 +++--
 .../main/scala/org/apache/spark/SparkEnv.scala  | 13 ++--
 .../spark/api/java/JavaSparkContext.scala       |  6 ++
 .../spark/deploy/FaultToleranceTest.scala       |  4 +-
 .../org/apache/spark/executor/Executor.scala    |  2 +-
 .../scala/org/apache/spark/util/Utils.scala     | 10 +--
 core/src/test/resources/spark.conf              |  2 +
 docs/_config.yml                                |  2 +-
 docs/configuration.md                           | 71 +++++++++++++++-----
 docs/css/bootstrap.min.css                      |  2 +-
 docs/job-scheduling.md                          | 21 +++---
 docs/monitoring.md                              |  3 +-
 docs/python-programming-guide.md                | 15 +++--
 docs/quick-start.md                             | 52 ++++++++++----
 docs/running-on-mesos.md                        | 19 +++---
 docs/scala-programming-guide.md                 |  4 +-
 docs/spark-standalone.md                        | 15 +++--
 docs/streaming-programming-guide.md             |  4 +-
 docs/tuning.md                                  | 21 +++---
 python/pyspark/conf.py                          | 24 +++++--
 python/pyspark/context.py                       | 24 +++----
 23 files changed, 241 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 7cb545a..31b0773 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -52,7 +52,7 @@ object Partitioner {
     for (r <- bySize if r.partitioner != None) {
       return r.partitioner.get
     }
-    if (rdd.context.conf.getOrElse("spark.default.parallelism", null) != null) {
+    if (rdd.context.conf.contains("spark.default.parallelism")) {
       return new HashPartitioner(rdd.context.defaultParallelism)
     } else {
       return new HashPartitioner(bySize.head.partitions.size)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index ae52de4..96239cf 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -16,6 +16,12 @@ import com.typesafe.config.ConfigFactory
  * For unit tests, you can also call `new SparkConf(false)` to skip loading external settings and
  * get the same configuration no matter what is on the classpath.
  *
+ * All setter methods in this class support chaining. For example, you can write
+ * `new SparkConf().setMaster("local").setAppName("My app")`.
+ *
+ * Note that once a SparkConf object is passed to Spark, it is cloned and can no longer be modified
+ * by the user. Spark does not support modifying the configuration at runtime.
+ *
  * @param loadDefaults whether to load values from the system properties and classpath
  */
 class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
@@ -69,10 +75,7 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
 
   /** Set JAR files to distribute to the cluster. (Java-friendly version.) */
   def setJars(jars: Array[String]): SparkConf = {
-    if (!jars.isEmpty) {
-      settings("spark.jars") = jars.mkString(",")
-    }
-    this
+    setJars(jars.toSeq)
   }
 
   /**
@@ -102,15 +105,11 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
    * (Java-friendly version.)
    */
   def setExecutorEnv(variables: Array[(String, String)]): SparkConf = {
-    for ((k, v) <- variables) {
-      setExecutorEnv(k, v)
-    }
-    this
+    setExecutorEnv(variables.toSeq)
   }
 
   /**
-   * Set the location where Spark is installed on worker nodes. This is only needed on Mesos if
-   * you are not using `spark.executor.uri` to disseminate the Spark binary distribution.
+   * Set the location where Spark is installed on worker nodes.
    */
   def setSparkHome(home: String): SparkConf = {
     if (home != null) {
@@ -154,8 +153,8 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
   /** Get all executor environment variables set on this SparkConf */
   def getExecutorEnv: Seq[(String, String)] = {
     val prefix = "spark.executorEnv."
-    getAll.filter(pair => pair._1.startsWith(prefix))
-          .map(pair => (pair._1.substring(prefix.length), pair._2))
+    getAll.filter{case (k, v) => k.startsWith(prefix)}
+          .map{case (k, v) => (k.substring(prefix.length), v)}
   }
 
   /** Does the configuration contain a given parameter? */
@@ -165,4 +164,12 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
   override def clone: SparkConf = {
     new SparkConf(false).setAll(settings)
   }
+
+  /**
+   * Return a string listing all keys and values, one per line. This is useful to print the
+   * configuration out for debugging.
+   */
+  def toDebugString: String = {
+    settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 810ed18..8134ce7 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -55,14 +55,14 @@ import org.apache.spark.util._
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
  * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
  *
- * @param conf_ a Spark Config object describing the application configuration. Any settings in
+ * @param config a Spark Config object describing the application configuration. Any settings in
  *   this config overrides the default configs as well as system properties.
  * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. Can
  *   be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
  *   from a list of input files or InputFormats for the application.
  */
 class SparkContext(
-    conf_ : SparkConf,
+    config: SparkConf,
     // This is used only by YARN for now, but should be relevant to other cluster types (Mesos, etc)
     // too. This is typically generated from InputFormatInfo.computePreferredLocations. It contains
     // a map from hostname to a list of input format splits on the host.
@@ -107,7 +107,13 @@ class SparkContext(
       preferredNodeLocationData)
   }
 
-  val conf = conf_.clone()
+  private[spark] val conf = config.clone()
+
+  /**
+   * Return a copy of this SparkContext's configuration. The configuration ''cannot'' be
+   * changed at runtime.
+   */
+  def getConf: SparkConf = conf.clone()
 
   if (!conf.contains("spark.master")) {
     throw new SparkException("A master URL must be set in your configuration")
@@ -135,11 +141,11 @@ class SparkContext(
   initLogging()
 
   // Create the Spark execution environment (cache, map output tracker, etc)
-  private[spark] val env = SparkEnv.createFromSystemProperties(
+  private[spark] val env = SparkEnv.create(
+    conf,
     "<driver>",
     conf.get("spark.driver.host"),
     conf.get("spark.driver.port").toInt,
-    conf,
     isDriver = true,
     isLocal = isLocal)
   SparkEnv.set(env)
@@ -730,7 +736,7 @@ class SparkContext(
    * (in that order of preference). If neither of these is set, return None.
    */
   private[spark] def getSparkHome(): Option[String] = {
-    if (conf.getOrElse("spark.home", null) != null) {
+    if (conf.contains("spark.home")) {
       Some(conf.get("spark.home"))
     } else if (System.getenv("SPARK_HOME") != null) {
       Some(System.getenv("SPARK_HOME"))

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 34fad3e..d06af8e 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -40,7 +40,7 @@ import com.google.common.collect.MapMaker
  * objects needs to have the right SparkEnv set. You can get the current environment with
  * SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
  */
-class SparkEnv (
+class SparkEnv private[spark] (
     val executorId: String,
     val actorSystem: ActorSystem,
     val serializerManager: SerializerManager,
@@ -63,7 +63,7 @@ class SparkEnv (
   // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
   private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
 
-  def stop() {
+  private[spark] def stop() {
     pythonWorkers.foreach { case(key, worker) => worker.stop() }
     httpFileServer.stop()
     mapOutputTracker.stop()
@@ -79,6 +79,7 @@ class SparkEnv (
     //actorSystem.awaitTermination()
   }
 
+  private[spark]
   def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
     synchronized {
       val key = (pythonExec, envVars)
@@ -111,11 +112,11 @@ object SparkEnv extends Logging {
 	  env.get()
   }
 
-  def createFromSystemProperties(
+  private[spark] def create(
+      conf: SparkConf,
       executorId: String,
       hostname: String,
       port: Int,
-      conf: SparkConf,
       isDriver: Boolean,
       isLocal: Boolean): SparkEnv = {
 
@@ -129,7 +130,7 @@ object SparkEnv extends Logging {
     }
 
     // set only if unset until now.
-    if (conf.getOrElse("spark.hostPort",  null) == null) {
+    if (!conf.contains("spark.hostPort")) {
       if (!isDriver){
         // unexpected
         Utils.logErrorWithStack("Unexpected NOT to have spark.hostPort set")
@@ -216,7 +217,7 @@ object SparkEnv extends Logging {
     }
 
     // Warn about deprecated spark.cache.class property
-    if (conf.getOrElse("spark.cache.class", null) != null) {
+    if (conf.contains("spark.cache.class")) {
       logWarning("The spark.cache.class property is no longer being used! Specify storage " +
         "levels using the RDD.persist() method instead.")
     }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index e03cf9d..d6aeed7 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -418,6 +418,12 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     new JavaRDD(sc.checkpointFile(path))
   }
+
+  /**
+   * Return a copy of this JavaSparkContext's configuration. The configuration ''cannot'' be
+   * changed at runtime.
+   */
+  def getConf: SparkConf = sc.getConf
 }
 
 object JavaSparkContext {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index 0aa8852..4dfb19e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -190,7 +190,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
   /** Creates a SparkContext, which constructs a Client to interact with our cluster. */
   def createClient() = {
     if (sc != null) { sc.stop() }
-    // Counter-hack: Because of a hack in SparkEnv#createFromSystemProperties() that changes this
+    // Counter-hack: Because of a hack in SparkEnv#create() that changes this
     // property, we need to reset it.
     System.setProperty("spark.driver.port", "0")
     sc = new SparkContext(getMasterUrls(masters), "fault-tolerance", containerSparkHome)
@@ -417,4 +417,4 @@ private[spark] object Docker extends Logging {
     "docker ps -l -q".!(ProcessLogger(line => id = line))
     new DockerId(id)
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a6eabc4..2400154 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -109,7 +109,7 @@ private[spark] class Executor(
   // Initialize Spark environment (using system properties read above)
   private val env = {
     if (!isLocal) {
-      val _env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, conf,
+      val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
         isDriver = false, isLocal = false)
       SparkEnv.set(_env)
       _env.metricsSystem.registerSource(executorSource)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b6b89cc..ca3320b 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -397,12 +397,11 @@ private[spark] object Utils extends Logging {
   }
 
   def localHostPort(conf: SparkConf): String = {
-    val retval = conf.getOrElse("spark.hostPort",  null)
+    val retval = conf.getOrElse("spark.hostPort", null)
     if (retval == null) {
       logErrorWithStack("spark.hostPort not set but invoking localHostPort")
       return localHostName()
     }
-
     retval
   }
 
@@ -414,9 +413,12 @@ private[spark] object Utils extends Logging {
     assert(hostPort.indexOf(':') != -1, message)
   }
 
-  // Used by DEBUG code : remove when all testing done
   def logErrorWithStack(msg: String) {
-    try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
+    try {
+      throw new Exception
+    } catch {
+      case ex: Exception => logError(msg, ex)
+    }
   }
 
   // Typically, this will be of order of number of nodes in cluster

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/core/src/test/resources/spark.conf
----------------------------------------------------------------------
diff --git a/core/src/test/resources/spark.conf b/core/src/test/resources/spark.conf
index 6c99bdc..aa4e751 100644
--- a/core/src/test/resources/spark.conf
+++ b/core/src/test/resources/spark.conf
@@ -1,3 +1,5 @@
+# A simple spark.conf file used only in our unit tests
+
 spark.test.intTestProperty = 1
 
 spark.test {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/docs/_config.yml
----------------------------------------------------------------------
diff --git a/docs/_config.yml b/docs/_config.yml
index 02067f9..11d18f0 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -4,7 +4,7 @@ markdown: kramdown
 # These allow the documentation to be updated with nerw releases
 # of Spark, Scala, and Mesos.
 SPARK_VERSION: 0.9.0-incubating-SNAPSHOT
-SPARK_VERSION_SHORT: 0.9.0-SNAPSHOT
+SPARK_VERSION_SHORT: 0.9.0
 SCALA_VERSION: 2.10
 MESOS_VERSION: 0.13.0
 SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0fa58097/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 677d182..567aba0 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -3,26 +3,37 @@ layout: global
 title: Spark Configuration
 ---
 
-Spark provides three main locations to configure the system:
+Spark provides three locations to configure the system:
 
-* [Java system properties](#system-properties), which control internal configuration parameters and can be set
-  either programmatically (by calling `System.setProperty` *before* creating a `SparkContext`) or through
-  JVM arguments.
-* [Environment variables](#environment-variables) for configuring per-machine settings such as the IP address,
-  which can be set in the `conf/spark-env.sh` script.
-* [Logging configuration](#configuring-logging), which is done through `log4j.properties`.
+* [Spark properties](#spark-properties) control most application parameters and can be set by passing
+  a [SparkConf](api/core/index.html#org.apache.spark.SparkConf) object to SparkContext, or through Java
+  system properties.
+* [Environment variables](#environment-variables) can be used to set per-machine settings, such as
+  the IP address, through the `conf/spark-env.sh` script on each node.
+* [Logging](#configuring-logging) can be configured through `log4j.properties`.
 
 
-# System Properties
+# Spark Properties
 
-To set a system property for configuring Spark, you need to either pass it with a -D flag to the JVM (for example `java -Dspark.cores.max=5 MyProgram`) or call `System.setProperty` in your code *before* creating your Spark context, as follows:
+Spark properties control most application settings and are configured separately for each application.
+The preferred way to set them is by passing a [SparkConf](api/core/index.html#org.apache.spark.SparkConf)
+class to your SparkContext constructor.
+Alternatively, Spark will also load them from Java system properties (for compatibility with old versions
+of Spark) and from a [`spark.conf` file](#configuration-files) on your classpath.
+
+SparkConf lets you configure most of the common properties to initialize a cluster (e.g., master URL and
+application name), as well as arbitrary key-value pairs through the `set()` method. For example, we could
+initialize an application as follows:
 
 {% highlight scala %}
-System.setProperty("spark.cores.max", "5")
-val sc = new SparkContext(...)
+val conf = new SparkConf()
+             .setMaster("local")
+             .setAppName("My application")
+             .set("spark.executor.memory", "1g")
+val sc = new SparkContext(conf)
 {% endhighlight %}
 
-Most of the configurable system properties control internal settings that have reasonable default values. However,
+Most of the properties control internal settings that have reasonable default values. However,
 there are at least five properties that you will commonly want to control:
 
 <table class="table">
@@ -385,11 +396,40 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 </table>
 
+## Viewing Spark Properties
+
+The application web UI at `http://<driver>:4040` lists Spark properties in the "Environment" tab.
+This is a useful place to check to make sure that your properties have been set correctly.
+
+## Configuration Files
+
+You can also configure Spark properties through a `spark.conf` file on your Java classpath.
+Because these properties are usually application-specific, we recommend putting this fine *only* on your
+application's classpath, and not in a global Spark classpath.
+
+The `spark.conf` file uses Typesafe Config's [HOCON format](https://github.com/typesafehub/config#json-superset),
+which is a superset of Java properties files and JSON. For example, the following is a simple config file:
+
+{% highlight awk %}
+# Comments are allowed
+spark.executor.memory = 512m
+spark.serializer = org.apache.spark.serializer.KryoSerializer
+{% endhighlight %}
+
+The format also allows hierarchical nesting, as follows:
+
+{% highlight awk %}
+spark.akka {
+  threads = 8
+  timeout = 200
+}
+{% endhighlight %}
+
 # Environment Variables
 
-Certain Spark settings can also be configured through environment variables, which are read from the `conf/spark-env.sh`
+Certain Spark settings can be configured through environment variables, which are read from the `conf/spark-env.sh`
 script in the directory where Spark is installed (or `conf/spark-env.cmd` on Windows). These variables are meant to be for machine-specific settings, such
-as library search paths. While Java system properties can also be set here, for application settings, we recommend setting
+as library search paths. While Spark properties can also be set there through `SPARK_JAVA_OPTS`, for per-application settings, we recommend setting
 these properties within the application instead of in `spark-env.sh` so that different applications can use different
 settings.
 
@@ -406,7 +446,8 @@ The following variables can be set in `spark-env.sh`:
    Note that applications can also add dependencies for themselves through `SparkContext.addJar` -- we recommend
    doing that when possible.
 * `SPARK_JAVA_OPTS`, to add JVM options. This includes Java options like garbage collector settings and any system
-   properties that you'd like to pass with `-D` (e.g., `-Dspark.local.dir=/disk1,/disk2`). 
+   properties that you'd like to pass with `-D`. One use case is to set some Spark properties differently on this
+   machine, e.g., `-Dspark.local.dir=/disk1,/disk2`.
 * Options for the Spark [standalone cluster scripts](spark-standalone.html#cluster-launch-scripts), such as number of cores
   to use on each machine and maximum memory.