You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/10/10 21:20:21 UTC
spark git commit: [SPARK-14082][MESOS] Enable GPU support with Mesos
Repository: spark
Updated Branches:
refs/heads/master 3f8a0222e -> 29f186bfd
[SPARK-14082][MESOS] Enable GPU support with Mesos
## What changes were proposed in this pull request?
Enable GPU resources to be used when running in coarse-grained mode with Mesos.
## How was this patch tested?
Manual test with GPU.
Author: Timothy Chen <tn...@gmail.com>
Closes #14644 from tnachen/gpu_mesos.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29f186bf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29f186bf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29f186bf
Branch: refs/heads/master
Commit: 29f186bfdf929b1e8ffd8e33ee37b76d5dc5af53
Parents: 3f8a022
Author: Timothy Chen <tn...@gmail.com>
Authored: Mon Oct 10 23:20:15 2016 +0200
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Oct 10 23:20:15 2016 +0200
----------------------------------------------------------------------
docs/running-on-mesos.md | 9 +++
.../MesosCoarseGrainedSchedulerBackend.scala | 30 ++++++++--
.../cluster/mesos/MesosSchedulerUtils.scala | 5 ++
...esosCoarseGrainedSchedulerBackendSuite.scala | 61 +++++++++++++++-----
.../spark/scheduler/cluster/mesos/Utils.scala | 14 +++--
5 files changed, 96 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/29f186bf/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 173961d..77b06fc 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -498,6 +498,15 @@ See the [configuration page](configuration.html) for information on Spark config
in the history server.
</td>
</tr>
+<tr>
+ <td><code>spark.mesos.gpus.max</code></td>
+ <td><code>0</code></td>
+ <td>
+ Set the maximum number of GPU resources to acquire for this job. Note that executors will still launch when no GPU resources are found
+ since this configuration is just an upper limit and not a guaranteed amount.
+ </td>
+</tr>
+
</table>
http://git-wip-us.apache.org/repos/asf/spark/blob/29f186bf/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index a64b576..e67bf3e 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -59,6 +59,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
// Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
+ val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
+
private[this] val shutdownTimeoutMS =
conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
.ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0")
@@ -72,7 +74,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
// Cores we have acquired with each Mesos task ID
val coresByTaskId = new mutable.HashMap[String, Int]
+ val gpusByTaskId = new mutable.HashMap[String, Int]
var totalCoresAcquired = 0
+ var totalGpusAcquired = 0
// SlaveID -> Slave
// This map accumulates entries for the duration of the job. Slaves are never deleted, because
@@ -396,6 +400,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
launchTasks = true
val taskId = newMesosTaskId()
val offerCPUs = getResource(resources, "cpus").toInt
+ val taskGPUs = Math.min(
+ Math.max(0, maxGpus - totalGpusAcquired), getResource(resources, "gpus").toInt)
val taskCPUs = executorCores(offerCPUs)
val taskMemory = executorMemory(sc)
@@ -403,7 +409,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId)
val (resourcesLeft, resourcesToUse) =
- partitionTaskResources(resources, taskCPUs, taskMemory)
+ partitionTaskResources(resources, taskCPUs, taskMemory, taskGPUs)
val taskBuilder = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
@@ -425,6 +431,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
remainingResources(offerId) = resourcesLeft.asJava
totalCoresAcquired += taskCPUs
coresByTaskId(taskId) = taskCPUs
+ if (taskGPUs > 0) {
+ totalGpusAcquired += taskGPUs
+ gpusByTaskId(taskId) = taskGPUs
+ }
}
}
}
@@ -432,21 +442,28 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}
/** Extracts task needed resources from a list of available resources. */
- private def partitionTaskResources(resources: JList[Resource], taskCPUs: Int, taskMemory: Int)
+ private def partitionTaskResources(
+ resources: JList[Resource],
+ taskCPUs: Int,
+ taskMemory: Int,
+ taskGPUs: Int)
: (List[Resource], List[Resource]) = {
// partition cpus & mem
val (afterCPUResources, cpuResourcesToUse) = partitionResources(resources, "cpus", taskCPUs)
val (afterMemResources, memResourcesToUse) =
partitionResources(afterCPUResources.asJava, "mem", taskMemory)
+ val (afterGPUResources, gpuResourcesToUse) =
+ partitionResources(afterMemResources.asJava, "gpus", taskGPUs)
// If user specifies port numbers in SparkConfig then consecutive tasks will not be launched
// on the same host. This essentially means one executor per host.
// TODO: handle network isolator case
val (nonPortResources, portResourcesToUse) =
- partitionPortResources(nonZeroPortValuesFromConfig(sc.conf), afterMemResources)
+ partitionPortResources(nonZeroPortValuesFromConfig(sc.conf), afterGPUResources)
- (nonPortResources, cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse)
+ (nonPortResources,
+ cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse ++ gpuResourcesToUse)
}
private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = {
@@ -513,6 +530,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
totalCoresAcquired -= cores
coresByTaskId -= taskId
}
+ // Also remove the gpus we have remembered for this task, if it's in the hashmap
+ for (gpus <- gpusByTaskId.get(taskId)) {
+ totalGpusAcquired -= gpus
+ gpusByTaskId -= taskId
+ }
// If it was a failure, mark the slave as failed for blacklisting purposes
if (TaskState.isFailed(state)) {
slave.taskFailures += 1
http://git-wip-us.apache.org/repos/asf/spark/blob/29f186bf/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 2963d16..73cc241 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -27,6 +27,7 @@ import scala.util.control.NonFatal
import com.google.common.base.Splitter
import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler, SchedulerDriver}
import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
+import org.apache.mesos.Protos.FrameworkInfo.Capability
import org.apache.mesos.protobuf.{ByteString, GeneratedMessage}
import org.apache.spark.{SparkConf, SparkContext, SparkException}
@@ -93,6 +94,10 @@ trait MesosSchedulerUtils extends Logging {
conf.getOption("spark.mesos.role").foreach { role =>
fwInfoBuilder.setRole(role)
}
+ val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
+ if (maxGpus > 0) {
+ fwInfoBuilder.addCapabilities(Capability.newBuilder().setType(Capability.Type.GPU_RESOURCES))
+ }
if (credBuilder.hasPrincipal) {
new MesosSchedulerDriver(
scheduler, fwInfoBuilder.build(), masterUrl, credBuilder.build())
http://git-wip-us.apache.org/repos/asf/spark/blob/29f186bf/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index c3ab488..75ba02e 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -67,7 +67,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val minMem = backend.executorMemory(sc)
val minCpu = 4
- val offers = List((minMem, minCpu))
+ val offers = List(Resources(minMem, minCpu))
// launches a task on a valid offer
offerResources(offers)
@@ -95,8 +95,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
// launches a task on a valid offer
val minMem = backend.executorMemory(sc) + 1024
val minCpu = 4
- val offer1 = (minMem, minCpu)
- val offer2 = (minMem, 1)
+ val offer1 = Resources(minMem, minCpu)
+ val offer2 = Resources(minMem, 1)
offerResources(List(offer1, offer2))
verifyTaskLaunched(driver, "o1")
@@ -115,7 +115,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
setBackend(Map("spark.executor.cores" -> executorCores.toString))
val executorMemory = backend.executorMemory(sc)
- val offers = List((executorMemory * 2, executorCores + 1))
+ val offers = List(Resources(executorMemory * 2, executorCores + 1))
offerResources(offers)
val taskInfos = verifyTaskLaunched(driver, "o1")
@@ -130,7 +130,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val executorMemory = backend.executorMemory(sc)
val offerCores = 10
- offerResources(List((executorMemory * 2, offerCores)))
+ offerResources(List(Resources(executorMemory * 2, offerCores)))
val taskInfos = verifyTaskLaunched(driver, "o1")
assert(taskInfos.length == 1)
@@ -144,7 +144,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
setBackend(Map("spark.cores.max" -> maxCores.toString))
val executorMemory = backend.executorMemory(sc)
- offerResources(List((executorMemory, maxCores + 1)))
+ offerResources(List(Resources(executorMemory, maxCores + 1)))
val taskInfos = verifyTaskLaunched(driver, "o1")
assert(taskInfos.length == 1)
@@ -153,9 +153,38 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(cpus == maxCores)
}
+ test("mesos does not acquire gpus if not specified") {
+ setBackend()
+
+ val executorMemory = backend.executorMemory(sc)
+ offerResources(List(Resources(executorMemory, 1, 1)))
+
+ val taskInfos = verifyTaskLaunched(driver, "o1")
+ assert(taskInfos.length == 1)
+
+ val gpus = backend.getResource(taskInfos.head.getResourcesList, "gpus")
+ assert(gpus == 0.0)
+ }
+
+
+ test("mesos does not acquire more than spark.mesos.gpus.max") {
+ val maxGpus = 5
+ setBackend(Map("spark.mesos.gpus.max" -> maxGpus.toString))
+
+ val executorMemory = backend.executorMemory(sc)
+ offerResources(List(Resources(executorMemory, 1, maxGpus + 1)))
+
+ val taskInfos = verifyTaskLaunched(driver, "o1")
+ assert(taskInfos.length == 1)
+
+ val gpus = backend.getResource(taskInfos.head.getResourcesList, "gpus")
+ assert(gpus == maxGpus)
+ }
+
+
test("mesos declines offers that violate attribute constraints") {
setBackend(Map("spark.mesos.constraints" -> "x:true"))
- offerResources(List((backend.executorMemory(sc), 4)))
+ offerResources(List(Resources(backend.executorMemory(sc), 4)))
verifyDeclinedOffer(driver, createOfferId("o1"), true)
}
@@ -165,8 +194,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val executorMemory = backend.executorMemory(sc)
offerResources(List(
- (executorMemory, maxCores + 1),
- (executorMemory, maxCores + 1)))
+ Resources(executorMemory, maxCores + 1),
+ Resources(executorMemory, maxCores + 1)))
verifyTaskLaunched(driver, "o1")
verifyDeclinedOffer(driver, createOfferId("o2"), true)
@@ -180,8 +209,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val executorMemory = backend.executorMemory(sc)
offerResources(List(
- (executorMemory * 2, executorCores * 2),
- (executorMemory * 2, executorCores * 2)))
+ Resources(executorMemory * 2, executorCores * 2),
+ Resources(executorMemory * 2, executorCores * 2)))
verifyTaskLaunched(driver, "o1")
verifyTaskLaunched(driver, "o2")
@@ -193,7 +222,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
// offer with room for two executors
val executorMemory = backend.executorMemory(sc)
- offerResources(List((executorMemory * 2, executorCores * 2)))
+ offerResources(List(Resources(executorMemory * 2, executorCores * 2)))
// verify two executors were started on a single offer
val taskInfos = verifyTaskLaunched(driver, "o1")
@@ -397,7 +426,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
setBackend()
// launches a task on a valid offer
- val offers = List((backend.executorMemory(sc), 1))
+ val offers = List(Resources(backend.executorMemory(sc), 1))
offerResources(offers)
verifyTaskLaunched(driver, "o1")
@@ -434,6 +463,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(launchedTasks.head.getCommand.getUrisList.asScala(0).getValue == url)
}
+ private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)
+
private def verifyDeclinedOffer(driver: SchedulerDriver,
offerId: OfferID,
filter: Boolean = false): Unit = {
@@ -444,9 +475,9 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
}
}
- private def offerResources(offers: List[(Int, Int)], startId: Int = 1): Unit = {
+ private def offerResources(offers: List[Resources], startId: Int = 1): Unit = {
val mesosOffers = offers.zipWithIndex.map {case (offer, i) =>
- createOffer(s"o${i + startId}", s"s${i + startId}", offer._1, offer._2)}
+ createOffer(s"o${i + startId}", s"s${i + startId}", offer.mem, offer.cpus, None, offer.gpus)}
backend.resourceOffers(driver, mesosOffers.asJava)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/29f186bf/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
index fa9406f..7ebb294 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -32,8 +32,9 @@ object Utils {
offerId: String,
slaveId: String,
mem: Int,
- cpu: Int,
- ports: Option[(Long, Long)] = None): Offer = {
+ cpus: Int,
+ ports: Option[(Long, Long)] = None,
+ gpus: Int = 0): Offer = {
val builder = Offer.newBuilder()
builder.addResourcesBuilder()
.setName("mem")
@@ -42,7 +43,7 @@ object Utils {
builder.addResourcesBuilder()
.setName("cpus")
.setType(Value.Type.SCALAR)
- .setScalar(Scalar.newBuilder().setValue(cpu))
+ .setScalar(Scalar.newBuilder().setValue(cpus))
ports.foreach { resourcePorts =>
builder.addResourcesBuilder()
.setName("ports")
@@ -50,6 +51,12 @@ object Utils {
.setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder()
.setBegin(resourcePorts._1).setEnd(resourcePorts._2).build()))
}
+ if (gpus > 0) {
+ builder.addResourcesBuilder()
+ .setName("gpus")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(gpus))
+ }
builder.setId(createOfferId(offerId))
.setFrameworkId(FrameworkID.newBuilder()
.setValue("f1"))
@@ -82,4 +89,3 @@ object Utils {
TaskID.newBuilder().setValue(taskId).build()
}
}
-
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org