Posted to commits@spark.apache.org by an...@apache.org on 2015/04/14 22:32:31 UTC

spark git commit: SPARK-1706: Allow multiple executors per worker in Standalone mode

Repository: spark
Updated Branches:
  refs/heads/master 25998e4d7 -> 8f8dc45f6


SPARK-1706: Allow multiple executors per worker in Standalone mode

Resubmission of https://github.com/apache/spark/pull/636 with a completely different algorithm

https://issues.apache.org/jira/browse/SPARK-1706

In the current implementation, the user has to start multiple workers on a server in order to run multiple executors on that server, which introduces additional overhead from the extra JVM processes.

In this patch, I changed the scheduling logic in the master so that the user can start multiple executor processes under a single worker (JVM) process.

1. The user configures spark.executor.cores to specify the number of cores to allocate to each executor.

2. The master assigns executors to workers based primarily on ```app.desc.memoryPerExecutorMB``` versus ```worker.memoryFree```, and launches executors with exactly ```coresPerExecutor``` cores each for as long as the worker still has enough free cores and memory (see the sketch below).
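The per-worker loop implementing this is `allocateWorkerResourceToExecutors` in Master.scala (full diff below). A minimal sketch of its arithmetic, not the patch's exact code — the helper name `executorsOnWorker` and the standalone bookkeeping here are illustrative:

```scala
// How many executors of one app fit on a worker under the new logic.
// Mirrors the while loop in allocateWorkerResourceToExecutors: launch
// coresPerExecutor-sized executors while cores and memory both suffice.
def executorsOnWorker(
    coresToAllocate: Int,      // cores on this worker offered to the app
    memoryFreeMB: Int,         // worker.memoryFree
    coresPerExecutor: Int,     // app.desc.coresPerExecutor (spark.executor.cores)
    memoryPerExecutorMB: Int   // app.desc.memoryPerExecutorMB
  ): Int = {
  var coresLeft = coresToAllocate
  var memLeft = memoryFreeMB
  var launched = 0
  while (coresLeft >= coresPerExecutor && memLeft >= memoryPerExecutorMB) {
    coresLeft -= coresPerExecutor
    memLeft -= memoryPerExecutorMB
    launched += 1
  }
  launched
}

// e.g. a worker with 16 free cores and 32768 MB free memory, with
// spark.executor.cores=4 and spark.executor.memory=8g:
// executorsOnWorker(16, 32768, 4, 8192) == 4 executors on that one worker.
```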

---------------------------------------

Other small changes include

renaming memoryPerSlave in ApplicationDescription to memoryPerExecutorMB, as "slave" is overloaded to mean both worker and executor in the documents... (we had some discussion on this before?)

Author: CodingCat <zh...@gmail.com>

Closes #731 from CodingCat/SPARK-1706-2 and squashes the following commits:

6dee808 [CodingCat] change filter predicate
fbeb7e5 [CodingCat] address the comments
940cb42 [CodingCat] avoid unnecessary allocation
b8ca561 [CodingCat] revert a change
45967b4 [CodingCat] remove unused method
2eeff77 [CodingCat] stylistic fixes
12a1b32 [CodingCat] change the semantic of coresPerExecutor to exact core number
f035423 [CodingCat] stylistic fix
d9c1685 [CodingCat] remove unused var
f595bd6 [CodingCat] recover some unintentional changes
63b3df9 [CodingCat] change the description of the parameter in the submit script
4cf61f1 [CodingCat] improve the code and docs
ff011e2 [CodingCat] start multiple executors on the worker by rewriting startExecutor logic
2c2bcc5 [CodingCat] fix wrong usage info
497ec2c [CodingCat] address andrew's comments
878402c [CodingCat] change the launching executor code
f64a28d [CodingCat] typo fix
387f4ec [CodingCat] bug fix
35c462c [CodingCat] address Andrew's comments
0b64fea [CodingCat] fix compilation issue
19d3da7 [CodingCat] address the comments
5b81466 [CodingCat] remove outdated comments
ec7d421 [CodingCat] test commit
e5efabb [CodingCat] more java docs and consolidate canUse function
a26096d [CodingCat] stylistic fix
a5d629a [CodingCat] java doc
b34ec0c [CodingCat] make master support multiple executors per worker


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8f8dc45f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8f8dc45f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8f8dc45f

Branch: refs/heads/master
Commit: 8f8dc45f6d4c8d7b740eaa3d2ea09d0b531af9dd
Parents: 25998e4
Author: CodingCat <zh...@gmail.com>
Authored: Tue Apr 14 13:32:06 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Tue Apr 14 13:32:06 2015 -0700

----------------------------------------------------------------------
 .../spark/deploy/ApplicationDescription.scala   |   9 +-
 .../org/apache/spark/deploy/JsonProtocol.scala  |   4 +-
 .../org/apache/spark/deploy/SparkSubmit.scala   |   2 +
 .../spark/deploy/SparkSubmitArguments.scala     |   5 +-
 .../spark/deploy/master/ApplicationInfo.scala   |   8 +-
 .../org/apache/spark/deploy/master/Master.scala | 117 ++++++++++---------
 .../deploy/master/ui/ApplicationPage.scala      |   2 +-
 .../spark/deploy/master/ui/MasterPage.scala     |   4 +-
 .../cluster/SparkDeploySchedulerBackend.scala   |   7 +-
 docs/configuration.md                           |  11 ++
 10 files changed, 96 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8f8dc45f/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index b7ae9c1..ae99432 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -22,12 +22,13 @@ import java.net.URI
 private[spark] class ApplicationDescription(
     val name: String,
     val maxCores: Option[Int],
-    val memoryPerSlave: Int,
+    val memoryPerExecutorMB: Int,
     val command: Command,
     var appUiUrl: String,
     val eventLogDir: Option[URI] = None,
     // short name of compression codec used when writing event logs, if any (e.g. lzf)
-    val eventLogCodec: Option[String] = None)
+    val eventLogCodec: Option[String] = None,
+    val coresPerExecutor: Option[Int] = None)
   extends Serializable {
 
   val user = System.getProperty("user.name", "<unknown>")
@@ -35,13 +36,13 @@ private[spark] class ApplicationDescription(
   def copy(
       name: String = name,
       maxCores: Option[Int] = maxCores,
-      memoryPerSlave: Int = memoryPerSlave,
+      memoryPerExecutorMB: Int = memoryPerExecutorMB,
       command: Command = command,
       appUiUrl: String = appUiUrl,
       eventLogDir: Option[URI] = eventLogDir,
       eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription =
     new ApplicationDescription(
-      name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec)
+      name, maxCores, memoryPerExecutorMB, command, appUiUrl, eventLogDir, eventLogCodec)
 
   override def toString: String = "ApplicationDescription(" + name + ")"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8f8dc45f/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index dfc5b97..2954f93 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -46,7 +46,7 @@ private[deploy] object JsonProtocol {
     ("name" -> obj.desc.name) ~
     ("cores" -> obj.desc.maxCores) ~
     ("user" ->  obj.desc.user) ~
-    ("memoryperslave" -> obj.desc.memoryPerSlave) ~
+    ("memoryperslave" -> obj.desc.memoryPerExecutorMB) ~
     ("submitdate" -> obj.submitDate.toString) ~
     ("state" -> obj.state.toString) ~
     ("duration" -> obj.duration)
@@ -55,7 +55,7 @@ private[deploy] object JsonProtocol {
   def writeApplicationDescription(obj: ApplicationDescription): JObject = {
     ("name" -> obj.name) ~
     ("cores" -> obj.maxCores) ~
-    ("memoryperslave" -> obj.memoryPerSlave) ~
+    ("memoryperslave" -> obj.memoryPerExecutorMB) ~
     ("user" -> obj.user) ~
     ("command" -> obj.command.toString)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8f8dc45f/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 60bc243..296a076 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -406,6 +406,8 @@ object SparkSubmit {
       OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
 
       // Other options
+      OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES,
+        sysProp = "spark.executor.cores"),
       OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
         sysProp = "spark.executor.memory"),
       OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,

http://git-wip-us.apache.org/repos/asf/spark/blob/8f8dc45f/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 03ecf3f..faa8780 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -482,10 +482,13 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         | Spark standalone and Mesos only:
         |  --total-executor-cores NUM  Total cores for all executors.
         |
+        | Spark standalone and YARN only:
+        |  --executor-cores NUM        Number of cores per executor. (Default: 1 in YARN mode,
+        |                              or all available cores on the worker in standalone mode)
+        |
         | YARN-only:
         |  --driver-cores NUM          Number of cores used by the driver, only in cluster mode
         |                              (Default: 1).
-        |  --executor-cores NUM        Number of cores per executor (Default: 1).
         |  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
         |  --num-executors NUM         Number of executors to launch (Default: 2).
         |  --archives ARCHIVES         Comma separated list of archives to be extracted into the
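For illustration, in standalone mode the `--executor-cores` flag now maps to the `spark.executor.cores` system property (per the SparkSubmit change above). A hedged, minimal driver showing the equivalent programmatic configuration — the master URL, app name, and sizes below are assumptions, not from the patch:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Equivalent to: spark-submit --master spark://master:7077 \
//   --executor-cores 4 --executor-memory 2g ...   (values are assumptions)
val conf = new SparkConf()
  .setMaster("spark://master:7077")    // assumed standalone master URL
  .setAppName("multi-executor-demo")   // assumed app name
  .set("spark.executor.cores", "4")    // exact cores per executor (this patch)
  .set("spark.executor.memory", "2g")  // memory per executor
val sc = new SparkContext(conf)
```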

http://git-wip-us.apache.org/repos/asf/spark/blob/8f8dc45f/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index bc5b293..f59d550 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -75,9 +75,11 @@ private[deploy] class ApplicationInfo(
     }
   }
 
-  private[master] def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): 
-  ExecutorDesc = {
-    val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
+  private[master] def addExecutor(
+      worker: WorkerInfo,
+      cores: Int,
+      useID: Option[Int] = None): ExecutorDesc = {
+    val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerExecutorMB)
     executors(exec.id) = exec
     coresGranted += cores
     exec

http://git-wip-us.apache.org/repos/asf/spark/blob/8f8dc45f/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 9a5d587..c5a6b1b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -524,52 +524,28 @@ private[master] class Master(
   }
 
   /**
-   * Can an app use the given worker? True if the worker has enough memory and we haven't already
-   * launched an executor for the app on it (right now the standalone backend doesn't like having
-   * two executors on the same worker).
-   */
-  private def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
-    worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
-  }
-
-  /**
-   * Schedule the currently available resources among waiting apps. This method will be called
-   * every time a new app joins or resource availability changes.
+   * Schedule executors to be launched on the workers.
+   *
+   * There are two modes of launching executors. The first attempts to spread out an application's
+   * executors on as many workers as possible, while the second does the opposite (i.e. launch them
+   * on as few workers as possible). The former is usually better for data locality purposes and is
+   * the default.
+   *
+   * The number of cores assigned to each executor is configurable. When this is explicitly set,
+   * multiple executors from the same application may be launched on the same worker if the worker
+   * has enough cores and memory. Otherwise, each executor grabs all the cores available on the
+   * worker by default, in which case only one executor may be launched on each worker.
    */
-  private def schedule() {
-    if (state != RecoveryState.ALIVE) { return }
-
-    // First schedule drivers, they take strict precedence over applications
-    // Randomization helps balance drivers
-    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
-    val numWorkersAlive = shuffledAliveWorkers.size
-    var curPos = 0
-
-    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
-      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
-      // start from the last worker that was assigned a driver, and continue onwards until we have
-      // explored all alive workers.
-      var launched = false
-      var numWorkersVisited = 0
-      while (numWorkersVisited < numWorkersAlive && !launched) {
-        val worker = shuffledAliveWorkers(curPos)
-        numWorkersVisited += 1
-        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
-          launchDriver(worker, driver)
-          waitingDrivers -= driver
-          launched = true
-        }
-        curPos = (curPos + 1) % numWorkersAlive
-      }
-    }
-
+  private def startExecutorsOnWorkers(): Unit = {
     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
     // in the queue, then the second app, etc.
     if (spreadOutApps) {
-      // Try to spread out each app among all the nodes, until it has all its cores
+      // Try to spread out each app among all the workers, until it has all its cores
       for (app <- waitingApps if app.coresLeft > 0) {
         val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
-          .filter(canUse(app, _)).sortBy(_.coresFree).reverse
+          .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
+            worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1))
+          .sortBy(_.coresFree).reverse
         val numUsable = usableWorkers.length
         val assigned = new Array[Int](numUsable) // Number of cores to give on each node
         var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
@@ -582,32 +558,61 @@ private[master] class Master(
           pos = (pos + 1) % numUsable
         }
         // Now that we've decided how many cores to give on each node, let's actually give them
-        for (pos <- 0 until numUsable) {
-          if (assigned(pos) > 0) {
-            val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
-            launchExecutor(usableWorkers(pos), exec)
-            app.state = ApplicationState.RUNNING
-          }
+        for (pos <- 0 until numUsable if assigned(pos) > 0) {
+          allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos))
         }
       }
     } else {
-      // Pack each app into as few nodes as possible until we've assigned all its cores
+      // Pack each app into as few workers as possible until we've assigned all its cores
       for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
         for (app <- waitingApps if app.coresLeft > 0) {
-          if (canUse(app, worker)) {
-            val coresToUse = math.min(worker.coresFree, app.coresLeft)
-            if (coresToUse > 0) {
-              val exec = app.addExecutor(worker, coresToUse)
-              launchExecutor(worker, exec)
-              app.state = ApplicationState.RUNNING
-            }
-          }
+          allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
+        }
+      }
+    }
+  }
+
+  /**
+   * Allocate a worker's resources to one or more executors.
+   * @param app the info of the application which the executors belong to
+   * @param coresToAllocate cores on this worker to be allocated to this application
+   * @param worker the worker info
+   */
+  private def allocateWorkerResourceToExecutors(
+      app: ApplicationInfo,
+      coresToAllocate: Int,
+      worker: WorkerInfo): Unit = {
+    val memoryPerExecutor = app.desc.memoryPerExecutorMB
+    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate)
+    var coresLeft = coresToAllocate
+    while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) {
+      val exec = app.addExecutor(worker, coresPerExecutor)
+      coresLeft -= coresPerExecutor
+      launchExecutor(worker, exec)
+      app.state = ApplicationState.RUNNING
+    }
+  }
+
+  /**
+   * Schedule the currently available resources among waiting apps. This method will be called
+   * every time a new app joins or resource availability changes.
+   */
+  private def schedule(): Unit = {
+    if (state != RecoveryState.ALIVE) { return }
+    // Drivers take strict precedence over executors
+    val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
+    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
+      for (driver <- waitingDrivers) {
+        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
+          launchDriver(worker, driver)
+          waitingDrivers -= driver
         }
       }
     }
+    startExecutorsOnWorkers()
   }
 
-  private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
+  private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
     logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
     worker.addExecutor(exec)
     worker.actor ! LaunchExecutor(masterUrl,
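To make the spread-out pass above concrete, here is a minimal, self-contained re-implementation of its round-robin core assignment — illustrative only; the real code operates on WorkerInfo objects and then calls allocateWorkerResourceToExecutors once per worker:

```scala
// Hand out cores one at a time, round-robin, across usable workers,
// capped by each worker's free cores (the loop from the spread-out branch).
def spreadOut(coresLeft: Int, coresFree: Array[Int]): Array[Int] = {
  val assigned = new Array[Int](coresFree.length)
  var toAssign = math.min(coresLeft, coresFree.sum)
  var pos = 0
  while (toAssign > 0) {
    if (coresFree(pos) - assigned(pos) > 0) {
      toAssign -= 1
      assigned(pos) += 1
    }
    pos = (pos + 1) % coresFree.length
  }
  assigned
}

// e.g. spreadOut(10, Array(8, 6, 4)) == Array(4, 3, 3). With
// spark.executor.cores = 2, that yields two 2-core executors on the first
// worker and one each on the others; the odd leftover core on the latter
// two workers remains for a later scheduling round.
```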

http://git-wip-us.apache.org/repos/asf/spark/blob/8f8dc45f/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 761aa8f..273f077 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -94,7 +94,7 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
             </li>
             <li>
               <strong>Executor Memory:</strong>
-              {Utils.megabytesToString(app.desc.memoryPerSlave)}
+              {Utils.megabytesToString(app.desc.memoryPerExecutorMB)}
             </li>
             <li><strong>Submit Date:</strong> {app.submitDate}</li>
             <li><strong>State:</strong> {app.state}</li>

http://git-wip-us.apache.org/repos/asf/spark/blob/8f8dc45f/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index 45412a3..399f073 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -208,8 +208,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
       <td>
         {app.coresGranted}
       </td>
-      <td sorttable_customkey={app.desc.memoryPerSlave.toString}>
-        {Utils.megabytesToString(app.desc.memoryPerSlave)}
+      <td sorttable_customkey={app.desc.memoryPerExecutorMB.toString}>
+        {Utils.megabytesToString(app.desc.memoryPerExecutorMB)}
       </td>
       <td>{UIUtils.formatDate(app.submitDate)}</td>
       <td>{app.desc.user}</td>

http://git-wip-us.apache.org/repos/asf/spark/blob/8f8dc45f/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 7eb3fdc..ed5b7c1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -82,12 +82,11 @@ private[spark] class SparkDeploySchedulerBackend(
     val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
       args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
     val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
-    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
-      appUIAddress, sc.eventLogDir, sc.eventLogCodec)
-
+    val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
+    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
+      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()
-
     waitForRegistration()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8f8dc45f/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 7169ec2..d9e9e67 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -724,6 +724,17 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.executor.cores</code></td>
+  <td>1 in YARN mode, all the available cores on the worker in standalone mode.</td>
+  <td>
+    The number of cores to use on each executor. For YARN and standalone mode only.
+    
+    In standalone mode, setting this parameter allows an application to run multiple executors on 
+    the same worker, provided that there are enough cores on that worker. Otherwise, only one 
+    executor per application will run on each worker.
+  </td>
+</tr>
+<tr>
   <td><code>spark.default.parallelism</code></td>
   <td>
     For distributed shuffle operations like <code>reduceByKey</code> and <code>join</code>, the
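As a hedged worked example of the new configuration entry — the worker dimensions below are assumptions, not from the patch:

```scala
// One standalone worker, assumed to have 16 cores and 64g (65536 MB) free.
val workerCores = 16
val workerMemoryMB = 65536
val executorCores = 4        // spark.executor.cores
val executorMemoryMB = 8192  // spark.executor.memory (8g)
// Executors of this app that fit on the worker: bounded by both resources.
val executorsPerWorker =
  math.min(workerCores / executorCores, workerMemoryMB / executorMemoryMB) // = 4
// With spark.executor.cores unset, the app instead gets a single executor
// that grabs all 16 cores on this worker, as the doc text above describes.
```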


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org