You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/11/29 22:15:45 UTC

spark git commit: [SPARK-18935][MESOS] Fix dynamic reservations on mesos

Repository: spark
Updated Branches:
  refs/heads/master 284836862 -> 193555f79


[SPARK-18935][MESOS] Fix dynamic reservations on mesos

## What changes were proposed in this pull request?

- Solves the issue described in the ticket by preserving reservation and allocation info in all cases (port handling included).
- Upgrades the Mesos library dependency to 1.4.0.
- Adds extra debug level logging to make debugging easier in the future, for example we add reservation info when applicable.
```
 17/09/29 14:53:07 DEBUG MesosCoarseGrainedSchedulerBackend: Accepting offer: f20de49b-dee3-45dd-a3c1-73418b7de891-O32 with attributes: Map() allocation info: role: "spark-prive"
  reservation info: name: "ports"
 type: RANGES
 ranges {
   range {
     begin: 31000
     end: 32000
   }
 }
 role: "spark-prive"
 reservation {
   principal: "test"
 }
 allocation_info {
   role: "spark-prive"
 }
```
- Some style cleanup.

## How was this patch tested?

Manually by running the example in the ticket with and without a principal. Specifically I tested it on a dc/os 1.10 cluster with 7 nodes and played with reservations. From the master node in order to reserve resources I executed:

```
for i in 0 1 2 3 4 5 6
do
curl -i \
      -d slaveId=90ec65ea-1f7b-479f-a824-35d2527d6d26-S$i \
      -d resources='[
        {
          "name": "cpus",
          "type": "SCALAR",
          "scalar": { "value": 2 },
          "role": "spark-role",
          "reservation": {
            "principal": ""
          }
        },
        {
          "name": "mem",
          "type": "SCALAR",
          "scalar": { "value": 8026 },
          "role": "spark-role",
          "reservation": {
            "principal": ""
          }
        }
      ]' \
      -X POST http://master.mesos:5050/master/reserve
done
```
Nodes had 4 cpus (m3.xlarge instances) and I reserved either 2 or all 4 cpus for a role.
I verified it launches tasks on nodes with resources reserved under the `spark-role` role only if
a) there are remaining resources for the default (*) role and the spark driver has no role assigned to it, or
b) the spark driver has a role assigned to it and it is the same role used in the reservations.
I also tested this locally on my machine.

Author: Stavros Kontopoulos <st...@gmail.com>

Closes #19390 from skonto/fix_dynamic_reservation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/193555f7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/193555f7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/193555f7

Branch: refs/heads/master
Commit: 193555f79cc73873613674a09a7c371688b6dbc7
Parents: 2848368
Author: Stavros Kontopoulos <st...@gmail.com>
Authored: Wed Nov 29 14:15:35 2017 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Wed Nov 29 14:15:35 2017 -0800

----------------------------------------------------------------------
 dev/deps/spark-deps-hadoop-2.6                  |  2 +-
 dev/deps/spark-deps-hadoop-2.7                  |  2 +-
 resource-managers/mesos/pom.xml                 |  2 +-
 .../cluster/mesos/MesosClusterScheduler.scala   |  1 -
 .../MesosCoarseGrainedSchedulerBackend.scala    | 17 +++-
 .../cluster/mesos/MesosSchedulerUtils.scala     | 99 +++++++++++++-------
 6 files changed, 80 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/193555f7/dev/deps/spark-deps-hadoop-2.6
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 21c8a75..50ac6d1 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -138,7 +138,7 @@ lz4-java-1.4.0.jar
 machinist_2.11-0.6.1.jar
 macro-compat_2.11-1.1.1.jar
 mail-1.4.7.jar
-mesos-1.3.0-shaded-protobuf.jar
+mesos-1.4.0-shaded-protobuf.jar
 metrics-core-3.1.5.jar
 metrics-graphite-3.1.5.jar
 metrics-json-3.1.5.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/193555f7/dev/deps/spark-deps-hadoop-2.7
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 7173426..1b1e316 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -139,7 +139,7 @@ lz4-java-1.4.0.jar
 machinist_2.11-0.6.1.jar
 macro-compat_2.11-1.1.1.jar
 mail-1.4.7.jar
-mesos-1.3.0-shaded-protobuf.jar
+mesos-1.4.0-shaded-protobuf.jar
 metrics-core-3.1.5.jar
 metrics-graphite-3.1.5.jar
 metrics-json-3.1.5.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/193555f7/resource-managers/mesos/pom.xml
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/pom.xml b/resource-managers/mesos/pom.xml
index de8f1c9..70d0c17 100644
--- a/resource-managers/mesos/pom.xml
+++ b/resource-managers/mesos/pom.xml
@@ -29,7 +29,7 @@
   <name>Spark Project Mesos</name>
   <properties>
     <sbt.project.name>mesos</sbt.project.name>
-    <mesos.version>1.3.0</mesos.version>
+    <mesos.version>1.4.0</mesos.version>
     <mesos.classifier>shaded-protobuf</mesos.classifier>
   </properties>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/193555f7/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index c41283e..d224a73 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -36,7 +36,6 @@ import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionRes
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.Utils
 
-
 /**
  * Tracks the current state of a Mesos Task that runs a Spark driver.
  * @param driverDescription Submitted driver description from

http://git-wip-us.apache.org/repos/asf/spark/blob/193555f7/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index c392061..191415a 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -400,13 +400,20 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       val offerMem = getResource(offer.getResourcesList, "mem")
       val offerCpus = getResource(offer.getResourcesList, "cpus")
       val offerPorts = getRangeResource(offer.getResourcesList, "ports")
+      val offerReservationInfo = offer
+        .getResourcesList
+        .asScala
+        .find { r => r.getReservation != null }
       val id = offer.getId.getValue
 
       if (tasks.contains(offer.getId)) { // accept
         val offerTasks = tasks(offer.getId)
 
         logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
-          s"mem: $offerMem cpu: $offerCpus ports: $offerPorts." +
+          offerReservationInfo.map(resInfo =>
+            s"reservation info: ${resInfo.getReservation.toString}").getOrElse("") +
+          s"mem: $offerMem cpu: $offerCpus ports: $offerPorts " +
+          s"resources: ${offer.getResourcesList.asScala.mkString(",")}." +
           s"  Launching ${offerTasks.size} Mesos tasks.")
 
         for (task <- offerTasks) {
@@ -416,7 +423,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           val ports = getRangeResource(task.getResourcesList, "ports").mkString(",")
 
           logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus" +
-            s" ports: $ports")
+            s" ports: $ports" + s" on slave with slave id: ${task.getSlaveId.getValue} ")
         }
 
         driver.launchTasks(
@@ -431,7 +438,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       } else {
         declineOffer(
           driver,
-          offer)
+          offer,
+          Some("Offer was declined due to unmet task launch constraints."))
       }
     }
   }
@@ -513,6 +521,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
             totalGpusAcquired += taskGPUs
             gpusByTaskId(taskId) = taskGPUs
           }
+        } else {
+          logDebug(s"Cannot launch a task for offer with id: $offerId on slave " +
+            s"with id: $slaveId. Requirements were not met for this offer.")
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/193555f7/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 6fcb30a..e754503 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -28,7 +28,8 @@ import com.google.common.base.Splitter
 import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler, SchedulerDriver}
 import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
 import org.apache.mesos.Protos.FrameworkInfo.Capability
-import org.apache.mesos.protobuf.{ByteString, GeneratedMessage}
+import org.apache.mesos.Protos.Resource.ReservationInfo
+import org.apache.mesos.protobuf.{ByteString, GeneratedMessageV3}
 
 import org.apache.spark.{SparkConf, SparkContext, SparkException}
 import org.apache.spark.TaskState
@@ -36,8 +37,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils
 
-
-
 /**
  * Shared trait for implementing a Mesos Scheduler. This holds common state and helper
  * methods and Mesos scheduler will use.
@@ -46,6 +45,8 @@ trait MesosSchedulerUtils extends Logging {
   // Lock used to wait for scheduler to be registered
   private final val registerLatch = new CountDownLatch(1)
 
+  private final val ANY_ROLE = "*"
+
   /**
    * Creates a new MesosSchedulerDriver that communicates to the Mesos master.
    *
@@ -175,17 +176,36 @@ trait MesosSchedulerUtils extends Logging {
     registerLatch.countDown()
   }
 
-  def createResource(name: String, amount: Double, role: Option[String] = None): Resource = {
+  private def setReservationInfo(
+       reservationInfo: Option[ReservationInfo],
+       role: Option[String],
+       builder: Resource.Builder): Unit = {
+    if (!role.contains(ANY_ROLE)) {
+      reservationInfo.foreach { res => builder.setReservation(res) }
+    }
+  }
+
+  def createResource(
+       name: String,
+       amount: Double,
+       role: Option[String] = None,
+       reservationInfo: Option[ReservationInfo] = None): Resource = {
     val builder = Resource.newBuilder()
       .setName(name)
       .setType(Value.Type.SCALAR)
       .setScalar(Value.Scalar.newBuilder().setValue(amount).build())
-
     role.foreach { r => builder.setRole(r) }
-
+    setReservationInfo(reservationInfo, role, builder)
     builder.build()
   }
 
+  private def getReservation(resource: Resource): Option[ReservationInfo] = {
+    if (resource.hasReservation) {
+      Some(resource.getReservation)
+    } else {
+      None
+    }
+  }
   /**
    * Partition the existing set of resources into two groups, those remaining to be
    * scheduled and those requested to be used for a new task.
@@ -203,14 +223,17 @@ trait MesosSchedulerUtils extends Logging {
     var requestedResources = new ArrayBuffer[Resource]
     val remainingResources = resources.asScala.map {
       case r =>
+        val reservation = getReservation(r)
         if (remain > 0 &&
           r.getType == Value.Type.SCALAR &&
           r.getScalar.getValue > 0.0 &&
           r.getName == resourceName) {
           val usage = Math.min(remain, r.getScalar.getValue)
-          requestedResources += createResource(resourceName, usage, Some(r.getRole))
+          requestedResources += createResource(resourceName, usage,
+            Option(r.getRole), reservation)
           remain -= usage
-          createResource(resourceName, r.getScalar.getValue - usage, Some(r.getRole))
+          createResource(resourceName, r.getScalar.getValue - usage,
+            Option(r.getRole), reservation)
         } else {
           r
         }
@@ -228,16 +251,6 @@ trait MesosSchedulerUtils extends Logging {
     (attr.getName, attr.getText.getValue.split(',').toSet)
   }
 
-
-  /** Build a Mesos resource protobuf object */
-  protected def createResource(resourceName: String, quantity: Double): Protos.Resource = {
-    Resource.newBuilder()
-      .setName(resourceName)
-      .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
-      .build()
-  }
-
   /**
    * Converts the attributes from the resource offer into a Map of name to Attribute Value
    * The attribute values are the mesos attribute types and they are
@@ -245,7 +258,8 @@ trait MesosSchedulerUtils extends Logging {
    * @param offerAttributes the attributes offered
    * @return
    */
-  protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = {
+  protected def toAttributeMap(offerAttributes: JList[Attribute])
+    : Map[String, GeneratedMessageV3] = {
     offerAttributes.asScala.map { attr =>
       val attrValue = attr.getType match {
         case Value.Type.SCALAR => attr.getScalar
@@ -266,7 +280,7 @@ trait MesosSchedulerUtils extends Logging {
    */
   def matchesAttributeRequirements(
       slaveOfferConstraints: Map[String, Set[String]],
-      offerAttributes: Map[String, GeneratedMessage]): Boolean = {
+      offerAttributes: Map[String, GeneratedMessageV3]): Boolean = {
     slaveOfferConstraints.forall {
       // offer has the required attribute and subsumes the required values for that attribute
       case (name, requiredValues) =>
@@ -427,10 +441,10 @@ trait MesosSchedulerUtils extends Logging {
       // partition port offers
       val (resourcesWithoutPorts, portResources) = filterPortResources(offeredResources)
 
-      val portsAndRoles = requestedPorts.
-        map(x => (x, findPortAndGetAssignedRangeRole(x, portResources)))
+      val portsAndResourceInfo = requestedPorts.
+        map { x => (x, findPortAndGetAssignedResourceInfo(x, portResources)) }
 
-      val assignedPortResources = createResourcesFromPorts(portsAndRoles)
+      val assignedPortResources = createResourcesFromPorts(portsAndResourceInfo)
 
       // ignore non-assigned port resources, they will be declined implicitly by mesos
       // no need for splitting port resources.
@@ -450,16 +464,25 @@ trait MesosSchedulerUtils extends Logging {
     managedPortNames.map(conf.getLong(_, 0)).filter( _ != 0)
   }
 
+  private case class RoleResourceInfo(
+      role: String,
+      resInfo: Option[ReservationInfo])
+
   /** Creates a mesos resource for a specific port number. */
-  private def createResourcesFromPorts(portsAndRoles: List[(Long, String)]) : List[Resource] = {
-    portsAndRoles.flatMap{ case (port, role) =>
-      createMesosPortResource(List((port, port)), Some(role))}
+  private def createResourcesFromPorts(
+      portsAndResourcesInfo: List[(Long, RoleResourceInfo)])
+    : List[Resource] = {
+    portsAndResourcesInfo.flatMap { case (port, rInfo) =>
+      createMesosPortResource(List((port, port)), Option(rInfo.role), rInfo.resInfo)}
   }
 
   /** Helper to create mesos resources for specific port ranges. */
   private def createMesosPortResource(
       ranges: List[(Long, Long)],
-      role: Option[String] = None): List[Resource] = {
+      role: Option[String] = None,
+      reservationInfo: Option[ReservationInfo] = None): List[Resource] = {
+    // for ranges we are going to use (user defined ports fall in there) create mesos resources
+    // for each range there is a role associated with it.
     ranges.map { case (rangeStart, rangeEnd) =>
       val rangeValue = Value.Range.newBuilder()
         .setBegin(rangeStart)
@@ -468,7 +491,8 @@ trait MesosSchedulerUtils extends Logging {
         .setName("ports")
         .setType(Value.Type.RANGES)
         .setRanges(Value.Ranges.newBuilder().addRange(rangeValue))
-      role.foreach(r => builder.setRole(r))
+      role.foreach { r => builder.setRole(r) }
+      setReservationInfo(reservationInfo, role, builder)
       builder.build()
     }
   }
@@ -477,19 +501,21 @@ trait MesosSchedulerUtils extends Logging {
   * Helper to assign a port to an offered range and get the latter's role
   * info to use it later on.
   */
-  private def findPortAndGetAssignedRangeRole(port: Long, portResources: List[Resource])
-    : String = {
+  private def findPortAndGetAssignedResourceInfo(port: Long, portResources: List[Resource])
+    : RoleResourceInfo = {
 
     val ranges = portResources.
-      map(resource =>
-        (resource.getRole, resource.getRanges.getRangeList.asScala
-          .map(r => (r.getBegin, r.getEnd)).toList))
+      map { resource =>
+        val reservation = getReservation(resource)
+        (RoleResourceInfo(resource.getRole, reservation),
+          resource.getRanges.getRangeList.asScala.map(r => (r.getBegin, r.getEnd)).toList)
+      }
 
-    val rangePortRole = ranges
-      .find { case (role, rangeList) => rangeList
+    val rangePortResourceInfo = ranges
+      .find { case (resourceInfo, rangeList) => rangeList
         .exists{ case (rangeStart, rangeEnd) => rangeStart <= port & rangeEnd >= port}}
     // this is safe since we have previously checked about the ranges (see checkPorts method)
-    rangePortRole.map{ case (role, rangeList) => role}.get
+    rangePortResourceInfo.map{ case (resourceInfo, rangeList) => resourceInfo}.get
   }
 
   /** Retrieves the port resources from a list of mesos offered resources */
@@ -564,3 +590,4 @@ trait MesosSchedulerUtils extends Logging {
     }
   }
 }
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org