Posted to commits@spark.apache.org by an...@apache.org on 2015/01/16 18:17:13 UTC

spark git commit: [SPARK-1507][YARN] specify # cores for ApplicationMaster

Repository: spark
Updated Branches:
  refs/heads/master a79a9f923 -> 2be82b1e6


[SPARK-1507][YARN] specify # cores for ApplicationMaster

Based on top of changes in https://github.com/apache/spark/pull/3806.

https://issues.apache.org/jira/browse/SPARK-1507

This adds `--driver-cores` and `spark.driver.cores` for setting the number of driver cores in all cluster modes, and `spark.yarn.am.cores` for setting the number of Application Master cores in YARN client mode.
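
For example (a hedged sketch; the application class and jar are placeholders, using the `yarn-cluster`/`yarn-client` master strings of this Spark version):

    # yarn-cluster mode: the driver runs inside the YARN AM, so --driver-cores
    # (or spark.driver.cores) also sizes the AM container
    ./bin/spark-submit --master yarn-cluster --driver-cores 2 \
      --class com.example.MyApp myapp.jar

    # yarn-client mode: the AM is a separate process from the driver;
    # size it with spark.yarn.am.cores instead
    ./bin/spark-submit --master yarn-client --conf spark.yarn.am.cores=2 \
      --class com.example.MyApp myapp.jar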

Author: WangTaoTheTonic <ba...@aliyun.com>
Author: WangTao <ba...@aliyun.com>

Closes #4018 from WangTaoTheTonic/SPARK-1507 and squashes the following commits:

01419d3 [WangTaoTheTonic] amend the args name
b255795 [WangTaoTheTonic] indet thing
d86557c [WangTaoTheTonic] some comments amend
43c9392 [WangTao] fix compile error
b39a100 [WangTao] specify # cores for ApplicationMaster


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2be82b1e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2be82b1e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2be82b1e

Branch: refs/heads/master
Commit: 2be82b1e66cd188456bbf1e5abb13af04d1629d5
Parents: a79a9f9
Author: WangTaoTheTonic <ba...@aliyun.com>
Authored: Fri Jan 16 09:16:56 2015 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Fri Jan 16 09:16:56 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/deploy/ClientArguments.scala   |  6 ++---
 .../org/apache/spark/deploy/SparkSubmit.scala   |  1 +
 .../spark/deploy/SparkSubmitArguments.scala     |  5 ++++
 docs/configuration.md                           | 15 ++++++++----
 docs/running-on-yarn.md                         | 17 ++++++++++++++
 .../org/apache/spark/deploy/yarn/Client.scala   |  1 +
 .../spark/deploy/yarn/ClientArguments.scala     | 24 ++++++++++++++++----
 7 files changed, 58 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2be82b1e/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
index 2e1e529..e5873ce 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable.ListBuffer
 
 import org.apache.log4j.Level
 
-import org.apache.spark.util.MemoryParam
+import org.apache.spark.util.{IntParam, MemoryParam}
 
 /**
  * Command-line parser for the driver client.
@@ -51,8 +51,8 @@ private[spark] class ClientArguments(args: Array[String]) {
   parse(args.toList)
 
   def parse(args: List[String]): Unit = args match {
-    case ("--cores" | "-c") :: value :: tail =>
-      cores = value.toInt
+    case ("--cores" | "-c") :: IntParam(value) :: tail =>
+      cores = value
       parse(tail)
 
     case ("--memory" | "-m") :: MemoryParam(value) :: tail =>
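
The switch from a bare `value.toInt` to the `IntParam` extractor means a malformed `--cores` value falls through to the parser's catch-all/usage branch instead of throwing an unhandled `NumberFormatException`. The extractor is roughly the following (a sketch of the pattern, not the exact source):

    // Extractor: yields Some(n) for a parseable integer, None otherwise,
    // so the pattern match simply fails rather than throwing.
    object IntParam {
      def unapply(str: String): Option[Int] = {
        try {
          Some(str.toInt)
        } catch {
          case _: NumberFormatException => None
        }
      }
    }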

http://git-wip-us.apache.org/repos/asf/spark/blob/2be82b1e/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 955cbd6..050ba91 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -200,6 +200,7 @@ object SparkSubmit {
       // Yarn cluster only
       OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
       OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
+      OptionAssigner(args.driverCores, YARN, CLUSTER, clOption = "--driver-cores"),
       OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
       OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
       OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),

http://git-wip-us.apache.org/repos/asf/spark/blob/2be82b1e/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 47059b0..81ec08c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -108,6 +108,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
       .orElse(sparkProperties.get("spark.driver.memory"))
       .orElse(env.get("SPARK_DRIVER_MEMORY"))
       .orNull
+    driverCores = Option(driverCores)
+      .orElse(sparkProperties.get("spark.driver.cores"))
+      .orNull
     executorMemory = Option(executorMemory)
       .orElse(sparkProperties.get("spark.executor.memory"))
       .orElse(env.get("SPARK_EXECUTOR_MEMORY"))
@@ -406,6 +409,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
         |  --total-executor-cores NUM  Total cores for all executors.
         |
         | YARN-only:
+        |  --driver-cores NUM          Number of cores used by the driver, only in cluster mode
+        |                              (Default: 1).
         |  --executor-cores NUM        Number of cores per executor (Default: 1).
         |  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
         |  --num-executors NUM         Number of executors to launch (Default: 2).
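
The `Option(...).orElse(...).orNull` chain above gives an explicit `--driver-cores` flag precedence over `spark.driver.cores` from `--conf` or `spark-defaults.conf`. A minimal sketch of that pattern with hypothetical values:

    val fromCli: String = null                             // no --driver-cores given
    val sparkProperties = Map("spark.driver.cores" -> "4") // e.g. spark-defaults.conf
    val driverCores = Option(fromCli)                      // the flag wins when present
      .orElse(sparkProperties.get("spark.driver.cores"))   // else fall back to the property
      .orNull                                              // "4" here; null means unset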

http://git-wip-us.apache.org/repos/asf/spark/blob/2be82b1e/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 673cdb3..efbab40 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -102,11 +102,10 @@ of the most common options to set are:
   </td>
 </tr>
 <tr>
-  <td><code>spark.executor.memory</code></td>
-  <td>512m</td>
+  <td><code>spark.driver.cores</code></td>
+  <td>1</td>
   <td>
-    Amount of memory to use per executor process, in the same format as JVM memory strings
-    (e.g. <code>512m</code>, <code>2g</code>).
+    Number of cores to use for the driver process, only in cluster mode.
   </td>
 </tr>
 <tr>
@@ -118,6 +117,14 @@ of the most common options to set are:
   </td>
 </tr>
 <tr>
+  <td><code>spark.executor.memory</code></td>
+  <td>512m</td>
+  <td>
+    Amount of memory to use per executor process, in the same format as JVM memory strings
+    (e.g. <code>512m</code>, <code>2g</code>).
+  </td>
+</tr>
+<tr>
   <td><code>spark.driver.maxResultSize</code></td>
   <td>1g</td>
   <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/2be82b1e/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 4f27309..68ab127 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -30,6 +30,23 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
   </td>
 </tr>
 <tr>
+  <td><code>spark.driver.cores</code></td>
+  <td>1</td>
+  <td>
+    Number of cores used by the driver in YARN cluster mode.
+    Since the driver is run in the same JVM as the YARN Application Master in cluster mode, this also controls the cores used by the YARN AM.
+    In client mode, use <code>spark.yarn.am.cores</code> to control the number of cores used by the YARN AM instead.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.yarn.am.cores</code></td>
+  <td>1</td>
+  <td>
+    Number of cores to use for the YARN Application Master in client mode.
+    In cluster mode, use <code>spark.driver.cores</code> instead.
+  </td>
+</tr>
+<tr>
   <td><code>spark.yarn.am.waitTime</code></td>
   <td>100000</td>
   <td>
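
Equivalently, a client-mode application can request AM cores programmatically; a minimal sketch (the app name is a placeholder, using the `yarn-client` master string of this Spark version):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("yarn-client")
      .setAppName("MyApp")
      .set("spark.yarn.am.cores", "2") // cores for the client-mode YARN AM
    val sc = new SparkContext(conf)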

http://git-wip-us.apache.org/repos/asf/spark/blob/2be82b1e/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 0321063..d4eeccf 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -127,6 +127,7 @@ private[spark] class Client(
     }
     val capability = Records.newRecord(classOf[Resource])
     capability.setMemory(args.amMemory + amMemoryOverhead)
+    capability.setVirtualCores(args.amCores)
     appContext.setResource(capability)
     appContext
   }
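
Note that `setVirtualCores` only expresses a request; whether YARN enforces it depends on the scheduler. With the CapacityScheduler, for instance, vcores are ignored unless the resource calculator accounts for CPU (an assumption about the cluster setup, configured in `capacity-scheduler.xml`):

    <property>
      <name>yarn.scheduler.capacity.resource-calculator</name>
      <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
    </property>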

http://git-wip-us.apache.org/repos/asf/spark/blob/2be82b1e/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 461a9cc..79bead7 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -36,14 +36,18 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
   var numExecutors = DEFAULT_NUMBER_EXECUTORS
   var amQueue = sparkConf.get("spark.yarn.queue", "default")
   var amMemory: Int = 512 // MB
+  var amCores: Int = 1
   var appName: String = "Spark"
   var priority = 0
   def isClusterMode: Boolean = userClass != null
 
   private var driverMemory: Int = 512 // MB
+  private var driverCores: Int = 1
   private val driverMemOverheadKey = "spark.yarn.driver.memoryOverhead"
   private val amMemKey = "spark.yarn.am.memory"
   private val amMemOverheadKey = "spark.yarn.am.memoryOverhead"
+  private val driverCoresKey = "spark.driver.cores"
+  private val amCoresKey = "spark.yarn.am.cores"
   private val isDynamicAllocationEnabled =
     sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)
 
@@ -92,19 +96,25 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
         "You must specify at least 1 executor!\n" + getUsageMessage())
     }
     if (isClusterMode) {
-      for (key <- Seq(amMemKey, amMemOverheadKey)) {
+      for (key <- Seq(amMemKey, amMemOverheadKey, amCoresKey)) {
         if (sparkConf.contains(key)) {
           println(s"$key is set but does not apply in cluster mode.")
         }
       }
       amMemory = driverMemory
+      amCores = driverCores
     } else {
-      if (sparkConf.contains(driverMemOverheadKey)) {
-        println(s"$driverMemOverheadKey is set but does not apply in client mode.")
+      for (key <- Seq(driverMemOverheadKey, driverCoresKey)) {
+        if (sparkConf.contains(key)) {
+          println(s"$key is set but does not apply in client mode.")
+        }
       }
       sparkConf.getOption(amMemKey)
         .map(Utils.memoryStringToMb)
         .foreach { mem => amMemory = mem }
+      sparkConf.getOption(amCoresKey)
+        .map(_.toInt)
+        .foreach { cores => amCores = cores }
     }
   }
 
@@ -140,6 +150,10 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
           driverMemory = value
           args = tail
 
+        case ("--driver-cores") :: IntParam(value) :: tail =>
+          driverCores = value
+          args = tail
+
         case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
           if (args(0) == "--num-workers") {
             println("--num-workers is deprecated. Use --num-executors instead.")
@@ -198,7 +212,8 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
 
   private def getUsageMessage(unknownParam: List[String] = null): String = {
     val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
-    message + """
+    message +
+      """
       |Usage: org.apache.spark.deploy.yarn.Client [options]
       |Options:
       |  --jar JAR_PATH           Path to your application's JAR file (required in yarn-cluster
@@ -209,6 +224,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       |  --num-executors NUM      Number of executors to start (Default: 2)
       |  --executor-cores NUM     Number of cores for the executors (Default: 1).
       |  --driver-memory MEM      Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)
+      |  --driver-cores NUM       Number of cores used by the driver (Default: 1).
       |  --executor-memory MEM    Memory per executor (e.g. 1000M, 2G) (Default: 1G)
       |  --name NAME              The name of your application (Default: Spark)
       |  --queue QUEUE            The hadoop queue to use for allocation requests (Default:

