You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/10/10 21:20:21 UTC

spark git commit: [SPARK-14082][MESOS] Enable GPU support with Mesos

Repository: spark
Updated Branches:
  refs/heads/master 3f8a0222e -> 29f186bfd


[SPARK-14082][MESOS] Enable GPU support with Mesos

## What changes were proposed in this pull request?

Enable GPU resources to be used when running coarse grain mode with Mesos.

## How was this patch tested?

Manual test with GPU.

Author: Timothy Chen <tn...@gmail.com>

Closes #14644 from tnachen/gpu_mesos.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29f186bf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29f186bf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29f186bf

Branch: refs/heads/master
Commit: 29f186bfdf929b1e8ffd8e33ee37b76d5dc5af53
Parents: 3f8a022
Author: Timothy Chen <tn...@gmail.com>
Authored: Mon Oct 10 23:20:15 2016 +0200
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Oct 10 23:20:15 2016 +0200

----------------------------------------------------------------------
 docs/running-on-mesos.md                        |  9 +++
 .../MesosCoarseGrainedSchedulerBackend.scala    | 30 ++++++++--
 .../cluster/mesos/MesosSchedulerUtils.scala     |  5 ++
 ...esosCoarseGrainedSchedulerBackendSuite.scala | 61 +++++++++++++++-----
 .../spark/scheduler/cluster/mesos/Utils.scala   | 14 +++--
 5 files changed, 96 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/29f186bf/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 173961d..77b06fc 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -498,6 +498,15 @@ See the [configuration page](configuration.html) for information on Spark config
     in the history server.
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.gpus.max</code></td>
+  <td><code>0</code></td>
+  <td>
+    Set the maximum number GPU resources to acquire for this job. Note that executors will still launch when no GPU resources are found
+    since this configuration is just a upper limit and not a guaranteed amount.
+  </td>
+</tr>
+
 
 </table>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/29f186bf/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index a64b576..e67bf3e 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -59,6 +59,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
   val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
 
+  val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
+
   private[this] val shutdownTimeoutMS =
     conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
       .ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0")
@@ -72,7 +74,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
   // Cores we have acquired with each Mesos task ID
   val coresByTaskId = new mutable.HashMap[String, Int]
+  val gpusByTaskId = new mutable.HashMap[String, Int]
   var totalCoresAcquired = 0
+  var totalGpusAcquired = 0
 
   // SlaveID -> Slave
   // This map accumulates entries for the duration of the job.  Slaves are never deleted, because
@@ -396,6 +400,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           launchTasks = true
           val taskId = newMesosTaskId()
           val offerCPUs = getResource(resources, "cpus").toInt
+          val taskGPUs = Math.min(
+            Math.max(0, maxGpus - totalGpusAcquired), getResource(resources, "gpus").toInt)
 
           val taskCPUs = executorCores(offerCPUs)
           val taskMemory = executorMemory(sc)
@@ -403,7 +409,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId)
 
           val (resourcesLeft, resourcesToUse) =
-            partitionTaskResources(resources, taskCPUs, taskMemory)
+            partitionTaskResources(resources, taskCPUs, taskMemory, taskGPUs)
 
           val taskBuilder = MesosTaskInfo.newBuilder()
             .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
@@ -425,6 +431,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           remainingResources(offerId) = resourcesLeft.asJava
           totalCoresAcquired += taskCPUs
           coresByTaskId(taskId) = taskCPUs
+          if (taskGPUs > 0) {
+            totalGpusAcquired += taskGPUs
+            gpusByTaskId(taskId) = taskGPUs
+          }
         }
       }
     }
@@ -432,21 +442,28 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   }
 
   /** Extracts task needed resources from a list of available resources. */
-  private def partitionTaskResources(resources: JList[Resource], taskCPUs: Int, taskMemory: Int)
+  private def partitionTaskResources(
+      resources: JList[Resource],
+      taskCPUs: Int,
+      taskMemory: Int,
+      taskGPUs: Int)
     : (List[Resource], List[Resource]) = {
 
     // partition cpus & mem
     val (afterCPUResources, cpuResourcesToUse) = partitionResources(resources, "cpus", taskCPUs)
     val (afterMemResources, memResourcesToUse) =
       partitionResources(afterCPUResources.asJava, "mem", taskMemory)
+    val (afterGPUResources, gpuResourcesToUse) =
+      partitionResources(afterMemResources.asJava, "gpus", taskGPUs)
 
     // If user specifies port numbers in SparkConfig then consecutive tasks will not be launched
     // on the same host. This essentially means one executor per host.
     // TODO: handle network isolator case
     val (nonPortResources, portResourcesToUse) =
-      partitionPortResources(nonZeroPortValuesFromConfig(sc.conf), afterMemResources)
+      partitionPortResources(nonZeroPortValuesFromConfig(sc.conf), afterGPUResources)
 
-    (nonPortResources, cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse)
+    (nonPortResources,
+      cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse ++ gpuResourcesToUse)
   }
 
   private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = {
@@ -513,6 +530,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           totalCoresAcquired -= cores
           coresByTaskId -= taskId
         }
