You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/01/16 18:17:13 UTC
spark git commit: [SPARK-1507][YARN]specify # cores for
ApplicationMaster
Repository: spark
Updated Branches:
refs/heads/master a79a9f923 -> 2be82b1e6
[SPARK-1507][YARN]specify # cores for ApplicationMaster
Based on top of changes in https://github.com/apache/spark/pull/3806.
https://issues.apache.org/jira/browse/SPARK-1507
`--driver-cores` and `spark.driver.cores` for all cluster modes and `spark.yarn.am.cores` for yarn client mode.
Author: WangTaoTheTonic <ba...@aliyun.com>
Author: WangTao <ba...@aliyun.com>
Closes #4018 from WangTaoTheTonic/SPARK-1507 and squashes the following commits:
01419d3 [WangTaoTheTonic] amend the args name
b255795 [WangTaoTheTonic] indent thing
d86557c [WangTaoTheTonic] some comments amend
43c9392 [WangTao] fix compile error
b39a100 [WangTao] specify # cores for ApplicationMaster
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2be82b1e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2be82b1e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2be82b1e
Branch: refs/heads/master
Commit: 2be82b1e66cd188456bbf1e5abb13af04d1629d5
Parents: a79a9f9
Author: WangTaoTheTonic <ba...@aliyun.com>
Authored: Fri Jan 16 09:16:56 2015 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Fri Jan 16 09:16:56 2015 -0800
----------------------------------------------------------------------
.../apache/spark/deploy/ClientArguments.scala | 6 ++---
.../org/apache/spark/deploy/SparkSubmit.scala | 1 +
.../spark/deploy/SparkSubmitArguments.scala | 5 ++++
docs/configuration.md | 15 ++++++++----
docs/running-on-yarn.md | 17 ++++++++++++++
.../org/apache/spark/deploy/yarn/Client.scala | 1 +
.../spark/deploy/yarn/ClientArguments.scala | 24 ++++++++++++++++----
7 files changed, 58 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/2be82b1e/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
index 2e1e529..e5873ce 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable.ListBuffer
import org.apache.log4j.Level
-import org.apache.spark.util.MemoryParam
+import org.apache.spark.util.{IntParam, MemoryParam}
/**
* Command-line parser for the driver client.
@@ -51,8 +51,8 @@ private[spark] class ClientArguments(args: Array[String]) {
parse(args.toList)
def parse(args: List[String]): Unit = args match {
- case ("--cores" | "-c") :: value :: tail =>
- cores = value.toInt
+ case ("--cores" | "-c") :: IntParam(value) :: tail =>
+ cores = value
parse(tail)
case ("--memory" | "-m") :: MemoryParam(value) :: tail =>
http://git-wip-us.apache.org/repos/asf/spark/blob/2be82b1e/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 955cbd6..050ba91 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -200,6 +200,7 @@ object SparkSubmit {
// Yarn cluster only
OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
+ OptionAssigner(args.driverCores, YARN, CLUSTER, clOption = "--driver-cores"),
OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
http://git-wip-us.apache.org/repos/asf/spark/blob/2be82b1e/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 47059b0..81ec08c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -108,6 +108,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
.orElse(sparkProperties.get("spark.driver.memory"))
.orElse(env.get("SPARK_DRIVER_MEMORY"))
.orNull
+ driverCores = Option(driverCores)
+ .orElse(sparkProperties.get("spark.driver.cores"))
+ .orNull
executorMemory = Option(executorMemory)
.orElse(sparkProperties.get("spark.executor.memory"))
.orElse(env.get("SPARK_EXECUTOR_MEMORY"))
@@ -406,6 +409,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
| --total-executor-cores NUM Total cores for all executors.
|
| YARN-only:
+ | --driver-cores NUM Number of cores used by the driver, only in cluster mode
+ | (Default: 1).
| --executor-cores NUM Number of cores per executor (Default: 1).
| --queue QUEUE_NAME The YARN queue to submit to (Default: "default").
| --num-executors NUM Number of executors to launch (Default: 2).
http://git-wip-us.apache.org/repos/asf/spark/blob/2be82b1e/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 673cdb3..efbab40 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -102,11 +102,10 @@ of the most common options to set are:
</td>
</tr>
<tr>
- <td><code>spark.executor.memory</code></td>
- <td>512m</td>
+ <td><code>spark.driver.cores</code></td>
+ <td>1</td>
<td>
- Amount of memory to use per executor process, in the same format as JVM memory strings
- (e.g. <code>512m</code>, <code>2g</code>).
+ Number of cores to use for the driver process, only in cluster mode.
</td>
</tr>
<tr>
@@ -118,6 +117,14 @@ of the most common options to set are:
</td>
</tr>
<tr>
+ <td><code>spark.executor.memory</code></td>
+ <td>512m</td>
+ <td>
+ Amount of memory to use per executor process, in the same format as JVM memory strings
+ (e.g. <code>512m</code>, <code>2g</code>).
+ </td>
+</tr>
+<tr>
<td><code>spark.driver.maxResultSize</code></td>
<td>1g</td>
<td>
http://git-wip-us.apache.org/repos/asf/spark/blob/2be82b1e/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 4f27309..68ab127 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -30,6 +30,23 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
</td>
</tr>
<tr>
+ <td><code>spark.driver.cores</code></td>
+ <td>1</td>
+ <td>
+ Number of cores used by the driver in YARN cluster mode.
+ Since the driver is run in the same JVM as the YARN Application Master in cluster mode, this also controls the cores used by the YARN AM.
+ In client mode, use <code>spark.yarn.am.cores</code> to control the number of cores used by the YARN AM instead.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.yarn.am.cores</code></td>
+ <td>1</td>
+ <td>
+ Number of cores to use for the YARN Application Master in client mode.
+ In cluster mode, use <code>spark.driver.cores</code> instead.
+ </td>
+</tr>
+<tr>
<td><code>spark.yarn.am.waitTime</code></td>
<td>100000</td>
<td>
http://git-wip-us.apache.org/repos/asf/spark/blob/2be82b1e/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 0321063..d4eeccf 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -127,6 +127,7 @@ private[spark] class Client(
}
val capability = Records.newRecord(classOf[Resource])
capability.setMemory(args.amMemory + amMemoryOverhead)
+ capability.setVirtualCores(args.amCores)
appContext.setResource(capability)
appContext
}
http://git-wip-us.apache.org/repos/asf/spark/blob/2be82b1e/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 461a9cc..79bead7 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -36,14 +36,18 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
var numExecutors = DEFAULT_NUMBER_EXECUTORS
var amQueue = sparkConf.get("spark.yarn.queue", "default")
var amMemory: Int = 512 // MB
+ var amCores: Int = 1
var appName: String = "Spark"
var priority = 0
def isClusterMode: Boolean = userClass != null
private var driverMemory: Int = 512 // MB
+ private var driverCores: Int = 1
private val driverMemOverheadKey = "spark.yarn.driver.memoryOverhead"
private val amMemKey = "spark.yarn.am.memory"
private val amMemOverheadKey = "spark.yarn.am.memoryOverhead"
+ private val driverCoresKey = "spark.driver.cores"
+ private val amCoresKey = "spark.yarn.am.cores"
private val isDynamicAllocationEnabled =
sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)
@@ -92,19 +96,25 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
"You must specify at least 1 executor!\n" + getUsageMessage())
}
if (isClusterMode) {
- for (key <- Seq(amMemKey, amMemOverheadKey)) {
+ for (key <- Seq(amMemKey, amMemOverheadKey, amCoresKey)) {
if (sparkConf.contains(key)) {
println(s"$key is set but does not apply in cluster mode.")
}
}
amMemory = driverMemory
+ amCores = driverCores
} else {
- if (sparkConf.contains(driverMemOverheadKey)) {
- println(s"$driverMemOverheadKey is set but does not apply in client mode.")
+ for (key <- Seq(driverMemOverheadKey, driverCoresKey)) {
+ if (sparkConf.contains(key)) {
+ println(s"$key is set but does not apply in client mode.")
+ }
}
sparkConf.getOption(amMemKey)
.map(Utils.memoryStringToMb)
.foreach { mem => amMemory = mem }
+ sparkConf.getOption(amCoresKey)
+ .map(_.toInt)
+ .foreach { cores => amCores = cores }
}
}
@@ -140,6 +150,10 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
driverMemory = value
args = tail
+ case ("--driver-cores") :: IntParam(value) :: tail =>
+ driverCores = value
+ args = tail
+
case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
if (args(0) == "--num-workers") {
println("--num-workers is deprecated. Use --num-executors instead.")
@@ -198,7 +212,8 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
private def getUsageMessage(unknownParam: List[String] = null): String = {
val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
- message + """
+ message +
+ """
|Usage: org.apache.spark.deploy.yarn.Client [options]
|Options:
| --jar JAR_PATH Path to your application's JAR file (required in yarn-cluster
@@ -209,6 +224,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
| --num-executors NUM Number of executors to start (Default: 2)
| --executor-cores NUM Number of cores for the executors (Default: 1).
| --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)
+ | --driver-cores NUM Number of cores used by the driver (Default: 1).
| --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)
| --name NAME The name of your application (Default: Spark)
| --queue QUEUE The hadoop queue to use for allocation requests (Default:
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org