You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2016/12/07 00:23:50 UTC
[13/18] spark git commit: [SPARK-18662] Move resource managers to
separate directory
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
new file mode 100644
index 0000000..1d742fe
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.util.{List => JList}
+import java.util.concurrent.CountDownLatch
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.util.control.NonFatal
+
+import com.google.common.base.Splitter
+import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler, SchedulerDriver}
+import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
+import org.apache.mesos.Protos.FrameworkInfo.Capability
+import org.apache.mesos.protobuf.{ByteString, GeneratedMessage}
+
+import org.apache.spark.{SparkConf, SparkContext, SparkException}
+import org.apache.spark.TaskState
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.util.Utils
+
+
+
+/**
+ * Shared trait for implementing a Mesos Scheduler. This holds common state and helper
+ * methods that Mesos schedulers will use.
+ */
+trait MesosSchedulerUtils extends Logging {
+ // Lock used to wait for scheduler to be registered
+ private final val registerLatch = new CountDownLatch(1)
+
+ // Driver for talking to Mesos
+ protected var mesosDriver: SchedulerDriver = null
+
+ /**
+ * Creates a new MesosSchedulerDriver that communicates to the Mesos master.
+ *
+ * The framework hostname is read from `SPARK_PUBLIC_DNS` (via `conf.getenv`) when present,
+ * otherwise from the `DRIVER_HOST_ADDRESS` config entry. Principal, secret, role and the GPU
+ * capability are taken from the `spark.mesos.*` settings read below.
+ *
+ * @param masterUrl The url to connect to Mesos master
+ * @param scheduler the scheduler class to receive scheduler callbacks
+ * @param sparkUser User to impersonate with when running tasks
+ * @param appName The framework name to display on the Mesos UI
+ * @param conf Spark configuration
+ * @param webuiUrl The WebUI url to link from Mesos UI
+ * @param checkpoint Option to checkpoint tasks for failover
+ * @param failoverTimeout Duration within which the Mesos master expects the scheduler to
+ * reconnect after a disconnect
+ * @param frameworkId The id of the new framework
+ * @return a new MesosSchedulerDriver; it is built with a Credential only when
+ * `spark.mesos.principal` is configured
+ * @throws SparkException if `spark.mesos.secret` is set but `spark.mesos.principal` is not
+ */
+ protected def createSchedulerDriver(
+ masterUrl: String,
+ scheduler: Scheduler,
+ sparkUser: String,
+ appName: String,
+ conf: SparkConf,
+ webuiUrl: Option[String] = None,
+ checkpoint: Option[Boolean] = None,
+ failoverTimeout: Option[Double] = None,
+ frameworkId: Option[String] = None): SchedulerDriver = {
+ val fwInfoBuilder = FrameworkInfo.newBuilder().setUser(sparkUser).setName(appName)
+ val credBuilder = Credential.newBuilder()
+ // Optional framework settings are only written when the caller supplied them.
+ webuiUrl.foreach { url => fwInfoBuilder.setWebuiUrl(url) }
+ checkpoint.foreach { checkpoint => fwInfoBuilder.setCheckpoint(checkpoint) }
+ failoverTimeout.foreach { timeout => fwInfoBuilder.setFailoverTimeout(timeout) }
+ frameworkId.foreach { id =>
+ fwInfoBuilder.setId(FrameworkID.newBuilder().setValue(id).build())
+ }
+ fwInfoBuilder.setHostname(Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(
+ conf.get(DRIVER_HOST_ADDRESS)))
+ // The principal goes on both the framework info and the credential.
+ conf.getOption("spark.mesos.principal").foreach { principal =>
+ fwInfoBuilder.setPrincipal(principal)
+ credBuilder.setPrincipal(principal)
+ }
+ conf.getOption("spark.mesos.secret").foreach { secret =>
+ credBuilder.setSecret(secret)
+ }
+ // A secret without a principal cannot form a usable credential, so fail fast.
+ if (credBuilder.hasSecret && !fwInfoBuilder.hasPrincipal) {
+ throw new SparkException(
+ "spark.mesos.principal must be configured when spark.mesos.secret is set")
+ }
+ conf.getOption("spark.mesos.role").foreach { role =>
+ fwInfoBuilder.setRole(role)
+ }
+ // Advertise the GPU_RESOURCES capability only when the application asked for GPUs.
+ val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
+ if (maxGpus > 0) {
+ fwInfoBuilder.addCapabilities(Capability.newBuilder().setType(Capability.Type.GPU_RESOURCES))
+ }
+ // Pass the credential to the driver only when a principal was configured.
+ if (credBuilder.hasPrincipal) {
+ new MesosSchedulerDriver(
+ scheduler, fwInfoBuilder.build(), masterUrl, credBuilder.build())
+ } else {
+ new MesosSchedulerDriver(scheduler, fwInfoBuilder.build(), masterUrl)
+ }
+ }
+
+ /**
+ * Starts the MesosSchedulerDriver and stores the current running driver to this new instance.
+ * This driver is expected to not be running.
+ * This method blocks on the registration latch, which is released by `markRegistered()` or
+ * `markErr()`. If the driver thread recorded an error, that exception is rethrown to the
+ * caller.
+ *
+ * @param newDriver the driver to run on a dedicated daemon thread
+ */
+ def startScheduler(newDriver: SchedulerDriver): Unit = {
+ synchronized {
+ // A driver has already been started through this instance: just wait for the
+ // registration latch and leave `newDriver` unused.
+ if (mesosDriver != null) {
+ registerLatch.await()
+ return
+ }
+ // Written by the driver thread, read by the calling thread after the latch opens.
+ @volatile
+ var error: Option[Exception] = None
+
+ // We create a new thread that will block inside `mesosDriver.run`
+ // until the scheduler exits
+ new Thread(Utils.getFormattedClassName(this) + "-mesos-driver") {
+ setDaemon(true)
+ override def run() {
+ try {
+ mesosDriver = newDriver
+ val ret = mesosDriver.run()
+ logInfo("driver.run() returned with code " + ret)
+ // NOTE(review): only DRIVER_ABORTED and thrown exceptions release the latch from
+ // here; any other return status relies on markRegistered() having been called.
+ if (ret != null && ret.equals(Status.DRIVER_ABORTED)) {
+ error = Some(new SparkException("Error starting driver, DRIVER_ABORTED"))
+ markErr()
+ }
+ } catch {
+ case e: Exception =>
+ logError("driver.run() failed", e)
+ error = Some(e)
+ markErr()
+ }
+ }
+ }.start()
+
+ // Block until the scheduler registers or the driver thread reports a failure.
+ registerLatch.await()
+
+ // propagate any error to the calling thread. This ensures that SparkContext creation fails
+ // without leaving a broken context that won't be able to schedule any tasks
+ error.foreach(throw _)
+ }
+ }
+
+ def getResource(res: JList[Resource], name: String): Double = {
+   // The same resource name can occur more than once in an offer (one entry per role,
+   // including the wildcard role), so every matching scalar is added up.
+   res.asScala.collect { case r if r.getName == name => r.getScalar.getValue }.sum
+ }
+
+ /**
+ * Collects every (begin, end) range of the resources carrying the given name.
+ *
+ * @param res the mesos resource list
+ * @param name the name of the resource
+ * @return the ranges of all matching resources, in offer order
+ */
+ protected def getRangeResource(res: JList[Resource], name: String): List[(Long, Long)] = {
+   // Several entries may share a name because an offer can hold one per role
+   // (specific role or wildcard), so the ranges of all of them are concatenated.
+   val matching = res.asScala.filter(_.getName == name)
+   val ranges = for {
+     resource <- matching
+     range <- resource.getRanges.getRangeList.asScala
+   } yield (range.getBegin, range.getEnd)
+   ranges.toList
+ }
+
+ /**
+ * Signal that the scheduler has registered with Mesos.
+ * Counts down the registration latch, releasing any thread blocked in `startScheduler`.
+ */
+ protected def markRegistered(): Unit = {
+ registerLatch.countDown()
+ }
+
+ /**
+ * Signal that starting the driver failed.
+ * Counts down the same registration latch as `markRegistered`, so a thread blocked in
+ * `startScheduler` wakes up and can inspect the recorded error.
+ */
+ protected def markErr(): Unit = {
+ registerLatch.countDown()
+ }
+
+ /**
+ * Builds a scalar Mesos resource, optionally bound to a role.
+ *
+ * @param name the resource name
+ * @param amount the scalar amount
+ * @param role the role to set on the resource; left unset when None
+ */
+ def createResource(name: String, amount: Double, role: Option[String] = None): Resource = {
+   val scalar = Value.Scalar.newBuilder().setValue(amount).build()
+   val resourceBuilder =
+     Resource.newBuilder().setName(name).setType(Value.Type.SCALAR).setScalar(scalar)
+   role match {
+     case Some(r) => resourceBuilder.setRole(r)
+     case None => // keep the builder's role untouched
+   }
+   resourceBuilder.build()
+ }
+
+ /**
+ * Splits the offered resources into what is left over and what is taken for a new task.
+ *
+ * Scalar resources named `resourceName` are consumed in offer order until `amountToUse`
+ * has been covered; each consumed slice keeps the role of the resource it came from.
+ * Scalar resources whose value is not positive are dropped from the remaining list.
+ *
+ * @param resources The full list of available resources
+ * @param resourceName The name of the resource to take from the available resources
+ * @param amountToUse The amount of resources to take from the available resources
+ * @return The remaining resources list and the used resources list.
+ */
+ def partitionResources(
+     resources: JList[Resource],
+     resourceName: String,
+     amountToUse: Double): (List[Resource], List[Resource]) = {
+   var stillNeeded = amountToUse
+   val taken = new ArrayBuffer[Resource]
+   val leftover = new ArrayBuffer[Resource]
+
+   for (offered <- resources.asScala) {
+     val usable = stillNeeded > 0 &&
+       offered.getType == Value.Type.SCALAR &&
+       offered.getScalar.getValue > 0.0 &&
+       offered.getName == resourceName
+
+     val kept =
+       if (usable) {
+         val available = offered.getScalar.getValue
+         val used = Math.min(stillNeeded, available)
+         taken += createResource(resourceName, used, Some(offered.getRole))
+         stillNeeded -= used
+         createResource(resourceName, available - used, Some(offered.getRole))
+       } else {
+         offered
+       }
+
+     // Depleted scalar resources are not worth keeping.
+     if (kept.getType != Value.Type.SCALAR || kept.getScalar.getValue > 0.0) {
+       leftover += kept
+     }
+   }
+
+   (leftover.toList, taken.toList)
+ }
+
+ /** Returns the attribute name paired with its text value split on ',' into a set. */
+ protected def getAttribute(attr: Attribute): (String, Set[String]) = {
+   val values = attr.getText.getValue.split(',').toSet
+   attr.getName -> values
+ }
+
+
+ /** Builds a scalar Mesos resource protobuf with the given name and quantity (no role set). */
+ protected def createResource(resourceName: String, quantity: Double): Protos.Resource = {
+   val scalar = Value.Scalar.newBuilder().setValue(quantity).build()
+   val resourceBuilder = Resource.newBuilder()
+   resourceBuilder.setName(resourceName)
+   resourceBuilder.setType(Value.Type.SCALAR)
+   resourceBuilder.setScalar(scalar)
+   resourceBuilder.build()
+ }
+
+ /**
+ * Converts the attributes from the resource offer into a Map of name -> attribute value.
+ * Each value is the Mesos protobuf message matching the attribute's declared type:
+ * Scalar, Ranges, Set or Text.
+ *
+ * @param offerAttributes the attributes offered
+ * @return the attribute values keyed by attribute name
+ */
+ protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = {
+   // Pick the value message that corresponds to the attribute's type tag.
+   def valueOf(attr: Attribute): GeneratedMessage = attr.getType match {
+     case Value.Type.SCALAR => attr.getScalar
+     case Value.Type.RANGES => attr.getRanges
+     case Value.Type.SET => attr.getSet
+     case Value.Type.TEXT => attr.getText
+   }
+
+   offerAttributes.asScala.map(attr => attr.getName -> valueOf(attr)).toMap
+ }
+
+
+ /**
+ * Match the requirements (if any) to the offer attributes.
+ *  - if no attribute requirements are specified, return true
+ *  - if an attribute is required without values, only its presence in the offer is checked
+ *  - if an attribute is required with values, the check depends on the offered value type:
+ *    scalar: some required value is <= the offered value;
+ *    range(s): some required value falls inside an offered range;
+ *    set: the required values are a subset of the offered set;
+ *    text: the offered text equals one of the required values.
+ *
+ * @param slaveOfferConstraints required attribute names mapped to their required values
+ * @param offerAttributes attribute values of the offer, keyed by attribute name
+ * @return true when every constraint is satisfied by the offer
+ * @throws NumberFormatException if a required value for a scalar or range attribute is not
+ *                               numeric
+ */
+ def matchesAttributeRequirements(
+     slaveOfferConstraints: Map[String, Set[String]],
+     offerAttributes: Map[String, GeneratedMessage]): Boolean = {
+   slaveOfferConstraints.forall {
+     // offer has the required attribute and subsumes the required values for that attribute
+     case (name, requiredValues) =>
+       offerAttributes.get(name) match {
+         case None => false
+         case Some(_) if requiredValues.isEmpty => true // empty value matches presence
+         case Some(scalarValue: Value.Scalar) =>
+           // check if provided values is less than equal to the offered values
+           requiredValues.map(_.toDouble).exists(_ <= scalarValue.getValue)
+         case Some(rangeValue: Value.Range) =>
+           // Check if there is some required value that is between the ranges specified
+           // Note: We only support the ability to specify discrete values, in the future
+           // we may expand it to subsume ranges specified with a XX..YY value or something
+           // similar to that.
+           requiredValues.map(_.toLong)
+             .exists(v => rangeValue.getBegin <= v && v <= rangeValue.getEnd)
+         case Some(rangesValue: Value.Ranges) =>
+           // A RANGES attribute is carried as a Value.Ranges message (a list of ranges), so it
+           // has to be matched here as well; a required value may fall inside any of them.
+           val wanted = requiredValues.map(_.toLong)
+           rangesValue.getRangeList.asScala.exists { range =>
+             wanted.exists(v => range.getBegin <= v && v <= range.getEnd)
+           }
+         case Some(offeredValue: Value.Set) =>
+           // check if the specified required values is a subset of offered set
+           requiredValues.subsetOf(offeredValue.getItemList.asScala.toSet)
+         case Some(textValue: Value.Text) =>
+           // check if the specified value is equal, if multiple values are specified
+           // we succeed if any of them match.
+           requiredValues.contains(textValue.getValue)
+         case Some(_) =>
+           // Unknown value type: treat as "constraint not met" instead of a MatchError.
+           false
+       }
+   }
+ }
+
+ /**
+ * Parses the attributes constraints provided to spark and build a matching data struct:
+ * Map[<attribute-name>, Set[values-to-match]]
+ * The constraints are specified as ';' separated key-value pairs where keys and values
+ * are separated by ':'. The ':' implies equality (for singular values) and "is one of" for
+ * multiple values (comma separated). For example:
+ * {{{
+ * parseConstraintString("os:centos7;zone:us-east-1a,us-east-1b")
+ * // would result in
+ * Map(
+ *   "os" -> Set("centos7"),
+ *   "zone" -> Set("us-east-1a", "us-east-1b")
+ * )
+ * }}}
+ *
+ * Mesos documentation: http://mesos.apache.org/documentation/attributes-resources/
+ * https://github.com/apache/mesos/blob/master/src/common/values.cpp
+ * https://github.com/apache/mesos/blob/master/src/common/attributes.cpp
+ *
+ * @param constraintsVal constraints string consisting of ';' separated key-value pairs
+ *                       (separated by ':')
+ * @return Map of constraints to match resources offers; an attribute given without values
+ *         maps to an empty set.
+ * @throws IllegalArgumentException if the string cannot be split into key-value pairs
+ */
+ def parseConstraintString(constraintsVal: String): Map[String, Set[String]] = {
+   /*
+     Based on mesos docs:
+     attributes : attribute ( ";" attribute )*
+     attribute : labelString ":" ( labelString | "," )+
+     labelString : [a-zA-Z0-9_/.-]
+   */
+   // kv splitter
+   val splitter = Splitter.on(';').trimResults().withKeyValueSeparator(':')
+   if (constraintsVal.isEmpty) {
+     Map.empty[String, Set[String]]
+   } else {
+     try {
+       // Build the result eagerly: `mapValues` would only return a lazy view that re-splits
+       // the value on every lookup and is not serializable.
+       splitter.split(constraintsVal).asScala.map { case (name, rawValues) =>
+         val values: Set[String] =
+           if (rawValues == null || rawValues.isEmpty) {
+             Set.empty[String]
+           } else {
+             rawValues.split(',').toSet
+           }
+         name -> values
+       }.toMap
+     } catch {
+       case NonFatal(e) =>
+         throw new IllegalArgumentException(s"Bad constraint string: $constraintsVal", e)
+     }
+   }
+ }
+
+ // These defaults copied from YARN
+ private val MEMORY_OVERHEAD_FRACTION = 0.10
+ private val MEMORY_OVERHEAD_MINIMUM = 384
+
+ /**
+ * Return the amount of memory to allocate to each executor, taking into account
+ * container overheads.
+ *
+ * @param sc SparkContext to use to get `spark.mesos.executor.memoryOverhead` value
+ * @return `sc.executorMemory` plus the overhead, where the overhead is
+ *         `spark.mesos.executor.memoryOverhead` when set, otherwise the larger of
+ *         (MEMORY_OVERHEAD_FRACTION * `sc.executorMemory`) and MEMORY_OVERHEAD_MINIMUM
+ */
+ def executorMemory(sc: SparkContext): Int = {
+   val baseMemory = sc.executorMemory
+   val defaultOverhead =
+     math.max(MEMORY_OVERHEAD_FRACTION * baseMemory, MEMORY_OVERHEAD_MINIMUM).toInt
+   val overhead = sc.conf.getInt("spark.mesos.executor.memoryOverhead", defaultOverhead)
+   overhead + baseMemory
+ }
+
+ /**
+ * Adds every URI of a comma separated list to the given command builder.
+ * Entries are trimmed; blank entries (empty input, doubled or trailing commas) are skipped
+ * so that no empty URI ends up in the command.
+ *
+ * @param uris comma separated list of URIs
+ * @param builder the command builder receiving the URIs
+ * @param useFetcherCache value passed to `setCache` on every added URI
+ */
+ def setupUris(uris: String,
+     builder: CommandInfo.Builder,
+     useFetcherCache: Boolean = false): Unit = {
+   uris.split(",").map(_.trim).filter(_.nonEmpty).foreach { uri =>
+     builder.addUris(CommandInfo.URI.newBuilder().setValue(uri).setCache(useFetcherCache))
+   }
+ }
+
+ /**
+ * Reads `spark.mesos.rejectOfferDurationForUnmetConstraints` from the context's conf as a
+ * number of seconds, falling back to "120s" when it is not set.
+ */
+ protected def getRejectOfferDurationForUnmetConstraints(sc: SparkContext): Long = {
+ sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
+ }
+
+ /**
+ * Reads `spark.mesos.rejectOfferDurationForReachedMaxCores` from the context's conf as a
+ * number of seconds, falling back to "120s" when it is not set.
+ */
+ protected def getRejectOfferDurationForReachedMaxCores(sc: SparkContext): Long = {
+ sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
+ }
+
+ /**
+ * Checks that every non-zero port configured for the executor lies inside one of the
+ * offered port ranges, and that the offer holds at least as many ports as are requested.
+ *
+ * @param conf the Spark Config
+ * @param ports the offered port ranges as (begin, end) pairs, both ends inclusive
+ * @return true if ports are within range false otherwise
+ */
+ protected def checkPorts(conf: SparkConf, ports: List[(Long, Long)]): Boolean = {
+   val requested = nonZeroPortValuesFromConfig(conf)
+
+   val allOffered = requested.forall { port =>
+     ports.exists { case (begin, end) => begin <= port && port <= end }
+   }
+
+   // make sure we have enough ports to allocate per offer
+   val offeredCount = ports.map { case (begin, end) => end - begin + 1 }.sum
+
+   offeredCount >= requested.size && allOffered
+ }
+
+ /**
+ * Partitions port resources.
+ *
+ * With no requested ports the offered resources are returned untouched. Otherwise every
+ * requested port becomes its own single-port resource carrying the role of the offered
+ * range that contains it, and all port resources are removed from the remaining list.
+ *
+ * @param requestedPorts non-zero ports to assign
+ * @param offeredResources the resources offered
+ * @return resources left, port resources to be used.
+ */
+ def partitionPortResources(requestedPorts: List[Long], offeredResources: List[Resource])
+   : (List[Resource], List[Resource]) = {
+   requestedPorts match {
+     case Nil =>
+       (offeredResources, List[Resource]())
+     case _ =>
+       // partition port offers
+       val (nonPortResources, portResources) = filterPortResources(offeredResources)
+       val portsWithRoles = requestedPorts.map { port =>
+         port -> findPortAndGetAssignedRangeRole(port, portResources)
+       }
+       // ignore non-assigned port resources, they will be declined implicitly by mesos
+       // no need for splitting port resources.
+       (nonPortResources, createResourcesFromPorts(portsWithRoles))
+   }
+ }
+
+ // Config keys of the executor ports that are matched against offered port ranges.
+ val managedPortNames = List("spark.executor.port", BLOCK_MANAGER_PORT.key)
+
+ /**
+ * The values of the non-zero ports to be used by the executor process.
+ * @param conf the spark config to use
+ * @return the non-zero values of the managed ports; unset ports count as 0 and are dropped
+ */
+ def nonZeroPortValuesFromConfig(conf: SparkConf): List[Long] = {
+   for {
+     key <- managedPortNames
+     port = conf.getLong(key, 0)
+     if port != 0
+   } yield port
+ }
+
+ /** Creates one single-port mesos resource, with the paired role, for every (port, role). */
+ private def createResourcesFromPorts(portsAndRoles: List[(Long, String)]) : List[Resource] = {
+   for {
+     (port, role) <- portsAndRoles
+     resource <- createMesosPortResource(List((port, port)), Some(role))
+   } yield resource
+ }
+
+ /**
+ * Helper to create mesos resources for specific port ranges: one "ports" resource of type
+ * RANGES per (begin, end) pair, with the role set when one is given.
+ */
+ private def createMesosPortResource(
+     ranges: List[(Long, Long)],
+     role: Option[String] = None): List[Resource] = {
+   def toPortResource(begin: Long, end: Long): Resource = {
+     val range = Value.Range.newBuilder().setBegin(begin).setEnd(end)
+     val portsBuilder = Resource.newBuilder()
+       .setName("ports")
+       .setType(Value.Type.RANGES)
+       .setRanges(Value.Ranges.newBuilder().addRange(range))
+     role.foreach(r => portsBuilder.setRole(r))
+     portsBuilder.build()
+   }
+
+   ranges.map { case (begin, end) => toPortResource(begin, end) }
+ }
+
+ /**
+ * Helper to assign a port to an offered range and get the latter's role
+ * info to use it later on. The first port resource with a range containing the port wins.
+ */
+ private def findPortAndGetAssignedRangeRole(port: Long, portResources: List[Resource])
+   : String = {
+   val owner = portResources.find { resource =>
+     resource.getRanges.getRangeList.asScala.exists { range =>
+       range.getBegin <= port && port <= range.getEnd
+     }
+   }
+   // this is safe since we have previously checked about the ranges (see checkPorts method)
+   owner.map(_.getRole).get
+ }
+
+ /**
+ * Splits the offered resources into (resources that are not ports, port resources), where a
+ * port resource is one of type RANGES named "ports".
+ */
+ private def filterPortResources(resources: List[Resource]): (List[Resource], List[Resource]) = {
+   def isPortResource(r: Resource): Boolean =
+     r.getType == Value.Type.RANGES && r.getName == "ports"
+
+   val (portResources, otherResources) = resources.partition(isPortResource)
+   (otherResources, portResources)
+ }
+
+ /**
+ * spark.mesos.driver.frameworkId is set by the cluster dispatcher to correlate driver
+ * submissions with frameworkIDs. However, this causes issues when a driver process launches
+ * more than one framework (more than one SparkContext), because they all try to register with
+ * the same frameworkID. To enforce that only the first driver registers with the configured
+ * framework ID, the driver calls this method after the first registration.
+ *
+ * The key is removed both from the context's conf and from the JVM system properties.
+ */
+ def unsetFrameworkID(sc: SparkContext): Unit = {
+   val frameworkIdKey = "spark.mesos.driver.frameworkId"
+   sc.conf.remove(frameworkIdKey)
+   System.clearProperty(frameworkIdKey)
+ }
+
+ /**
+ * Maps a Mesos task state onto the corresponding Spark TaskState.
+ * STAGING and STARTING both map to LAUNCHING, RUNNING and KILLING to RUNNING, and LOST and
+ * ERROR to LOST.
+ * NOTE(review): there is no default case, so a Mesos state not listed here would raise a
+ * MatchError.
+ */
+ def mesosToTaskState(state: MesosTaskState): TaskState.TaskState = state match {
+ case MesosTaskState.TASK_STAGING | MesosTaskState.TASK_STARTING => TaskState.LAUNCHING
+ case MesosTaskState.TASK_RUNNING | MesosTaskState.TASK_KILLING => TaskState.RUNNING
+ case MesosTaskState.TASK_FINISHED => TaskState.FINISHED
+ case MesosTaskState.TASK_FAILED => TaskState.FAILED
+ case MesosTaskState.TASK_KILLED => TaskState.KILLED
+ case MesosTaskState.TASK_LOST | MesosTaskState.TASK_ERROR => TaskState.LOST
+ }
+
+ /**
+ * Maps a Spark TaskState onto a Mesos task state.
+ * This is not the exact inverse of `mesosToTaskState`: LAUNCHING always becomes
+ * TASK_STARTING, RUNNING always becomes TASK_RUNNING and LOST always becomes TASK_LOST.
+ */
+ def taskStateToMesos(state: TaskState.TaskState): MesosTaskState = state match {
+ case TaskState.LAUNCHING => MesosTaskState.TASK_STARTING
+ case TaskState.RUNNING => MesosTaskState.TASK_RUNNING
+ case TaskState.FINISHED => MesosTaskState.TASK_FINISHED
+ case TaskState.FAILED => MesosTaskState.TASK_FAILED
+ case TaskState.KILLED => MesosTaskState.TASK_KILLED
+ case TaskState.LOST => MesosTaskState.TASK_LOST
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
new file mode 100644
index 0000000..8370b61
--- /dev/null
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.nio.ByteBuffer
+
+import org.apache.mesos.protobuf.ByteString
+
+import org.apache.spark.internal.Logging
+
+/**
+ * Wrapper for serializing the data sent when launching Mesos tasks.
+ *
+ * Wire layout produced by `toByteString`: a 4-byte attempt number followed by the bytes of
+ * `serializedTask` between its current position and its limit.
+ *
+ * @param serializedTask the serialized task payload
+ * @param attemptNumber the attempt number written in front of the payload
+ */
+private[spark] case class MesosTaskLaunchData(
+    serializedTask: ByteBuffer,
+    attemptNumber: Int) extends Logging {
+
+  /**
+   * Serializes the attempt number and the task payload into a single ByteString.
+   * The payload is read through a duplicate so that `serializedTask` keeps its position and
+   * repeated calls produce the same bytes; the output is sized by the bytes actually
+   * remaining, so a buffer with a non-zero position is not padded with trailing zeros.
+   */
+  def toByteString: ByteString = {
+    val payload = serializedTask.duplicate()
+    val dataBuffer = ByteBuffer.allocate(4 + payload.remaining)
+    dataBuffer.putInt(attemptNumber)
+    dataBuffer.put(payload)
+    dataBuffer.rewind
+    logDebug(s"ByteBuffer size: [${dataBuffer.remaining}]")
+    ByteString.copyFrom(dataBuffer)
+  }
+}
+
+private[spark] object MesosTaskLaunchData extends Logging {
+  /**
+   * Rebuilds launch data from its serialized form: the first 4 bytes are read as the
+   * attempt number and the rest is exposed as the serialized task.
+   */
+  def fromByteString(byteString: ByteString): MesosTaskLaunchData = {
+    val buffer = byteString.asReadOnlyByteBuffer()
+    logDebug(s"ByteBuffer size: [${buffer.remaining}]")
+    // getInt advances the position by 4 bytes, so the slice below starts right after the
+    // attempt number and covers only the task payload.
+    val attempt = buffer.getInt
+    val payload = buffer.slice()
+    MesosTaskLaunchData(payload, attempt)
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArgumentsSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArgumentsSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArgumentsSuite.scala
new file mode 100644
index 0000000..33e7d69
--- /dev/null
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArgumentsSuite.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.TestPrematureExit
+
+/**
+ * Checks how MesosClusterDispatcherArguments handles `--conf` pairs and the other command
+ * line options.
+ */
+class MesosClusterDispatcherArgumentsSuite extends SparkFunSuite
+ with TestPrematureExit {
+
+ test("test if spark config args are passed sucessfully") {
+ val args = Array[String]("--master", "mesos://localhost:5050", "--conf", "key1=value1",
+ "--conf", "spark.mesos.key2=value2", "--verbose")
+ val conf = new SparkConf()
+ new MesosClusterDispatcherArguments(args, conf)
+
+ // Of the two --conf pairs only the "spark."-prefixed key is expected to reach the SparkConf.
+ assert(conf.getOption("key1").isEmpty)
+ assert(conf.get("spark.mesos.key2") == "value2")
+ }
+
+ test("test non conf settings") {
+ val masterUrl = "mesos://localhost:5050"
+ val port = "1212"
+ val zookeeperUrl = "zk://localhost:2181"
+ val host = "localhost"
+ val webUiPort = "2323"
+ val name = "myFramework"
+
+ // Mix long and short option forms.
+ val args1 = Array("--master", masterUrl, "--verbose", "--name", name)
+ val args2 = Array("-p", port, "-h", host, "-z", zookeeperUrl)
+ val args3 = Array("--webui-port", webUiPort)
+
+ val args = args1 ++ args2 ++ args3
+ val conf = new SparkConf()
+ val mesosDispClusterArgs = new MesosClusterDispatcherArguments(args, conf)
+
+ assert(mesosDispClusterArgs.verbose)
+ assert(mesosDispClusterArgs.confProperties.isEmpty)
+ assert(mesosDispClusterArgs.host == host)
+ assert(Option(mesosDispClusterArgs.masterUrl).isDefined)
+ // The parsed master url is expected without its "mesos://" prefix.
+ assert(mesosDispClusterArgs.masterUrl == masterUrl.stripPrefix("mesos://"))
+ assert(Option(mesosDispClusterArgs.zookeeperUrl).isDefined)
+ assert(mesosDispClusterArgs.zookeeperUrl == Some(zookeeperUrl))
+ assert(mesosDispClusterArgs.name == name)
+ assert(mesosDispClusterArgs.webUiPort == webUiPort.toInt)
+ assert(mesosDispClusterArgs.port == port.toInt)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherSuite.scala
new file mode 100644
index 0000000..7484e3b
--- /dev/null
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherSuite.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.TestPrematureExit
+
+/**
+ * Checks, through `testPrematureExit`, the message MesosClusterDispatcher is expected to
+ * print for empty, `--help` and unrecognized command lines.
+ */
+class MesosClusterDispatcherSuite extends SparkFunSuite
+ with TestPrematureExit{
+
+ test("prints usage on empty input") {
+ testPrematureExit(Array[String](),
+ "Usage: MesosClusterDispatcher", MesosClusterDispatcher)
+ }
+
+ test("prints usage with only --help") {
+ testPrematureExit(Array("--help"),
+ "Usage: MesosClusterDispatcher", MesosClusterDispatcher)
+ }
+
+ test("prints error with unrecognized options") {
+ // Both a long-style and a short-style unknown option are covered.
+ testPrematureExit(Array("--blarg"), "Unrecognized option: '--blarg'", MesosClusterDispatcher)
+ testPrematureExit(Array("-bleg"), "Unrecognized option: '-bleg'", MesosClusterDispatcher)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala
new file mode 100644
index 0000000..a558554
--- /dev/null
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import org.apache.spark._
+import org.apache.spark.internal.config._
+
+/**
+ * Checks which scheduler backend MesosClusterManager creates for different master URLs and
+ * `spark.mesos.coarse` settings.
+ */
+class MesosClusterManagerSuite extends SparkFunSuite with LocalSparkContext {
+ /**
+ * Creates a local SparkContext with `spark.mesos.coarse` set to `coarse`, then asserts that
+ * the cluster manager accepts `masterURL` and builds a backend of exactly `expectedClass`.
+ */
+ def testURL(masterURL: String, expectedClass: Class[_], coarse: Boolean) {
+ val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString)
+ sc = new SparkContext("local", "test", conf)
+ val clusterManager = new MesosClusterManager()
+
+ assert(clusterManager.canCreate(masterURL))
+ val taskScheduler = clusterManager.createTaskScheduler(sc, masterURL)
+ val sched = clusterManager.createSchedulerBackend(sc, masterURL, taskScheduler)
+ assert(sched.getClass === expectedClass)
+ }
+
+ test("mesos fine-grained") {
+ testURL("mesos://localhost:1234", classOf[MesosFineGrainedSchedulerBackend], coarse = false)
+ }
+
+ test("mesos coarse-grained") {
+ testURL("mesos://localhost:1234", classOf[MesosCoarseGrainedSchedulerBackend], coarse = true)
+ }
+
+ test("mesos with zookeeper") {
+ testURL("mesos://zk://localhost:1234,localhost:2345",
+ classOf[MesosFineGrainedSchedulerBackend],
+ coarse = false)
+ }
+
+ test("mesos with i/o encryption throws error") {
+ // Creating the context must fail, with an IllegalArgumentException as the cause.
+ val se = intercept[SparkException] {
+ val conf = new SparkConf().setAppName("test").set(IO_ENCRYPTION_ENABLED, true)
+ sc = new SparkContext("mesos", "test", conf)
+ }
+ assert(se.getCause().isInstanceOf[IllegalArgumentException])
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
new file mode 100644
index 0000000..74e5ce2
--- /dev/null
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.util.{Collection, Collections, Date}
+
+import scala.collection.JavaConverters._
+
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.Value.{Scalar, Type}
+import org.apache.mesos.SchedulerDriver
+import org.mockito.{ArgumentCaptor, Matchers}
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.Command
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+
+class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
+
+ // Command embedded in every MesosDriverDescription submitted by the tests of this suite.
+ private val command = new Command("mainClass", Seq("arg"), Map(), Seq(), Seq(), Seq())
+ // Mock Mesos driver; replaced on every call to setScheduler().
+ private var driver: SchedulerDriver = _
+ // Scheduler under test; replaced on every call to setScheduler().
+ private var scheduler: MesosClusterScheduler = _
+
+ /**
+  * Replaces the suite-level `driver` with a fresh mock and `scheduler` with a new
+  * [[MesosClusterScheduler]] built from a `BlackHoleMesosClusterPersistenceEngineFactory`.
+  *
+  * The scheduler's `start()` is overridden so that it only sets `ready = true`; it is invoked
+  * once before this method returns.
+  *
+  * @param sparkConfVars extra entries applied on top of the fixed master URL and application
+  *                      name; `null` (the default) adds nothing
+  */
+ private def setScheduler(sparkConfVars: Map[String, String] = null): Unit = {
+   val schedulerConf = new SparkConf()
+   schedulerConf.setMaster("mesos://localhost:5050")
+   schedulerConf.setAppName("spark mesos")
+   Option(sparkConfVars).foreach(vars => schedulerConf.setAll(vars))
+
+   driver = mock[SchedulerDriver]
+   scheduler = new MesosClusterScheduler(
+     new BlackHoleMesosClusterPersistenceEngineFactory, schedulerConf) {
+     override def start(): Unit = { ready = true }
+   }
+   scheduler.start()
+ }
+
+ test("can queue drivers") {
+   setScheduler()
+
+   // Submit two descriptions; each submission must be accepted.
+   val firstDescription = new MesosDriverDescription(
+     "d1", "jar", 1000, 1, true, command, Map[String, String](), "s1", new Date())
+   val firstResponse = scheduler.submitDriver(firstDescription)
+   assert(firstResponse.success)
+
+   val secondDescription = new MesosDriverDescription(
+     "d1", "jar", 1000, 1, true, command, Map[String, String](), "s2", new Date())
+   val secondResponse = scheduler.submitDriver(secondDescription)
+   assert(secondResponse.success)
+
+   // The queue must list the drivers in submission order.
+   val queuedIds = scheduler.getSchedulerState().queuedDrivers.toList.map(_.submissionId)
+   assert(queuedIds(0) == firstResponse.submissionId)
+   assert(queuedIds(1) == secondResponse.submissionId)
+ }
+
+ test("can kill queued drivers") {
+   setScheduler()
+
+   val description = new MesosDriverDescription(
+     "d1", "jar", 1000, 1, true, command, Map[String, String](), "s1", new Date())
+   val submitResponse = scheduler.submitDriver(description)
+   assert(submitResponse.success)
+
+   // Killing the only queued driver must succeed and leave the queue empty.
+   val killResponse = scheduler.killDriver(submitResponse.submissionId)
+   assert(killResponse.success)
+   assert(scheduler.getSchedulerState().queuedDrivers.isEmpty)
+ }
+
+ test("can handle multiple roles") {
+   setScheduler()
+
+   // Test-local mock, deliberately separate from the suite-level `driver` field.
+   val offerDriver = mock[SchedulerDriver]
+   val description = new MesosDriverDescription(
+     "d1", "jar", 1200, 1.5, true, command,
+     Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")),
+     "s1", new Date())
+   val response = scheduler.submitDriver(description)
+   assert(response.success)
+
+   // Builds a scalar resource with the given role, name and amount.
+   def scalarResource(role: String, name: String, amount: Double): Resource.Builder =
+     Resource.newBuilder()
+       .setRole(role)
+       .setScalar(Scalar.newBuilder().setValue(amount).build())
+       .setName(name)
+       .setType(Type.SCALAR)
+
+   // The driver asks for 1.5 cpus and 1200 mem; neither role offers that much on its own
+   // ("*": 1 cpu / 1000 mem, "role2": 1 cpu / 500 mem).
+   val offer = Offer.newBuilder()
+     .addResources(scalarResource("*", "cpus", 1))
+     .addResources(scalarResource("*", "mem", 1000))
+     .addResources(scalarResource("role2", "cpus", 1))
+     .addResources(scalarResource("role2", "mem", 500))
+     .setId(OfferID.newBuilder().setValue("o1").build())
+     .setFrameworkId(FrameworkID.newBuilder().setValue("f1").build())
+     .setSlaveId(SlaveID.newBuilder().setValue("s1").build())
+     .setHostname("host1")
+     .build()
+
+   val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
+   when(
+     offerDriver.launchTasks(
+       Matchers.eq(Collections.singleton(offer.getId)),
+       capture.capture())
+   ).thenReturn(Status.valueOf(1))
+
+   scheduler.resourceOffers(offerDriver, Collections.singletonList(offer))
+
+   // Exactly one task is launched and it carries the full requested amounts.
+   val launched = capture.getValue
+   assert(launched.size() == 1)
+   val launchedResources = launched.iterator().next().getResourcesList
+   assert(scheduler.getResource(launchedResources, "cpus") == 1.5)
+   assert(scheduler.getResource(launchedResources, "mem") == 1200)
+
+   // Each resource kind is split into two entries, one per role.
+   Seq("cpus", "mem").foreach { resourceName =>
+     val entries = launchedResources.asScala.filter(_.getName.equals(resourceName)).toList
+     assert(entries.size == 2)
+     assert(entries.exists(_.getRole().equals("role2")))
+     assert(entries.exists(_.getRole().equals("*")))
+   }
+
+   verify(offerDriver, times(1)).launchTasks(
+     Matchers.eq(Collections.singleton(offer.getId)),
+     capture.capture()
+   )
+ }
+
+ test("escapes commandline args for the shell") {
+   // setScheduler() already builds the scheduler this test needs, so shellEscape is taken from
+   // the suite-level instance instead of constructing a second, identical scheduler that
+   // shadows it.
+   setScheduler()
+   val escape = scheduler.shellEscape _
+   // Expected form of an argument that had to be quoted: the text surrounded by double quotes.
+   def wrapped(str: String): String = "\"" + str + "\""
+
+   // Wrapped in quotes
+   assert(escape("'should be left untouched'") === "'should be left untouched'")
+   assert(escape("\"should be left untouched\"") === "\"should be left untouched\"")
+
+   // Harmless
+   assert(escape("") === "")
+   assert(escape("harmless") === "harmless")
+   assert(escape("har-m.l3ss") === "har-m.l3ss")
+
+   // Special Chars escape
+   assert(escape("should escape this \" quote") === wrapped("should escape this \\\" quote"))
+   assert(escape("shouldescape\"quote") === wrapped("shouldescape\\\"quote"))
+   assert(escape("should escape this $ dollar") === wrapped("should escape this \\$ dollar"))
+   assert(escape("should escape this ` backtick") === wrapped("should escape this \\` backtick"))
+   assert(escape("""should escape this \ backslash""")
+     === wrapped("""should escape this \\ backslash"""))
+   assert(escape("""\"?""") === wrapped("""\\\"?"""))
+
+   // Special Chars no escape only wrap
+   List(" ", "'", "<", ">", "&", "|", "?", "*", ";", "!", "#", "(", ")").foreach { char =>
+     assert(escape(s"onlywrap${char}this") === wrapped(s"onlywrap${char}this"))
+   }
+ }
+
+ test("supports spark.mesos.driverEnv.*") {
+   setScheduler()
+
+   val mem = 1000
+   val cpu = 1
+
+   val description = new MesosDriverDescription("d1", "jar", mem, cpu, true,
+     command,
+     Map("spark.mesos.executor.home" -> "test",
+       "spark.app.name" -> "test",
+       "spark.mesos.driverEnv.TEST_ENV" -> "TEST_VAL"),
+     "s1",
+     new Date())
+   val response = scheduler.submitDriver(description)
+   assert(response.success)
+
+   // Offer exactly the requested resources and inspect the environment of the launched task.
+   scheduler.resourceOffers(driver, List(Utils.createOffer("o1", "s1", mem, cpu)).asJava)
+   val launched = Utils.verifyTaskLaunched(driver, "o1")
+   val env = launched.head.getCommand.getEnvironment.getVariablesList.asScala
+     .map(variable => variable.getName -> variable.getValue)
+     .toMap
+   assert(env.get("TEST_ENV") == Some("TEST_VAL"))
+ }
+
+ test("supports spark.mesos.network.name") {
+   setScheduler()
+
+   val mem = 1000
+   val cpu = 1
+
+   val description = new MesosDriverDescription("d1", "jar", mem, cpu, true,
+     command,
+     Map("spark.mesos.executor.home" -> "test",
+       "spark.app.name" -> "test",
+       "spark.mesos.network.name" -> "test-network-name"),
+     "s1",
+     new Date())
+   val response = scheduler.submitDriver(description)
+   assert(response.success)
+
+   scheduler.resourceOffers(driver, List(Utils.createOffer("o1", "s1", mem, cpu)).asJava)
+
+   // The launched task's container must carry exactly one network info with the given name.
+   val launched = Utils.verifyTaskLaunched(driver, "o1")
+   val networkInfos = launched.head.getContainer.getNetworkInfosList
+   assert(networkInfos.size == 1)
+   assert(networkInfos.get(0).getName == "test-network-name")
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
new file mode 100644
index 0000000..a674da4
--- /dev/null
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.concurrent.Promise
+import scala.reflect.ClassTag
+
+import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
+import org.apache.mesos.Protos._
+import org.mockito.Matchers
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config._
+import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.cluster.mesos.Utils._
+
+class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
+ with LocalSparkContext
+ with MockitoSugar
+ with BeforeAndAfter
+ with ScalaFutures {
+
+ // Fixtures assigned by setBackend(); every test calls it before using them.
+ private var sparkConf: SparkConf = _
+ private var driver: SchedulerDriver = _
+ private var taskScheduler: TaskSchedulerImpl = _
+ private var backend: MesosCoarseGrainedSchedulerBackend = _
+ private var externalShuffleClient: MesosExternalShuffleClient = _
+ private var driverEndpoint: RpcEndpointRef = _
+ // Polled from a separate thread in the removeExecutor() test, hence @volatile. The
+ // stopExecutors() override in createSchedulerBackend assigns `stopCalled = true`.
+ @volatile private var stopCalled = false
+
+ // All 'requests' to the scheduler run immediately on the same thread, so
+ // demand that all futures have their value available immediately.
+ implicit override val patienceConfig = PatienceConfig(timeout = Duration(0, TimeUnit.SECONDS))
+
+ test("mesos supports killing and limiting executors") {
+   setBackend()
+   // NOTE(review): setBackend() has already created the SparkContext from sparkConf at this
+   // point -- confirm that values set afterwards are still observed by the backend.
+   sparkConf.set("spark.driver.host", "driverHost")
+   sparkConf.set("spark.driver.port", "1234")
+
+   val offers = List(Resources(backend.executorMemory(sc), 4))
+
+   // launches a task on a valid offer
+   offerResources(offers)
+   verifyTaskLaunched(driver, "o1")
+
+   // limits the executor total to zero, then kills executor "0"
+   val limitAccepted = backend.doRequestTotalExecutors(0).futureValue
+   assert(limitAccepted)
+   val killAccepted = backend.doKillExecutors(Seq("0")).futureValue
+   assert(killAccepted)
+   val killedTaskId = createTaskId("0")
+   verify(driver, times(1)).killTask(killedTaskId)
+
+   // doesn't launch a new task when requested executors == 0
+   offerResources(offers, startId = 2)
+   verifyDeclinedOffer(driver, createOfferId("o2"))
+
+   // Launches a new task when requested executors is positive
+   backend.doRequestTotalExecutors(2)
+   offerResources(offers, startId = 2)
+   verifyTaskLaunched(driver, "o2")
+ }
+
+ test("mesos supports killing and relaunching tasks with executors") {
+   setBackend()
+
+   // Two offers with the same memory; the first has 4 cores, the second only 1.
+   val offeredMem = backend.executorMemory(sc) + 1024
+   val largeOffer = Resources(offeredMem, 4)
+   val smallOffer = Resources(offeredMem, 1)
+   offerResources(List(largeOffer, smallOffer))
+   verifyTaskLaunched(driver, "o1")
+
+   // Reporting task "0" as killed must trigger exactly one reviveOffers() call.
+   backend.statusUpdate(driver, createTaskStatus("0", "s1", TaskState.TASK_KILLED))
+   verify(driver, times(1)).reviveOffers()
+
+   // Re-offers the smaller resources and expects a launch recorded for offer "o2".
+   // NOTE(review): offerResources() uses its default startId here, so the offer it builds has
+   // id "o1" -- confirm that checking "o2" is what is intended.
+   offerResources(List(smallOffer))
+   verifyTaskLaunched(driver, "o2")
+ }
+
+ test("mesos supports spark.executor.cores") {
+   val coresPerExecutor = 4
+   setBackend(Map("spark.executor.cores" -> coresPerExecutor.toString))
+
+   // Offer twice the executor memory and one core more than configured.
+   val mem = backend.executorMemory(sc)
+   offerResources(List(Resources(mem * 2, coresPerExecutor + 1)))
+
+   val launched = verifyTaskLaunched(driver, "o1")
+   assert(launched.length == 1)
+
+   // The single task must take exactly the configured number of cores.
+   val usedCpus = backend.getResource(launched.head.getResourcesList, "cpus")
+   assert(usedCpus == coresPerExecutor)
+ }
+
+ test("mesos supports unset spark.executor.cores") {
+   setBackend()
+
+   val coresInOffer = 10
+   val mem = backend.executorMemory(sc)
+   offerResources(List(Resources(mem * 2, coresInOffer)))
+
+   val launched = verifyTaskLaunched(driver, "o1")
+   assert(launched.length == 1)
+
+   // Without spark.executor.cores the task is expected to take every offered core.
+   val usedCpus = backend.getResource(launched.head.getResourcesList, "cpus")
+   assert(usedCpus == coresInOffer)
+ }
+
+ test("mesos does not acquire more than spark.cores.max") {
+   val coreLimit = 10
+   setBackend(Map("spark.cores.max" -> coreLimit.toString))
+
+   // Offer one core more than the configured maximum.
+   val mem = backend.executorMemory(sc)
+   offerResources(List(Resources(mem, coreLimit + 1)))
+
+   val launched = verifyTaskLaunched(driver, "o1")
+   assert(launched.length == 1)
+
+   val usedCpus = backend.getResource(launched.head.getResourcesList, "cpus")
+   assert(usedCpus == coreLimit)
+ }
+
+ test("mesos does not acquire gpus if not specified") {
+   setBackend()
+
+   // The offer contains one gpu although no gpu setting is configured.
+   val mem = backend.executorMemory(sc)
+   offerResources(List(Resources(mem, cpus = 1, gpus = 1)))
+
+   val launched = verifyTaskLaunched(driver, "o1")
+   assert(launched.length == 1)
+
+   val usedGpus = backend.getResource(launched.head.getResourcesList, "gpus")
+   assert(usedGpus == 0.0)
+ }
+
+
+ test("mesos does not acquire more than spark.mesos.gpus.max") {
+   val gpuLimit = 5
+   setBackend(Map("spark.mesos.gpus.max" -> gpuLimit.toString))
+
+   // Offer one gpu more than the configured maximum.
+   val mem = backend.executorMemory(sc)
+   offerResources(List(Resources(mem, cpus = 1, gpus = gpuLimit + 1)))
+
+   val launched = verifyTaskLaunched(driver, "o1")
+   assert(launched.length == 1)
+
+   val usedGpus = backend.getResource(launched.head.getResourcesList, "gpus")
+   assert(usedGpus == gpuLimit)
+ }
+
+
+ test("mesos declines offers that violate attribute constraints") {
+   setBackend(Map("spark.mesos.constraints" -> "x:true"))
+
+   // offerResources() passes no attributes to createOffer, so the offer is expected to be
+   // declined with a filter.
+   val offer = Resources(backend.executorMemory(sc), 4)
+   offerResources(List(offer))
+   verifyDeclinedOffer(driver, createOfferId("o1"), filter = true)
+ }
+
+ test("mesos declines offers with a filter when reached spark.cores.max") {
+   val coreLimit = 3
+   setBackend(Map("spark.cores.max" -> coreLimit.toString))
+
+   // Two identical offers, each holding more cores than the limit allows in total.
+   val oversized = Resources(backend.executorMemory(sc), coreLimit + 1)
+   offerResources(List(oversized, oversized))
+
+   // The first offer is used, the second is declined with a filter.
+   verifyTaskLaunched(driver, "o1")
+   verifyDeclinedOffer(driver, createOfferId("o2"), filter = true)
+ }
+
+ test("mesos assigns tasks round-robin on offers") {
+   val coresPerExecutor = 4
+   val coreLimit = coresPerExecutor * 2
+   setBackend(Map(
+     "spark.executor.cores" -> coresPerExecutor.toString,
+     "spark.cores.max" -> coreLimit.toString))
+
+   // Two offers, each with twice the memory and cores of a single executor.
+   val doubleOffer = Resources(backend.executorMemory(sc) * 2, coresPerExecutor * 2)
+   offerResources(List(doubleOffer, doubleOffer))
+
+   // A task must have been launched on both offers.
+   verifyTaskLaunched(driver, "o1")
+   verifyTaskLaunched(driver, "o2")
+ }
+
+ test("mesos creates multiple executors on a single slave") {
+   val coresPerExecutor = 4
+   setBackend(Map("spark.executor.cores" -> coresPerExecutor.toString))
+
+   // offer with room for two executors
+   val roomForTwo = Resources(backend.executorMemory(sc) * 2, coresPerExecutor * 2)
+   offerResources(List(roomForTwo))
+
+   // verify two executors were started on a single offer
+   val launched = verifyTaskLaunched(driver, "o1")
+   assert(launched.length == 2)
+ }
+
+ test("mesos doesn't register twice with the same shuffle service") {
+   setBackend(Map("spark.shuffle.service.enabled" -> "true"))
+   val mem = backend.executorMemory(sc)
+   val cpu = 4
+
+   // Two consecutive offers from the same slave "s1"; each must launch a task.
+   Seq("o1", "o2").foreach { offerId =>
+     val offer = createOffer(offerId, "s1", mem, cpu)
+     backend.resourceOffers(driver, List(offer).asJava)
+     verifyTaskLaunched(driver, offerId)
+   }
+
+   // Both tasks report RUNNING on that slave, but the driver may register with the shuffle
+   // service only once.
+   Seq("0", "1").foreach { taskId =>
+     val runningStatus = createTaskStatus(taskId, "s1", TaskState.TASK_RUNNING)
+     backend.statusUpdate(driver, runningStatus)
+   }
+   verify(externalShuffleClient, times(1))
+     .registerDriverWithShuffleService(anyString, anyInt, anyLong, anyLong)
+ }
+
+ test("Port offer decline when there is no appropriate range") {
+   setBackend(Map(BLOCK_MANAGER_PORT.key -> "30100"))
+
+   // The configured port 30100 lies outside the offered range 31100-31200.
+   val portRange = (31100L, 31200L)
+   val mem = backend.executorMemory(sc)
+   val cpu = 4
+   val offer = createOffer("o1", "s1", mem, cpu, Some(portRange))
+
+   backend.resourceOffers(driver, List(offer).asJava)
+   verify(driver, times(1)).declineOffer(offer.getId)
+ }
+
+ test("Port offer accepted when ephemeral ports are used") {
+   // No port is configured, so the offered range must not prevent the launch.
+   setBackend()
+
+   val portRange = (31100L, 31200L)
+   val mem = backend.executorMemory(sc)
+   val cpu = 4
+   val offer = createOffer("o1", "s1", mem, cpu, Some(portRange))
+
+   backend.resourceOffers(driver, List(offer).asJava)
+   verifyTaskLaunched(driver, "o1")
+ }
+
+ test("Port offer accepted with user defined port numbers") {
+   val port = 30100
+   setBackend(Map(BLOCK_MANAGER_PORT.key -> s"$port"))
+
+   // The configured port lies inside the offered range 30000-31000.
+   val portRange = (30000L, 31000L)
+   val mem = backend.executorMemory(sc)
+   val cpu = 4
+   val offer = createOffer("o1", "s1", mem, cpu, Some(portRange))
+   backend.resourceOffers(driver, List(offer).asJava)
+   val launched = verifyTaskLaunched(driver, "o1")
+
+   // The task's "ports" resource must contain a range that is exactly [port, port].
+   val portResource = launched.head.getResourcesList.asScala
+     .find(resource => resource.getType == Value.Type.RANGES && resource.getName == "ports")
+   val reservesConfiguredPort = portResource.exists { resource =>
+     resource.getRanges().getRangeList.asScala
+       .exists(range => range.getBegin == port && range.getEnd == port)
+   }
+   assert(reservesConfiguredPort)
+ }
+
+ test("mesos kills an executor when told") {
+   setBackend()
+
+   val mem = backend.executorMemory(sc)
+   val cpu = 4
+   backend.resourceOffers(driver, List(createOffer("o1", "s1", mem, cpu)).asJava)
+   verifyTaskLaunched(driver, "o1")
+
+   // Killing executor "0" must result in exactly one killTask call for task id "0".
+   backend.doKillExecutors(List("0"))
+   val expectedTaskId = createTaskId("0")
+   verify(driver, times(1)).killTask(expectedTaskId)
+ }
+
+ test("weburi is set in created scheduler driver") {
+   // setBackend() configures spark.mesos.driver.webui.url = "http://webui" and creates `sc`.
+   setBackend()
+
+   // Test-local doubles, named so that they do not shadow the suite-level fixtures.
+   val mockScheduler = mock[TaskSchedulerImpl]
+   when(mockScheduler.sc).thenReturn(sc)
+   val mockDriver = mock[SchedulerDriver]
+   when(mockDriver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
+   val mockSecurityManager = mock[SecurityManager]
+
+   val webuiCheckingBackend = new MesosCoarseGrainedSchedulerBackend(
+     mockScheduler, sc, "master", mockSecurityManager) {
+     // The assertions live in this override: they run when the backend asks for its driver.
+     override protected def createSchedulerDriver(
+         masterUrl: String,
+         scheduler: Scheduler,
+         sparkUser: String,
+         appName: String,
+         conf: SparkConf,
+         webuiUrl: Option[String] = None,
+         checkpoint: Option[Boolean] = None,
+         failoverTimeout: Option[Double] = None,
+         frameworkId: Option[String] = None): SchedulerDriver = {
+       markRegistered()
+       assert(webuiUrl.isDefined)
+       assert(webuiUrl.get.equals("http://webui"))
+       mockDriver
+     }
+   }
+
+   webuiCheckingBackend.start()
+ }
+
+ test("honors unset spark.mesos.containerizer") {
+   // Only a docker image is configured; the container type is then expected to be DOCKER.
+   setBackend(Map("spark.mesos.executor.docker.image" -> "test"))
+
+   val mem = backend.executorMemory(sc)
+   val cpu = 4
+   backend.resourceOffers(driver, List(createOffer("o1", "s1", mem, cpu)).asJava)
+
+   val launched = verifyTaskLaunched(driver, "o1")
+   assert(launched.head.getContainer.getType == ContainerInfo.Type.DOCKER)
+ }
+
+ test("honors spark.mesos.containerizer=\"mesos\"") {
+   // A docker image plus an explicit "mesos" containerizer must yield a MESOS container.
+   val settings = Map(
+     "spark.mesos.executor.docker.image" -> "test",
+     "spark.mesos.containerizer" -> "mesos")
+   setBackend(settings)
+
+   val mem = backend.executorMemory(sc)
+   val cpu = 4
+   backend.resourceOffers(driver, List(createOffer("o1", "s1", mem, cpu)).asJava)
+
+   val launched = verifyTaskLaunched(driver, "o1")
+   assert(launched.head.getContainer.getType == ContainerInfo.Type.MESOS)
+ }
+
+ test("docker settings are reflected in created tasks") {
+   val dockerSettings = Map(
+     "spark.mesos.executor.docker.image" -> "some_image",
+     "spark.mesos.executor.docker.forcePullImage" -> "true",
+     "spark.mesos.executor.docker.volumes" -> "/host_vol:/container_vol:ro",
+     "spark.mesos.executor.docker.portmaps" -> "8080:80:tcp")
+   setBackend(dockerSettings)
+
+   val mem = backend.executorMemory(sc)
+   val cpu = 4
+   backend.resourceOffers(driver, List(createOffer("o1", "s1", mem, cpu)).asJava)
+
+   val launched = verifyTaskLaunched(driver, "o1")
+   assert(launched.size == 1)
+
+   val container = launched.head.getContainer
+   assert(container.getType == ContainerInfo.Type.DOCKER)
+
+   // The single volume mirrors "/host_vol:/container_vol:ro".
+   val volumes = container.getVolumesList.asScala
+   assert(volumes.size == 1)
+   val volume = volumes.head
+   assert(volume.getHostPath == "/host_vol")
+   assert(volume.getContainerPath == "/container_vol")
+   assert(volume.getMode == Volume.Mode.RO)
+
+   // The single port mapping mirrors "8080:80:tcp".
+   val portMappings = container.getDocker.getPortMappingsList.asScala
+   assert(portMappings.size == 1)
+   val mapping = portMappings.head
+   assert(mapping.getHostPort == 8080)
+   assert(mapping.getContainerPort == 80)
+   assert(mapping.getProtocol == "tcp")
+ }
+
+ test("force-pull-image option is disabled by default") {
+   // Only the image is configured; forcePullImage is left unset.
+   setBackend(Map("spark.mesos.executor.docker.image" -> "some_image"))
+
+   val mem = backend.executorMemory(sc)
+   val cpu = 4
+   backend.resourceOffers(driver, List(createOffer("o1", "s1", mem, cpu)).asJava)
+
+   val launched = verifyTaskLaunched(driver, "o1")
+   assert(launched.size == 1)
+
+   val container = launched.head.getContainer
+   assert(container.getType == ContainerInfo.Type.DOCKER)
+
+   val docker = container.getDocker
+   assert(docker.getImage == "some_image")
+   assert(!docker.getForcePullImage)
+ }
+
+ test("Do not call removeExecutor() after backend is stopped") {
+   setBackend()
+
+   // launches a task on a valid offer
+   val offers = List(Resources(backend.executorMemory(sc), 1))
+   offerResources(offers)
+   verifyTaskLaunched(driver, "o1")
+
+   // Thread simulating a status update that arrives once stopping has begun: it polls
+   // `stopCalled` and then reports task "0" as finished.
+   // The Thread itself is kept (not the Unit result of start()) so it can be joined below.
+   val statusUpdateThread = new Thread {
+     override def run(): Unit = {
+       while (!stopCalled) {
+         Thread.sleep(100)
+       }
+
+       val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED)
+       backend.statusUpdate(driver, status)
+     }
+   }
+   // Daemon, so a thread still polling after a failed stop() cannot keep the JVM alive.
+   statusUpdateThread.setDaemon(true)
+   statusUpdateThread.start()
+
+   backend.stop()
+
+   // Wait until the simulated status update has been delivered; otherwise the verification
+   // below could run before statusUpdate() and pass without checking anything.
+   statusUpdateThread.join(10000)
+   assert(!statusUpdateThread.isAlive, "status update thread did not finish in time")
+
+   // Any method of the backend involving sending messages to the driver endpoint should not
+   // be called after the backend is stopped.
+   verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
+ }
+
+ test("mesos supports spark.executor.uri") {
+   val url = "spark.spark.spark.com"
+   // Spark home is left unset for this test (setHome = false).
+   setBackend(Map("spark.executor.uri" -> url), setHome = false)
+
+   val mem = backend.executorMemory(sc)
+   val cpu = 4
+   backend.resourceOffers(driver, List(createOffer("o1", "s1", mem, cpu)).asJava)
+
+   // The first URI of the launched command must be the configured executor URI.
+   val launched = verifyTaskLaunched(driver, "o1")
+   val firstUri = launched.head.getCommand.getUrisList.asScala(0)
+   assert(firstUri.getValue == url)
+ }
+
+ test("mesos supports setting fetcher cache") {
+   val url = "spark.spark.spark.com"
+   val settings = Map(
+     "spark.mesos.fetcherCache.enable" -> "true",
+     "spark.executor.uri" -> url)
+   setBackend(settings, setHome = false)
+
+   offerResources(List(Resources(backend.executorMemory(sc), 1)))
+
+   // The command must carry exactly one URI, flagged as cacheable.
+   val launched = verifyTaskLaunched(driver, "o1")
+   val commandUris = launched.head.getCommand.getUrisList
+   assert(commandUris.size() == 1)
+   assert(commandUris.asScala.head.getCache)
+ }
+
+ test("mesos supports disabling fetcher cache") {
+   val url = "spark.spark.spark.com"
+   val settings = Map(
+     "spark.mesos.fetcherCache.enable" -> "false",
+     "spark.executor.uri" -> url)
+   setBackend(settings, setHome = false)
+
+   offerResources(List(Resources(backend.executorMemory(sc), 1)))
+
+   // The command must carry exactly one URI, not flagged as cacheable.
+   val launched = verifyTaskLaunched(driver, "o1")
+   val commandUris = launched.head.getCommand.getUrisList
+   assert(commandUris.size() == 1)
+   assert(!commandUris.asScala.head.getCache)
+ }
+
+ test("mesos supports spark.mesos.network.name") {
+   setBackend(Map("spark.mesos.network.name" -> "test-network-name"))
+
+   val mem = backend.executorMemory(sc)
+   val cpu = 4
+   backend.resourceOffers(driver, List(createOffer("o1", "s1", mem, cpu)).asJava)
+
+   // The launched task's container must carry exactly one network info with the given name.
+   val launched = verifyTaskLaunched(driver, "o1")
+   val networks = launched.head.getContainer.getNetworkInfosList
+   assert(networks.size == 1)
+   assert(networks.get(0).getName == "test-network-name")
+ }
+
+ // Resource amounts of one simulated offer; offerResources() forwards mem, cpus and gpus to
+ // createOffer(). gpus defaults to 0.
+ private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)
+
+ /**
+  * Verifies that `driver` declined the offer `offerId` exactly once.
+  *
+  * @param driver  mock driver to verify against
+  * @param offerId id of the offer expected to be declined
+  * @param filter  when true, the two-argument `declineOffer(offerId, filters)` call is
+  *                expected (with any `Filters`); otherwise the one-argument call
+  */
+ private def verifyDeclinedOffer(driver: SchedulerDriver,
+     offerId: OfferID,
+     filter: Boolean = false): Unit = {
+   val declinedOnce = verify(driver, times(1))
+   if (filter) {
+     declinedOnce.declineOffer(Matchers.eq(offerId), anyObject[Filters])
+   } else {
+     declinedOnce.declineOffer(Matchers.eq(offerId))
+   }
+ }
+
+ /**
+  * Turns each [[Resources]] entry into a Mesos offer and passes all of them to the backend in
+  * one `resourceOffers` call.
+  *
+  * The i-th entry (0-based) gets offer id `o<i + startId>` and slave id `s<i + startId>`.
+  *
+  * @param offers  resource amounts, one entry per offer
+  * @param startId number used for the first offer and slave id; defaults to 1
+  */
+ private def offerResources(offers: List[Resources], startId: Int = 1): Unit = {
+   val mesosOffers = for ((resources, index) <- offers.zipWithIndex) yield {
+     val id = index + startId
+     createOffer(s"o$id", s"s$id", resources.mem, resources.cpus, None, resources.gpus)
+   }
+
+   backend.resourceOffers(driver, mesosOffers.asJava)
+ }
+
+ /**
+  * Builds a Mesos `TaskStatus` for task `taskId` on slave `slaveId` in the given `state`.
+  */
+ private def createTaskStatus(taskId: String, slaveId: String, state: TaskState): TaskStatus = {
+   val task = TaskID.newBuilder().setValue(taskId).build()
+   val slave = SlaveID.newBuilder().setValue(slaveId).build()
+   TaskStatus.newBuilder()
+     .setTaskId(task)
+     .setSlaveId(slave)
+     .setState(state)
+     .build
+ }
+
+ /**
+ * Creates and starts a [[MesosCoarseGrainedSchedulerBackend]] whose collaborators are replaced
+ * by the supplied test doubles.
+ *
+ * @param taskScheduler task scheduler passed to the backend constructor
+ * @param driver value returned by the overridden `createSchedulerDriver`
+ * @param shuffleClient value returned by the overridden `getShuffleClient`
+ * @param endpoint value returned by the overridden `createDriverEndpointRef`
+ * @return the backend, after `start()` has been called on it
+ */
+ private def createSchedulerBackend(
+ taskScheduler: TaskSchedulerImpl,
+ driver: SchedulerDriver,
+ shuffleClient: MesosExternalShuffleClient,
+ endpoint: RpcEndpointRef): MesosCoarseGrainedSchedulerBackend = {
+ val securityManager = mock[SecurityManager]
+
+ val backend = new MesosCoarseGrainedSchedulerBackend(
+ taskScheduler, sc, "master", securityManager) {
+ // Ignores every argument and hands back the driver passed to createSchedulerBackend.
+ override protected def createSchedulerDriver(
+ masterUrl: String,
+ scheduler: Scheduler,
+ sparkUser: String,
+ appName: String,
+ conf: SparkConf,
+ webuiUrl: Option[String] = None,
+ checkpoint: Option[Boolean] = None,
+ failoverTimeout: Option[Double] = None,
+ frameworkId: Option[String] = None): SchedulerDriver = driver
+
+ override protected def getShuffleClient(): MesosExternalShuffleClient = shuffleClient
+
+ override protected def createDriverEndpointRef(
+ properties: ArrayBuffer[(String, String)]): RpcEndpointRef = endpoint
+
+ // override to avoid race condition with the driver thread on `mesosDriver`
+ override def startScheduler(newDriver: SchedulerDriver): Unit = {
+ mesosDriver = newDriver
+ }
+
+ // Replaces the real implementation with setting `stopCalled`.
+ // NOTE(review): presumably this resolves to the suite-level flag polled by the
+ // status-update thread in one test -- confirm it is not shadowed by a backend member.
+ override def stopExecutors(): Unit = {
+ stopCalled = true
+ }
+
+ // Part of the anonymous subclass's constructor body, so it runs before backend.start().
+ markRegistered()
+ }
+ backend.start()
+ backend
+ }
+
+ /**
+  * Rebuilds every suite-level fixture: the SparkConf, a SparkContext created from it, mocks for
+  * the Mesos driver, task scheduler, external shuffle client and driver endpoint, and finally
+  * the backend under test (via createSchedulerBackend).
+  *
+  * @param sparkConfVars extra configuration entries applied after the defaults; `null` (the
+  *                      default) adds nothing
+  * @param setHome       when true (the default) the Spark home is set to "/path"
+  */
+ private def setBackend(sparkConfVars: Map[String, String] = null,
+     setHome: Boolean = true): Unit = {
+   sparkConf = new SparkConf()
+     .setMaster("local[*]")
+     .setAppName("test-mesos-dynamic-alloc")
+     .set("spark.mesos.driver.webui.url", "http://webui")
+   if (setHome) {
+     sparkConf.setSparkHome("/path")
+   }
+   Option(sparkConfVars).foreach(vars => sparkConf.setAll(vars))
+
+   sc = new SparkContext(sparkConf)
+
+   // Mock driver that reports DRIVER_RUNNING when started.
+   driver = mock[SchedulerDriver]
+   when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
+
+   // Mock task scheduler exposing the context created above.
+   taskScheduler = mock[TaskSchedulerImpl]
+   when(taskScheduler.sc).thenReturn(sc)
+
+   externalShuffleClient = mock[MesosExternalShuffleClient]
+
+   // Any ask() on the endpoint returns the future of a Promise that is never completed here.
+   driverEndpoint = mock[RpcEndpointRef]
+   when(driverEndpoint.ask(any())(any())).thenReturn(Promise().future)
+
+   backend = createSchedulerBackend(taskScheduler, driver, externalShuffleClient, driverEndpoint)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
new file mode 100644
index 0000000..1d7a86f
--- /dev/null
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.nio.ByteBuffer
+import java.util.Arrays
+import java.util.Collection
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.Value.Scalar
+import org.mockito.{ArgumentCaptor, Matchers}
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.executor.MesosExecutorBackend
+import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerExecutorAdded,
+ TaskDescription, TaskSchedulerImpl, WorkerOffer}
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+
+/**
+ * Tests for [[MesosFineGrainedSchedulerBackend]] that run entirely against Mockito mocks of
+ * `SparkContext`, `TaskSchedulerImpl` and the Mesos `SchedulerDriver`.
+ */
+class MesosFineGrainedSchedulerBackendSuite
+  extends SparkFunSuite with LocalSparkContext with MockitoSugar {
+
+  /**
+   * Creates a mocked `SparkContext` carrying the stubs shared by the tests below.
+   *
+   * @param conf the configuration returned by `conf`; tests may keep mutating it afterwards
+   * @param sparkHome the value returned (wrapped in an `Option`) by `getSparkHome()`
+   * @return a mock whose executor memory is 100, whose executor environment is an empty map
+   *         and whose listener bus is itself a mock
+   */
+  private def mockSparkContext(conf: SparkConf, sparkHome: String): SparkContext = {
+    val context = mock[SparkContext]
+    when(context.conf).thenReturn(conf)
+    when(context.getSparkHome()).thenReturn(Option(sparkHome))
+    when(context.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+    when(context.executorMemory).thenReturn(100)
+    when(context.listenerBus).thenReturn(mock[LiveListenerBus])
+    context
+  }
+
+  test("weburi is set in created scheduler driver") {
+    val conf = new SparkConf()
+      .set("spark.mesos.driver.webui.url", "http://webui")
+      .set("spark.app.name", "name1")
+
+    val sc = mock[SparkContext]
+    when(sc.conf).thenReturn(conf)
+    when(sc.sparkUser).thenReturn("sparkUser1")
+    when(sc.appName).thenReturn("appName1")
+
+    val taskScheduler = mock[TaskSchedulerImpl]
+    val driver = mock[SchedulerDriver]
+    when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
+
+    // The assertion lives inside the overridden factory method, where the web UI URL handed
+    // to the driver is visible.
+    val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") {
+      override protected def createSchedulerDriver(
+          masterUrl: String,
+          scheduler: Scheduler,
+          sparkUser: String,
+          appName: String,
+          conf: SparkConf,
+          webuiUrl: Option[String] = None,
+          checkpoint: Option[Boolean] = None,
+          failoverTimeout: Option[Double] = None,
+          frameworkId: Option[String] = None): SchedulerDriver = {
+        markRegistered()
+        assert(webuiUrl === Some("http://webui"))
+        driver
+      }
+    }
+
+    backend.start()
+  }
+
+  test("Use configured mesosExecutor.cores for ExecutorInfo") {
+    val mesosExecutorCores = 3
+    val conf = new SparkConf()
+      .set("spark.mesos.mesosExecutor.cores", mesosExecutorCores.toString)
+
+    val sc = mockSparkContext(conf, "/spark-home")
+    val taskScheduler = mock[TaskSchedulerImpl]
+    when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
+
+    val mesosSchedulerBackend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
+
+    val resources = Arrays.asList(
+      mesosSchedulerBackend.createResource("cpus", 4),
+      mesosSchedulerBackend.createResource("mem", 1024))
+    // uri is null.
+    val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
+    // Kept as an Option so that a missing "cpus" resource shows up as a failed assertion.
+    val cpus = executorInfo.getResourcesList.asScala
+      .find(_.getName == "cpus")
+      .map(_.getScalar.getValue)
+
+    assert(cpus === Some(mesosExecutorCores.toDouble))
+  }
+
+  test("check spark-class location correctly") {
+    val conf = new SparkConf()
+      .set("spark.mesos.executor.home", "/mesos-home")
+
+    val sc = mockSparkContext(conf, "/spark-home")
+    val taskScheduler = mock[TaskSchedulerImpl]
+    when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
+
+    val mesosSchedulerBackend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
+
+    val resources = Arrays.asList(
+      mesosSchedulerBackend.createResource("cpus", 4),
+      mesosSchedulerBackend.createResource("mem", 1024))
+    val executorBackendClass = classOf[MesosExecutorBackend].getName
+
+    // uri is null.
+    val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
+    assert(executorInfo.getCommand.getValue ===
+      s" /mesos-home/bin/spark-class $executorBackendClass")
+
+    // uri exists.
+    conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz")
+    val (executorInfo1, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
+    assert(executorInfo1.getCommand.getValue ===
+      s"cd test-app-1*; ./bin/spark-class $executorBackendClass")
+  }
+
+  test("spark docker properties correctly populate the DockerInfo message") {
+    val taskScheduler = mock[TaskSchedulerImpl]
+
+    val conf = new SparkConf()
+      .set("spark.mesos.executor.docker.image", "spark/mock")
+      .set("spark.mesos.executor.docker.forcePullImage", "true")
+      .set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro")
+      .set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp")
+
+    val sc = mockSparkContext(conf, "/spark-home")
+
+    val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
+
+    val (execInfo, _) = backend.createExecutorInfo(
+      Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor")
+    val docker = execInfo.getContainer.getDocker
+    assert(docker.getImage === "spark/mock")
+    assert(docker.getForcePullImage)
+
+    // "80:8080" carries no protocol and is expected to come out as tcp.
+    val portmaps = docker.getPortMappingsList
+    assert(portmaps.get(0).getHostPort === 80)
+    assert(portmaps.get(0).getContainerPort === 8080)
+    assert(portmaps.get(0).getProtocol === "tcp")
+    assert(portmaps.get(1).getHostPort === 53)
+    assert(portmaps.get(1).getContainerPort === 53)
+    assert(portmaps.get(1).getProtocol === "tcp")
+
+    // Volumes without an explicit mode ("/a", "/b:/b") are expected to be read-write.
+    val volumes = execInfo.getContainer.getVolumesList
+    assert(volumes.get(0).getContainerPath === "/a")
+    assert(volumes.get(0).getMode === Volume.Mode.RW)
+    assert(volumes.get(1).getContainerPath === "/b")
+    assert(volumes.get(1).getHostPath === "/b")
+    assert(volumes.get(1).getMode === Volume.Mode.RW)
+    assert(volumes.get(2).getContainerPath === "/c")
+    assert(volumes.get(2).getHostPath === "/c")
+    assert(volumes.get(2).getMode === Volume.Mode.RW)
+    assert(volumes.get(3).getContainerPath === "/d")
+    assert(volumes.get(3).getMode === Volume.Mode.RO)
+    assert(volumes.get(4).getContainerPath === "/e")
+    assert(volumes.get(4).getHostPath === "/e")
+    assert(volumes.get(4).getMode === Volume.Mode.RO)
+  }
+
+  test("mesos resource offers result in launching tasks") {
+    // Builds an offer "o<id>" from slave "s<id>" on "host<id>" with scalar mem and cpus.
+    def createOffer(id: Int, mem: Int, cpu: Int): Offer = {
+      val builder = Offer.newBuilder()
+      builder.addResourcesBuilder()
+        .setName("mem")
+        .setType(Value.Type.SCALAR)
+        .setScalar(Scalar.newBuilder().setValue(mem))
+      builder.addResourcesBuilder()
+        .setName("cpus")
+        .setType(Value.Type.SCALAR)
+        .setScalar(Scalar.newBuilder().setValue(cpu))
+      builder.setId(OfferID.newBuilder().setValue(s"o$id").build())
+        .setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
+        .setSlaveId(SlaveID.newBuilder().setValue(s"s$id"))
+        .setHostname(s"host$id").build()
+    }
+
+    val driver = mock[SchedulerDriver]
+    val taskScheduler = mock[TaskSchedulerImpl]
+    val sc = mockSparkContext(new SparkConf, "/path")
+
+    val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
+
+    val minMem = backend.executorMemory(sc)
+    val minCpu = 4
+
+    // The second offer is one unit of memory short of what the backend asks for.
+    val mesosOffers = new java.util.ArrayList[Offer]
+    mesosOffers.add(createOffer(1, minMem, minCpu))
+    mesosOffers.add(createOffer(2, minMem - 1, minCpu))
+    mesosOffers.add(createOffer(3, minMem, minCpu))
+
+    // Only the first and third offers are expected to reach the task scheduler, each with
+    // the offered cpus reduced by the backend's mesosExecutorCores.
+    val offeredCores = (minCpu - backend.mesosExecutorCores).toInt
+    val expectedWorkerOffers = ArrayBuffer(
+      new WorkerOffer(
+        mesosOffers.get(0).getSlaveId.getValue,
+        mesosOffers.get(0).getHostname,
+        offeredCores),
+      new WorkerOffer(
+        mesosOffers.get(2).getSlaveId.getValue,
+        mesosOffers.get(2).getHostname,
+        offeredCores))
+
+    // The scheduler hands back a single task, bound to slave "s1".
+    val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
+    when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
+    when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
+
+    val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
+    when(
+      driver.launchTasks(
+        Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+        capture.capture(),
+        any(classOf[Filters])
+      )
+    ).thenReturn(Status.valueOf(1))
+    when(driver.declineOffer(mesosOffers.get(1).getId)).thenReturn(Status.valueOf(1))
+    when(driver.declineOffer(mesosOffers.get(2).getId)).thenReturn(Status.valueOf(1))
+
+    backend.resourceOffers(driver, mesosOffers)
+
+    // One launch on the first offer; the other two offers must be declined.
+    verify(driver, times(1)).launchTasks(
+      Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+      capture.capture(),
+      any(classOf[Filters])
+    )
+    verify(driver, times(1)).declineOffer(mesosOffers.get(1).getId)
+    verify(driver, times(1)).declineOffer(mesosOffers.get(2).getId)
+    assert(capture.getValue.size() === 1)
+    val taskInfo = capture.getValue.iterator().next()
+    assert(taskInfo.getName === "n1")
+    val cpus = taskInfo.getResourcesList.get(0)
+    assert(cpus.getName === "cpus")
+    assert(cpus.getScalar.getValue === 2.0)
+    assert(taskInfo.getSlaveId.getValue === "s1")
+
+    // Unwanted resources offered on an existing node. Make sure they are declined
+    val mesosOffers2 = new java.util.ArrayList[Offer]
+    mesosOffers2.add(createOffer(1, minMem, minCpu))
+    reset(taskScheduler)
+    reset(driver)
+    when(taskScheduler.resourceOffers(any(classOf[IndexedSeq[WorkerOffer]]))).thenReturn(Seq(Seq()))
+    when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
+    when(driver.declineOffer(mesosOffers2.get(0).getId)).thenReturn(Status.valueOf(1))
+
+    backend.resourceOffers(driver, mesosOffers2)
+    verify(driver, times(1)).declineOffer(mesosOffers2.get(0).getId)
+  }
+
+  test("can handle multiple roles") {
+    val driver = mock[SchedulerDriver]
+    val taskScheduler = mock[TaskSchedulerImpl]
+    val sc = mockSparkContext(new SparkConf, "/path")
+
+    // A single offer that splits its mem and cpus between the "prod" and "dev" roles.
+    val id = 1
+    val builder = Offer.newBuilder()
+    builder.addResourcesBuilder()
+      .setName("mem")
+      .setType(Value.Type.SCALAR)
+      .setRole("prod")
+      .setScalar(Scalar.newBuilder().setValue(500))
+    builder.addResourcesBuilder()
+      .setName("cpus")
+      .setRole("prod")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Scalar.newBuilder().setValue(1))
+    builder.addResourcesBuilder()
+      .setName("mem")
+      .setRole("dev")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Scalar.newBuilder().setValue(600))
+    builder.addResourcesBuilder()
+      .setName("cpus")
+      .setRole("dev")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Scalar.newBuilder().setValue(2))
+    val offer = builder.setId(OfferID.newBuilder().setValue(s"o$id").build())
+      .setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
+      .setSlaveId(SlaveID.newBuilder().setValue(s"s$id"))
+      .setHostname(s"host$id").build()
+
+    val mesosOffers = new java.util.ArrayList[Offer]
+    mesosOffers.add(offer)
+
+    val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
+
+    val expectedWorkerOffers = ArrayBuffer(
+      new WorkerOffer(
+        mesosOffers.get(0).getSlaveId.getValue,
+        mesosOffers.get(0).getHostname,
+        2 // 3 cpus offered across both roles, deducting 1 for executor
+      ))
+
+    val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
+    when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
+    when(taskScheduler.CPUS_PER_TASK).thenReturn(1)
+
+    val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
+    when(
+      driver.launchTasks(
+        Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+        capture.capture(),
+        any(classOf[Filters])
+      )
+    ).thenReturn(Status.valueOf(1))
+
+    backend.resourceOffers(driver, mesosOffers)
+
+    verify(driver, times(1)).launchTasks(
+      Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+      capture.capture(),
+      any(classOf[Filters])
+    )
+
+    // The task's cpu is expected to be drawn from "dev" ...
+    assert(capture.getValue.size() === 1)
+    val taskInfo = capture.getValue.iterator().next()
+    assert(taskInfo.getName === "n1")
+    assert(taskInfo.getResourcesCount === 1)
+    val cpusDev = taskInfo.getResourcesList.get(0)
+    assert(cpusDev.getName === "cpus")
+    assert(cpusDev.getScalar.getValue === 1.0)
+    assert(cpusDev.getRole === "dev")
+
+    // ... while the executor's own mem and cpus are expected to be drawn from "prod".
+    val executorResources = taskInfo.getExecutor.getResourcesList.asScala
+    def executorHas(name: String, value: Double, role: String): Boolean =
+      executorResources.exists { r =>
+        r.getName == name && r.getScalar.getValue == value && r.getRole == role
+      }
+    assert(executorHas("mem", 484.0, "prod"))
+    assert(executorHas("cpus", 1.0, "prod"))
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org