+        // Also remove the gpus we have remembered for this task, if it's in the hashmap
+        for (gpus <- gpusByTaskId.get(taskId)) {
+          totalGpusAcquired -= gpus
+          gpusByTaskId -= taskId
+        }
         // If it was a failure, mark the slave as failed for blacklisting purposes
         if (TaskState.isFailed(state)) {
           slave.taskFailures += 1

http://git-wip-us.apache.org/repos/asf/spark/blob/29f186bf/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 2963d16..73cc241 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -27,6 +27,7 @@ import scala.util.control.NonFatal
 import com.google.common.base.Splitter
 import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler, SchedulerDriver}
 import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
+import org.apache.mesos.Protos.FrameworkInfo.Capability
 import org.apache.mesos.protobuf.{ByteString, GeneratedMessage}
 
 import org.apache.spark.{SparkConf, SparkContext, SparkException}
@@ -93,6 +94,10 @@ trait MesosSchedulerUtils extends Logging {
     conf.getOption("spark.mesos.role").foreach { role =>
       fwInfoBuilder.setRole(role)
     }
+    val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
+    if (maxGpus > 0) {
+      fwInfoBuilder.addCapabilities(Capability.newBuilder().setType(Capability.Type.GPU_RESOURCES))
+    }
     if (credBuilder.hasPrincipal) {
       new MesosSchedulerDriver(
         scheduler, fwInfoBuilder.build(), masterUrl, credBuilder.build())

http://git-wip-us.apache.org/repos/asf/spark/blob/29f186bf/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index c3ab488..75ba02e 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -67,7 +67,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 
     val minMem = backend.executorMemory(sc)
     val minCpu = 4
-    val offers = List((minMem, minCpu))
+    val offers = List(Resources(minMem, minCpu))
 
     // launches a task on a valid offer
     offerResources(offers)
@@ -95,8 +95,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     // launches a task on a valid offer
     val minMem = backend.executorMemory(sc) + 1024
     val minCpu = 4
-    val offer1 = (minMem, minCpu)
-    val offer2 = (minMem, 1)
+    val offer1 = Resources(minMem, minCpu)
+    val offer2 = Resources(minMem, 1)
     offerResources(List(offer1, offer2))
     verifyTaskLaunched(driver, "o1")
 
@@ -115,7 +115,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     setBackend(Map("spark.executor.cores" -> executorCores.toString))
 
     val executorMemory = backend.executorMemory(sc)
-    val offers = List((executorMemory * 2, executorCores + 1))
+    val offers = List(Resources(executorMemory * 2, executorCores + 1))
     offerResources(offers)
 
     val taskInfos = verifyTaskLaunched(driver, "o1")
@@ -130,7 +130,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 
     val executorMemory = backend.executorMemory(sc)
     val offerCores = 10
-    offerResources(List((executorMemory * 2, offerCores)))
+    offerResources(List(Resources(executorMemory * 2, offerCores)))
 
     val taskInfos = verifyTaskLaunched(driver, "o1")
     assert(taskInfos.length == 1)
@@ -144,7 +144,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     setBackend(Map("spark.cores.max" -> maxCores.toString))
 
     val executorMemory = backend.executorMemory(sc)
-    offerResources(List((executorMemory, maxCores + 1)))
+    offerResources(List(Resources(executorMemory, maxCores + 1)))
 
     val taskInfos = verifyTaskLaunched(driver, "o1")
     assert(taskInfos.length == 1)
@@ -153,9 +153,38 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(cpus == maxCores)
   }
 
+  test("mesos does not acquire gpus if not specified") {
+    setBackend()
+
+    val executorMemory = backend.executorMemory(sc)
+    offerResources(List(Resources(executorMemory, 1, 1)))
+
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.length == 1)
+
+    val gpus = backend.getResource(taskInfos.head.getResourcesList, "gpus")
+    assert(gpus == 0.0)
+  }
+
+
+  test("mesos does not acquire more than spark.mesos.gpus.max") {
+    val maxGpus = 5
+    setBackend(Map("spark.mesos.gpus.max" -> maxGpus.toString))
+
+    val executorMemory = backend.executorMemory(sc)
+    offerResources(List(Resources(executorMemory, 1, maxGpus + 1)))
+
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.length == 1)
+
+    val gpus = backend.getResource(taskInfos.head.getResourcesList, "gpus")
+    assert(gpus == maxGpus)
+  }
+
+
   test("mesos declines offers that violate attribute constraints") {
     setBackend(Map("spark.mesos.constraints" -> "x:true"))
-    offerResources(List((backend.executorMemory(sc), 4)))
+    offerResources(List(Resources(backend.executorMemory(sc), 4)))
     verifyDeclinedOffer(driver, createOfferId("o1"), true)
   }
 
