You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/11/29 22:15:45 UTC
spark git commit: [SPARK-18935][MESOS] Fix dynamic reservations on
mesos
Repository: spark
Updated Branches:
refs/heads/master 284836862 -> 193555f79
[SPARK-18935][MESOS] Fix dynamic reservations on mesos
## What changes were proposed in this pull request?
- Solves the issue described in the ticket by preserving reservation and allocation info in all cases (port handling included).
- Upgrades the Mesos library dependency from 1.3.0 to 1.4.0.
- Adds extra debug level logging to make debugging easier in the future, for example we add reservation info when applicable.
```
17/09/29 14:53:07 DEBUG MesosCoarseGrainedSchedulerBackend: Accepting offer: f20de49b-dee3-45dd-a3c1-73418b7de891-O32 with attributes: Map() allocation info: role: "spark-prive"
reservation info: name: "ports"
type: RANGES
ranges {
range {
begin: 31000
end: 32000
}
}
role: "spark-prive"
reservation {
principal: "test"
}
allocation_info {
role: "spark-prive"
}
```
- Some style cleanup.
## How was this patch tested?
Manually by running the example in the ticket with and without a principal. Specifically I tested it on a dc/os 1.10 cluster with 7 nodes and played with reservations. From the master node in order to reserve resources I executed:
```
for i in 0 1 2 3 4 5 6
do
curl -i \
-d slaveId=90ec65ea-1f7b-479f-a824-35d2527d6d26-S$i \
-d resources='[
{
"name": "cpus",
"type": "SCALAR",
"scalar": { "value": 2 },
"role": "spark-role",
"reservation": {
"principal": ""
}
},
{
"name": "mem",
"type": "SCALAR",
"scalar": { "value": 8026 },
"role": "spark-role",
"reservation": {
"principal": ""
}
}
]' \
-X POST http://master.mesos:5050/master/reserve
done
```
Nodes had 4 cpus (m3.xlarge instances) and I reserved either 2 or 4 cpus (all for a role).
I verified it launches tasks on nodes with reserved resources under `spark-role` role only if
a) there are remaining resources for the default (`*`) role and the spark driver has no role assigned to it.
b) the spark driver has a role assigned to it and it is the same role used in reservations.
I also tested this locally on my machine.
Author: Stavros Kontopoulos <st...@gmail.com>
Closes #19390 from skonto/fix_dynamic_reservation.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/193555f7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/193555f7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/193555f7
Branch: refs/heads/master
Commit: 193555f79cc73873613674a09a7c371688b6dbc7
Parents: 2848368
Author: Stavros Kontopoulos <st...@gmail.com>
Authored: Wed Nov 29 14:15:35 2017 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Wed Nov 29 14:15:35 2017 -0800
----------------------------------------------------------------------
dev/deps/spark-deps-hadoop-2.6 | 2 +-
dev/deps/spark-deps-hadoop-2.7 | 2 +-
resource-managers/mesos/pom.xml | 2 +-
.../cluster/mesos/MesosClusterScheduler.scala | 1 -
.../MesosCoarseGrainedSchedulerBackend.scala | 17 +++-
.../cluster/mesos/MesosSchedulerUtils.scala | 99 +++++++++++++-------
6 files changed, 80 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/193555f7/dev/deps/spark-deps-hadoop-2.6
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 21c8a75..50ac6d1 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -138,7 +138,7 @@ lz4-java-1.4.0.jar
machinist_2.11-0.6.1.jar
macro-compat_2.11-1.1.1.jar
mail-1.4.7.jar
-mesos-1.3.0-shaded-protobuf.jar
+mesos-1.4.0-shaded-protobuf.jar
metrics-core-3.1.5.jar
metrics-graphite-3.1.5.jar
metrics-json-3.1.5.jar
http://git-wip-us.apache.org/repos/asf/spark/blob/193555f7/dev/deps/spark-deps-hadoop-2.7
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 7173426..1b1e316 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -139,7 +139,7 @@ lz4-java-1.4.0.jar
machinist_2.11-0.6.1.jar
macro-compat_2.11-1.1.1.jar
mail-1.4.7.jar
-mesos-1.3.0-shaded-protobuf.jar
+mesos-1.4.0-shaded-protobuf.jar
metrics-core-3.1.5.jar
metrics-graphite-3.1.5.jar
metrics-json-3.1.5.jar
http://git-wip-us.apache.org/repos/asf/spark/blob/193555f7/resource-managers/mesos/pom.xml
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/pom.xml b/resource-managers/mesos/pom.xml
index de8f1c9..70d0c17 100644
--- a/resource-managers/mesos/pom.xml
+++ b/resource-managers/mesos/pom.xml
@@ -29,7 +29,7 @@
<name>Spark Project Mesos</name>
<properties>
<sbt.project.name>mesos</sbt.project.name>
- <mesos.version>1.3.0</mesos.version>
+ <mesos.version>1.4.0</mesos.version>
<mesos.classifier>shaded-protobuf</mesos.classifier>
</properties>
http://git-wip-us.apache.org/repos/asf/spark/blob/193555f7/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index c41283e..d224a73 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -36,7 +36,6 @@ import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionRes
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.Utils
-
/**
* Tracks the current state of a Mesos Task that runs a Spark driver.
* @param driverDescription Submitted driver description from
http://git-wip-us.apache.org/repos/asf/spark/blob/193555f7/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index c392061..191415a 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -400,13 +400,20 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
val offerMem = getResource(offer.getResourcesList, "mem")
val offerCpus = getResource(offer.getResourcesList, "cpus")
val offerPorts = getRangeResource(offer.getResourcesList, "ports")
+ val offerReservationInfo = offer
+ .getResourcesList
+ .asScala
+ .find { r => r.getReservation != null }
val id = offer.getId.getValue
if (tasks.contains(offer.getId)) { // accept
val offerTasks = tasks(offer.getId)
logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
- s"mem: $offerMem cpu: $offerCpus ports: $offerPorts." +
+ offerReservationInfo.map(resInfo =>
+ s"reservation info: ${resInfo.getReservation.toString}").getOrElse("") +
+ s"mem: $offerMem cpu: $offerCpus ports: $offerPorts " +
+ s"resources: ${offer.getResourcesList.asScala.mkString(",")}." +
s" Launching ${offerTasks.size} Mesos tasks.")
for (task <- offerTasks) {
@@ -416,7 +423,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
val ports = getRangeResource(task.getResourcesList, "ports").mkString(",")
logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus" +
- s" ports: $ports")
+ s" ports: $ports" + s" on slave with slave id: ${task.getSlaveId.getValue} ")
}
driver.launchTasks(
@@ -431,7 +438,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
} else {
declineOffer(
driver,
- offer)
+ offer,
+ Some("Offer was declined due to unmet task launch constraints."))
}
}
}
@@ -513,6 +521,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
totalGpusAcquired += taskGPUs
gpusByTaskId(taskId) = taskGPUs
}
+ } else {
+ logDebug(s"Cannot launch a task for offer with id: $offerId on slave " +
+ s"with id: $slaveId. Requirements were not met for this offer.")
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/193555f7/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 6fcb30a..e754503 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -28,7 +28,8 @@ import com.google.common.base.Splitter
import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler, SchedulerDriver}
import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
import org.apache.mesos.Protos.FrameworkInfo.Capability
-import org.apache.mesos.protobuf.{ByteString, GeneratedMessage}
+import org.apache.mesos.Protos.Resource.ReservationInfo
+import org.apache.mesos.protobuf.{ByteString, GeneratedMessageV3}
import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.TaskState
@@ -36,8 +37,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils
-
-
/**
* Shared trait for implementing a Mesos Scheduler. This holds common state and helper
* methods and Mesos scheduler will use.
@@ -46,6 +45,8 @@ trait MesosSchedulerUtils extends Logging {
// Lock used to wait for scheduler to be registered
private final val registerLatch = new CountDownLatch(1)
+ private final val ANY_ROLE = "*"
+
/**
* Creates a new MesosSchedulerDriver that communicates to the Mesos master.
*
@@ -175,17 +176,36 @@ trait MesosSchedulerUtils extends Logging {
registerLatch.countDown()
}
- def createResource(name: String, amount: Double, role: Option[String] = None): Resource = {
+ private def setReservationInfo(
+ reservationInfo: Option[ReservationInfo],
+ role: Option[String],
+ builder: Resource.Builder): Unit = {
+ if (!role.contains(ANY_ROLE)) {
+ reservationInfo.foreach { res => builder.setReservation(res) }
+ }
+ }
+
+ def createResource(
+ name: String,
+ amount: Double,
+ role: Option[String] = None,
+ reservationInfo: Option[ReservationInfo] = None): Resource = {
val builder = Resource.newBuilder()
.setName(name)
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(amount).build())
-
role.foreach { r => builder.setRole(r) }
-
+ setReservationInfo(reservationInfo, role, builder)
builder.build()
}
+ private def getReservation(resource: Resource): Option[ReservationInfo] = {
+ if (resource.hasReservation) {
+ Some(resource.getReservation)
+ } else {
+ None
+ }
+ }
/**
* Partition the existing set of resources into two groups, those remaining to be
* scheduled and those requested to be used for a new task.
@@ -203,14 +223,17 @@ trait MesosSchedulerUtils extends Logging {
var requestedResources = new ArrayBuffer[Resource]
val remainingResources = resources.asScala.map {
case r =>
+ val reservation = getReservation(r)
if (remain > 0 &&
r.getType == Value.Type.SCALAR &&
r.getScalar.getValue > 0.0 &&
r.getName == resourceName) {
val usage = Math.min(remain, r.getScalar.getValue)
- requestedResources += createResource(resourceName, usage, Some(r.getRole))
+ requestedResources += createResource(resourceName, usage,
+ Option(r.getRole), reservation)
remain -= usage
- createResource(resourceName, r.getScalar.getValue - usage, Some(r.getRole))
+ createResource(resourceName, r.getScalar.getValue - usage,
+ Option(r.getRole), reservation)
} else {
r
}
@@ -228,16 +251,6 @@ trait MesosSchedulerUtils extends Logging {
(attr.getName, attr.getText.getValue.split(',').toSet)
}
-
- /** Build a Mesos resource protobuf object */
- protected def createResource(resourceName: String, quantity: Double): Protos.Resource = {
- Resource.newBuilder()
- .setName(resourceName)
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
- .build()
- }
-
/**
* Converts the attributes from the resource offer into a Map of name to Attribute Value
* The attribute values are the mesos attribute types and they are
@@ -245,7 +258,8 @@ trait MesosSchedulerUtils extends Logging {
* @param offerAttributes the attributes offered
* @return
*/
- protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = {
+ protected def toAttributeMap(offerAttributes: JList[Attribute])
+ : Map[String, GeneratedMessageV3] = {
offerAttributes.asScala.map { attr =>
val attrValue = attr.getType match {
case Value.Type.SCALAR => attr.getScalar
@@ -266,7 +280,7 @@ trait MesosSchedulerUtils extends Logging {
*/
def matchesAttributeRequirements(
slaveOfferConstraints: Map[String, Set[String]],
- offerAttributes: Map[String, GeneratedMessage]): Boolean = {
+ offerAttributes: Map[String, GeneratedMessageV3]): Boolean = {
slaveOfferConstraints.forall {
// offer has the required attribute and subsumes the required values for that attribute
case (name, requiredValues) =>
@@ -427,10 +441,10 @@ trait MesosSchedulerUtils extends Logging {
// partition port offers
val (resourcesWithoutPorts, portResources) = filterPortResources(offeredResources)
- val portsAndRoles = requestedPorts.
- map(x => (x, findPortAndGetAssignedRangeRole(x, portResources)))
+ val portsAndResourceInfo = requestedPorts.
+ map { x => (x, findPortAndGetAssignedResourceInfo(x, portResources)) }
- val assignedPortResources = createResourcesFromPorts(portsAndRoles)
+ val assignedPortResources = createResourcesFromPorts(portsAndResourceInfo)
// ignore non-assigned port resources, they will be declined implicitly by mesos
// no need for splitting port resources.
@@ -450,16 +464,25 @@ trait MesosSchedulerUtils extends Logging {
managedPortNames.map(conf.getLong(_, 0)).filter( _ != 0)
}
+ private case class RoleResourceInfo(
+ role: String,
+ resInfo: Option[ReservationInfo])
+
/** Creates a mesos resource for a specific port number. */
- private def createResourcesFromPorts(portsAndRoles: List[(Long, String)]) : List[Resource] = {
- portsAndRoles.flatMap{ case (port, role) =>
- createMesosPortResource(List((port, port)), Some(role))}
+ private def createResourcesFromPorts(
+ portsAndResourcesInfo: List[(Long, RoleResourceInfo)])
+ : List[Resource] = {
+ portsAndResourcesInfo.flatMap { case (port, rInfo) =>
+ createMesosPortResource(List((port, port)), Option(rInfo.role), rInfo.resInfo)}
}
/** Helper to create mesos resources for specific port ranges. */
private def createMesosPortResource(
ranges: List[(Long, Long)],
- role: Option[String] = None): List[Resource] = {
+ role: Option[String] = None,
+ reservationInfo: Option[ReservationInfo] = None): List[Resource] = {
+ // for ranges we are going to use (user defined ports fall in there) create mesos resources
+ // for each range there is a role associated with it.
ranges.map { case (rangeStart, rangeEnd) =>
val rangeValue = Value.Range.newBuilder()
.setBegin(rangeStart)
@@ -468,7 +491,8 @@ trait MesosSchedulerUtils extends Logging {
.setName("ports")
.setType(Value.Type.RANGES)
.setRanges(Value.Ranges.newBuilder().addRange(rangeValue))
- role.foreach(r => builder.setRole(r))
+ role.foreach { r => builder.setRole(r) }
+ setReservationInfo(reservationInfo, role, builder)
builder.build()
}
}
@@ -477,19 +501,21 @@ trait MesosSchedulerUtils extends Logging {
* Helper to assign a port to an offered range and get the latter's role
* info to use it later on.
*/
- private def findPortAndGetAssignedRangeRole(port: Long, portResources: List[Resource])
- : String = {
+ private def findPortAndGetAssignedResourceInfo(port: Long, portResources: List[Resource])
+ : RoleResourceInfo = {
val ranges = portResources.
- map(resource =>
- (resource.getRole, resource.getRanges.getRangeList.asScala
- .map(r => (r.getBegin, r.getEnd)).toList))
+ map { resource =>
+ val reservation = getReservation(resource)
+ (RoleResourceInfo(resource.getRole, reservation),
+ resource.getRanges.getRangeList.asScala.map(r => (r.getBegin, r.getEnd)).toList)
+ }
- val rangePortRole = ranges
- .find { case (role, rangeList) => rangeList
+ val rangePortResourceInfo = ranges
+ .find { case (resourceInfo, rangeList) => rangeList
.exists{ case (rangeStart, rangeEnd) => rangeStart <= port & rangeEnd >= port}}
// this is safe since we have previously checked about the ranges (see checkPorts method)
- rangePortRole.map{ case (role, rangeList) => role}.get
+ rangePortResourceInfo.map{ case (resourceInfo, rangeList) => resourceInfo}.get
}
/** Retrieves the port resources from a list of mesos offered resources */
@@ -564,3 +590,4 @@ trait MesosSchedulerUtils extends Logging {
}
}
}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org