You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text version).
Posted to commits@spark.apache.org by ma...@apache.org on 2014/01/08 06:30:40 UTC
[3/6] git commit: Add way to limit default # of cores used by
applications on standalone mode
Add way to limit default # of cores used by applications on standalone mode
Also documents the spark.deploy.spreadOut option.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d8bcc8e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d8bcc8e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d8bcc8e9
Branch: refs/heads/master
Commit: d8bcc8e9a095c1b20dd7a17b6535800d39bff80e
Parents: 15d9534
Author: Matei Zaharia <ma...@databricks.com>
Authored: Tue Jan 7 14:35:52 2014 -0500
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Tue Jan 7 14:35:52 2014 -0500
----------------------------------------------------------------------
.../main/scala/org/apache/spark/SparkConf.scala | 7 ++++-
.../scala/org/apache/spark/SparkContext.scala | 2 +-
.../spark/deploy/master/ApplicationInfo.scala | 7 +++--
.../org/apache/spark/deploy/master/Master.scala | 8 +++--
docs/configuration.md | 33 +++++++++++++++++---
docs/css/bootstrap.min.css | 2 +-
docs/job-scheduling.md | 5 ++-
docs/spark-standalone.md | 10 ++++++
8 files changed, 60 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d8bcc8e9/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b166527..2de3223 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -67,7 +67,7 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with
/** Set JAR files to distribute to the cluster. */
def setJars(jars: Seq[String]): SparkConf = {
- for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor")
+ for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor")
set("spark.jars", jars.filter(_ != null).mkString(","))
}
@@ -165,6 +165,11 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with
getOption(key).map(_.toDouble).getOrElse(defaultValue)
}
+ /** Get a parameter as a boolean, falling back to a default if not set */
+ def getBoolean(key: String, defaultValue: Boolean): Boolean = {
+ getOption(key).map(_.toBoolean).getOrElse(defaultValue)
+ }
+
/** Get all executor environment variables set on this SparkConf */
def getExecutorEnv: Seq[(String, String)] = {
val prefix = "spark.executorEnv."
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d8bcc8e9/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 99dcced..0e47f4e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -116,7 +116,7 @@ class SparkContext(
throw new SparkException("An application must be set in your configuration")
}
- if (conf.get("spark.log-conf", "false").toBoolean) {
+ if (conf.get("spark.logConf", "false").toBoolean) {
logInfo("Spark configuration:\n" + conf.toDebugString)
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d8bcc8e9/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index 5150b7c..1321d92 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -28,7 +28,8 @@ private[spark] class ApplicationInfo(
val desc: ApplicationDescription,
val submitDate: Date,
val driver: ActorRef,
- val appUiUrl: String)
+ val appUiUrl: String,
+ defaultCores: Int)
extends Serializable {
@transient var state: ApplicationState.Value = _
@@ -81,7 +82,9 @@ private[spark] class ApplicationInfo(
}
}
- def coresLeft: Int = desc.maxCores - coresGranted
+ private val myMaxCores = if (desc.maxCores == Int.MaxValue) defaultCores else desc.maxCores
+
+ def coresLeft: Int = myMaxCores - coresGranted
private var _retryCount = 0
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d8bcc8e9/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 7b696cf..ee01fb1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -88,7 +88,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
- val spreadOutApps = conf.get("spark.deploy.spreadOut", "true").toBoolean
+ val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
+
+ // Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
+ val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
override def preStart() {
logInfo("Starting Spark master at " + masterUrl)
@@ -426,7 +429,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
def createApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
- new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
+ new ApplicationInfo(
+ now, newApplicationId(date), desc, date, driver, desc.appUiUrl, defaultCores)
}
def registerApplication(app: ApplicationInfo): Unit = {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d8bcc8e9/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 1d36ecb..52ed59b 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -77,13 +77,14 @@ there are at least five properties that you will commonly want to control:
</tr>
<tr>
<td>spark.cores.max</td>
- <td>(infinite)</td>
+ <td>(not set)</td>
<td>
When running on a <a href="spark-standalone.html">standalone deploy cluster</a> or a
<a href="running-on-mesos.html#mesos-run-modes">Mesos cluster in "coarse-grained"
sharing mode</a>, the maximum amount of CPU cores to request for the application from
- across the cluster (not from each machine). The default will use all available cores
- offered by the cluster manager.
+ across the cluster (not from each machine). If not set, the default will be
+ <code>spark.deploy.defaultCores</code> on Spark's standalone cluster manager, or
+ infinite (all available cores) on Mesos.
</td>
</tr>
</table>
@@ -404,12 +405,36 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
- <td>spark.log-conf</td>
+ <td>spark.logConf</td>
<td>false</td>
<td>
Log the supplied SparkConf as INFO at start of spark context.
</td>
</tr>
+<tr>
+ <td>spark.deploy.spreadOut</td>
+ <td>true</td>
+ <td>
+ Whether the standalone cluster manager should spread applications out across nodes or try
+ to consolidate them onto as few nodes as possible. Spreading out is usually better for
+ data locality in HDFS, but consolidating is more efficient for compute-intensive workloads. <br/>
+ <b>Note:</b> this setting needs to be configured in the cluster master, not in individual
+ applications; you can set it through <code>SPARK_JAVA_OPTS</code> in <code>spark-env.sh</code>.
+ </td>
+</tr>
+<tr>
+ <td>spark.deploy.defaultCores</td>
+ <td>(infinite)</td>
+ <td>
+ Default number of cores to give to applications in Spark's standalone mode if they don't
+ set <code>spark.cores.max</code>. If not set, applications always get all available
+ cores unless they configure <code>spark.cores.max</code> themselves.
+ Set this lower on a shared cluster to prevent users from grabbing
+ the whole cluster by default. <br/>
+ <b>Note:</b> this setting needs to be configured in the cluster master, not in individual
+ applications; you can set it through <code>SPARK_JAVA_OPTS</code> in <code>spark-env.sh</code>.
+ </td>
+</tr>
</table>
## Viewing Spark Properties