@@ -165,8 +194,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 
     val executorMemory = backend.executorMemory(sc)
     offerResources(List(
-      (executorMemory, maxCores + 1),
-      (executorMemory, maxCores + 1)))
+      Resources(executorMemory, maxCores + 1),
+      Resources(executorMemory, maxCores + 1)))
 
     verifyTaskLaunched(driver, "o1")
     verifyDeclinedOffer(driver, createOfferId("o2"), true)
@@ -180,8 +209,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 
     val executorMemory = backend.executorMemory(sc)
     offerResources(List(
-      (executorMemory * 2, executorCores * 2),
-      (executorMemory * 2, executorCores * 2)))
+      Resources(executorMemory * 2, executorCores * 2),
+      Resources(executorMemory * 2, executorCores * 2)))
 
     verifyTaskLaunched(driver, "o1")
     verifyTaskLaunched(driver, "o2")
@@ -193,7 +222,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 
     // offer with room for two executors
     val executorMemory = backend.executorMemory(sc)
-    offerResources(List((executorMemory * 2, executorCores * 2)))
+    offerResources(List(Resources(executorMemory * 2, executorCores * 2)))
 
     // verify two executors were started on a single offer
     val taskInfos = verifyTaskLaunched(driver, "o1")
@@ -397,7 +426,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     setBackend()
 
     // launches a task on a valid offer
-    val offers = List((backend.executorMemory(sc), 1))
+    val offers = List(Resources(backend.executorMemory(sc), 1))
     offerResources(offers)
     verifyTaskLaunched(driver, "o1")
 
@@ -434,6 +463,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(launchedTasks.head.getCommand.getUrisList.asScala(0).getValue == url)
   }
 
+  private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)
+
   private def verifyDeclinedOffer(driver: SchedulerDriver,
       offerId: OfferID,
       filter: Boolean = false): Unit = {
@@ -444,9 +475,9 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     }
   }
 
-  private def offerResources(offers: List[(Int, Int)], startId: Int = 1): Unit = {
+  private def offerResources(offers: List[Resources], startId: Int = 1): Unit = {
     val mesosOffers = offers.zipWithIndex.map {case (offer, i) =>
-      createOffer(s"o${i + startId}", s"s${i + startId}", offer._1, offer._2)}
+      createOffer(s"o${i + startId}", s"s${i + startId}", offer.mem, offer.cpus, None, offer.gpus)}
 
     backend.resourceOffers(driver, mesosOffers.asJava)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/29f186bf/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
index fa9406f..7ebb294 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -32,8 +32,9 @@ object Utils {
       offerId: String,
       slaveId: String,
       mem: Int,
-      cpu: Int,
-      ports: Option[(Long, Long)] = None): Offer = {
+      cpus: Int,
+      ports: Option[(Long, Long)] = None,
+      gpus: Int = 0): Offer = {
     val builder = Offer.newBuilder()
     builder.addResourcesBuilder()
       .setName("mem")
@@ -42,7 +43,7 @@ object Utils {
     builder.addResourcesBuilder()
       .setName("cpus")
       .setType(Value.Type.SCALAR)
-      .setScalar(Scalar.newBuilder().setValue(cpu))
+      .setScalar(Scalar.newBuilder().setValue(cpus))
     ports.foreach { resourcePorts =>
       builder.addResourcesBuilder()
         .setName("ports")
@@ -50,6 +51,12 @@ object Utils {
         .setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder()
           .setBegin(resourcePorts._1).setEnd(resourcePorts._2).build()))
     }
+    if (gpus > 0) {
+      builder.addResourcesBuilder()
+        .setName("gpus")
+        .setType(Value.Type.SCALAR)
+        .setScalar(Scalar.newBuilder().setValue(gpus))
+    }
     builder.setId(createOfferId(offerId))
       .setFrameworkId(FrameworkID.newBuilder()
         .setValue("f1"))
@@ -82,4 +89,3 @@ object Utils {
     TaskID.newBuilder().setValue(taskId).build()
   }
 }
-


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org