Posted to commits@spark.apache.org by ma...@apache.org on 2014/01/08 06:30:40 UTC

[3/6] git commit: Add way to limit default # of cores used by applications on standalone mode

Add way to limit default # of cores used by applications on standalone mode

Also documents the spark.deploy.spreadOut option.
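
For context, a minimal sketch of the application-side setting this interacts with: an app that sets spark.cores.max keeps its explicit cap, while an app that leaves it unset now falls back to the master's spark.deploy.defaultCores. The master URL and app name below are placeholders.

    import org.apache.spark.{SparkConf, SparkContext}

    // Explicitly cap this application at 4 cores across the cluster.
    // Without spark.cores.max, the standalone master applies spark.deploy.defaultCores.
    val conf = new SparkConf()
      .setMaster("spark://master-host:7077")  // placeholder master URL
      .setAppName("ExampleApp")               // placeholder app name
      .set("spark.cores.max", "4")
    val sc = new SparkContext(conf)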


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d8bcc8e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d8bcc8e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d8bcc8e9

Branch: refs/heads/master
Commit: d8bcc8e9a095c1b20dd7a17b6535800d39bff80e
Parents: 15d9534
Author: Matei Zaharia <ma...@databricks.com>
Authored: Tue Jan 7 14:35:52 2014 -0500
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Tue Jan 7 14:35:52 2014 -0500

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala |  7 ++++-
 .../scala/org/apache/spark/SparkContext.scala   |  2 +-
 .../spark/deploy/master/ApplicationInfo.scala   |  7 +++--
 .../org/apache/spark/deploy/master/Master.scala |  8 +++--
 docs/configuration.md                           | 33 +++++++++++++++++---
 docs/css/bootstrap.min.css                      |  2 +-
 docs/job-scheduling.md                          |  5 ++-
 docs/spark-standalone.md                        | 10 ++++++
 8 files changed, 60 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d8bcc8e9/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b166527..2de3223 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -67,7 +67,7 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with
 
   /** Set JAR files to distribute to the cluster. */
   def setJars(jars: Seq[String]): SparkConf = {
-    for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor") 
+    for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor")
     set("spark.jars", jars.filter(_ != null).mkString(","))
   }
 
@@ -165,6 +165,11 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with
     getOption(key).map(_.toDouble).getOrElse(defaultValue)
   }
 
+  /** Get a parameter as a boolean, falling back to a default if not set */
+  def getBoolean(key: String, defaultValue: Boolean): Boolean = {
+    getOption(key).map(_.toBoolean).getOrElse(defaultValue)
+  }
+
   /** Get all executor environment variables set on this SparkConf */
   def getExecutorEnv: Seq[(String, String)] = {
     val prefix = "spark.executorEnv."
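
The new getBoolean accessor mirrors the existing getInt/getDouble helpers. A short usage sketch (the key and default are the ones this commit itself uses in Master.scala):

    // Read a boolean flag with a fallback, instead of get(key, "true").toBoolean
    val spreadOut = conf.getBoolean("spark.deploy.spreadOut", true)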

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d8bcc8e9/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 99dcced..0e47f4e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -116,7 +116,7 @@ class SparkContext(
    throw new SparkException("An application name must be set in your configuration")
   }
 
-  if (conf.get("spark.log-conf", "false").toBoolean) {
+  if (conf.get("spark.logConf", "false").toBoolean) {
     logInfo("Spark configuration:\n" + conf.toDebugString)
   }
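
This renames the property from spark.log-conf to spark.logConf, matching the camel-case style of the other settings. A minimal sketch of enabling it from application code:

    // Log the fully resolved SparkConf at INFO level when the SparkContext starts
    val conf = new SparkConf().set("spark.logConf", "true")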
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d8bcc8e9/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index 5150b7c..1321d92 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -28,7 +28,8 @@ private[spark] class ApplicationInfo(
     val desc: ApplicationDescription,
     val submitDate: Date,
     val driver: ActorRef,
-    val appUiUrl: String)
+    val appUiUrl: String,
+    defaultCores: Int)
   extends Serializable {
 
   @transient var state: ApplicationState.Value = _
@@ -81,7 +82,9 @@ private[spark] class ApplicationInfo(
     }
   }
 
-  def coresLeft: Int = desc.maxCores - coresGranted
+  private val myMaxCores = if (desc.maxCores == Int.MaxValue) defaultCores else desc.maxCores
+
+  def coresLeft: Int = myMaxCores - coresGranted
 
   private var _retryCount = 0
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d8bcc8e9/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 7b696cf..ee01fb1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -88,7 +88,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   // As a temporary workaround before better ways of configuring memory, we allow users to set
   // a flag that will perform round-robin scheduling across the nodes (spreading out each app
   // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
-  val spreadOutApps = conf.get("spark.deploy.spreadOut", "true").toBoolean
+  val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
+
+  // Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
+  val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
 
   override def preStart() {
     logInfo("Starting Spark master at " + masterUrl)
@@ -426,7 +429,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   def createApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
     val now = System.currentTimeMillis()
     val date = new Date(now)
-    new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
+    new ApplicationInfo(
+      now, newApplicationId(date), desc, date, driver, desc.appUiUrl, defaultCores)
   }
 
   def registerApplication(app: ApplicationInfo): Unit = {
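
Both spark.deploy.defaultCores and spark.deploy.spreadOut are read by the master process, so they have to be configured on the master rather than in individual applications; per the docs change below, one way is SPARK_JAVA_OPTS in conf/spark-env.sh. The values here are illustrative only.

    # conf/spark-env.sh on the master host (illustrative values)
    export SPARK_JAVA_OPTS="-Dspark.deploy.defaultCores=8 -Dspark.deploy.spreadOut=true"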

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d8bcc8e9/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 1d36ecb..52ed59b 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -77,13 +77,14 @@ there are at least five properties that you will commonly want to control:
 </tr>
 <tr>
   <td>spark.cores.max</td>
-  <td>(infinite)</td>
+  <td>(not set)</td>
   <td>
     When running on a <a href="spark-standalone.html">standalone deploy cluster</a> or a
     <a href="running-on-mesos.html#mesos-run-modes">Mesos cluster in "coarse-grained"
     sharing mode</a>, the maximum amount of CPU cores to request for the application from
-    across the cluster (not from each machine). The default will use all available cores
-    offered by the cluster manager.
+    across the cluster (not from each machine). If not set, the default will be
+    <code>spark.deploy.defaultCores</code> on Spark's standalone cluster manager, or
+    infinite (all available cores) on Mesos.
   </td>
 </tr>
 </table>
@@ -404,12 +405,36 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td>spark.log-conf</td>
+  <td>spark.logConf</td>
   <td>false</td>
   <td>
     Log the supplied SparkConf as INFO at start of spark context.
   </td>
 </tr>
+<tr>
+  <td>spark.deploy.spreadOut</td>
+  <td>true</td>
+  <td>
+    Whether the standalone cluster manager should spread applications out across nodes or try
+    to consolidate them onto as few nodes as possible. Spreading out is usually better for
+    data locality in HDFS, but consolidating is more efficient for compute-intensive workloads. <br/>
+    <b>Note:</b> this setting needs to be configured in the cluster master, not in individual
+    applications; you can set it through <code>SPARK_JAVA_OPTS</code> in <code>spark-env.sh</code>.
+  </td>
+</tr>
+<tr>
+  <td>spark.deploy.defaultCores</td>
+  <td>(infinite)</td>
+  <td>
+    Default number of cores to give to applications in Spark's standalone mode if they don't
+    set <code>spark.cores.max</code>. If not set, applications always get all available
+    cores unless they configure <code>spark.cores.max</code> themselves.
+    Set this lower on a shared cluster to prevent users from grabbing
+    the whole cluster by default. <br/>
+    <b>Note:</b> this setting needs to be configured in the cluster master, not in individual
+    applications; you can set it through <code>SPARK_JAVA_OPTS</code> in <code>spark-env.sh</code>.
+  </td>
+</tr>
 </table>
 
 ## Viewing Spark Properties