You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2016/08/26 19:25:37 UTC

[2/7] spark git commit: [SPARK-16967] move mesos to module

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
new file mode 100644
index 0000000..e19d445
--- /dev/null
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.util.{List => JList}
+import java.util.concurrent.CountDownLatch
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.util.control.NonFatal
+
+import com.google.common.base.Splitter
+import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler, SchedulerDriver}
+import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
+import org.apache.mesos.protobuf.{ByteString, GeneratedMessage}
+
+import org.apache.spark.{SparkConf, SparkContext, SparkException}
+import org.apache.spark.TaskState
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+
+
+/**
+ * Shared trait for implementing a Mesos Scheduler. This holds common state and helper
+ * methods that the Mesos schedulers will use.
+ */
+trait MesosSchedulerUtils extends Logging {
+  // Lock used to wait for scheduler to be registered
+  private final val registerLatch = new CountDownLatch(1)
+
+  // Driver for talking to Mesos
+  protected var mesosDriver: SchedulerDriver = null
+
+  /**
+   * Creates a new MesosSchedulerDriver that communicates to the Mesos master.
+   *
+   * @param masterUrl The url to connect to Mesos master
+   * @param scheduler the scheduler class to receive scheduler callbacks
+   * @param sparkUser User to impersonate with when running tasks
+   * @param appName The framework name to display on the Mesos UI
+   * @param conf Spark configuration
+   * @param webuiUrl The WebUI url to link from Mesos UI
+   * @param checkpoint Option to checkpoint tasks for failover
+   * @param failoverTimeout Duration Mesos master expects scheduler to reconnect on disconnect
+   * @param frameworkId The id of the new framework
+   */
+  protected def createSchedulerDriver(
+      masterUrl: String,
+      scheduler: Scheduler,
+      sparkUser: String,
+      appName: String,
+      conf: SparkConf,
+      webuiUrl: Option[String] = None,
+      checkpoint: Option[Boolean] = None,
+      failoverTimeout: Option[Double] = None,
+      frameworkId: Option[String] = None): SchedulerDriver = {
+    val fwInfoBuilder = FrameworkInfo.newBuilder().setUser(sparkUser).setName(appName)
+    val credBuilder = Credential.newBuilder()
+    webuiUrl.foreach { url => fwInfoBuilder.setWebuiUrl(url) }
+    checkpoint.foreach { checkpoint => fwInfoBuilder.setCheckpoint(checkpoint) }
+    failoverTimeout.foreach { timeout => fwInfoBuilder.setFailoverTimeout(timeout) }
+    frameworkId.foreach { id =>
+      fwInfoBuilder.setId(FrameworkID.newBuilder().setValue(id).build())
+    }
+    conf.getOption("spark.mesos.principal").foreach { principal =>
+      fwInfoBuilder.setPrincipal(principal)
+      credBuilder.setPrincipal(principal)
+    }
+    conf.getOption("spark.mesos.secret").foreach { secret =>
+      credBuilder.setSecret(secret)
+    }
+    if (credBuilder.hasSecret && !fwInfoBuilder.hasPrincipal) {
+      throw new SparkException(
+        "spark.mesos.principal must be configured when spark.mesos.secret is set")
+    }
+    conf.getOption("spark.mesos.role").foreach { role =>
+      fwInfoBuilder.setRole(role)
+    }
+    if (credBuilder.hasPrincipal) {
+      new MesosSchedulerDriver(
+        scheduler, fwInfoBuilder.build(), masterUrl, credBuilder.build())
+    } else {
+      new MesosSchedulerDriver(scheduler, fwInfoBuilder.build(), masterUrl)
+    }
+  }
+
+  /**
+   * Starts the MesosSchedulerDriver and stores the current running driver to this new instance.
+   * This driver is expected to not be running.
+   * This method returns only after the scheduler has registered with Mesos.
+   */
+  def startScheduler(newDriver: SchedulerDriver): Unit = {
+    synchronized {
+      if (mesosDriver != null) {
+        registerLatch.await()
+        return
+      }
+      @volatile
+      var error: Option[Exception] = None
+
+      // We create a new thread that will block inside `mesosDriver.run`
+      // until the scheduler exits
+      new Thread(Utils.getFormattedClassName(this) + "-mesos-driver") {
+        setDaemon(true)
+        override def run() {
+          try {
+            mesosDriver = newDriver
+            val ret = mesosDriver.run()
+            logInfo("driver.run() returned with code " + ret)
+            if (ret != null && ret.equals(Status.DRIVER_ABORTED)) {
+              error = Some(new SparkException("Error starting driver, DRIVER_ABORTED"))
+              markErr()
+            }
+          } catch {
+            case e: Exception =>
+              logError("driver.run() failed", e)
+              error = Some(e)
+              markErr()
+          }
+        }
+      }.start()
+
+      registerLatch.await()
+
+      // propagate any error to the calling thread. This ensures that SparkContext creation fails
+      // without leaving a broken context that won't be able to schedule any tasks
+      error.foreach(throw _)
+    }
+  }
+
+  def getResource(res: JList[Resource], name: String): Double = {
+    // A resource can have multiple values in the offer since it can either be from
+    // a specific role or wildcard.
+    res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum
+  }
+
+  /**
+   * Transforms a range resource to a list of ranges
+   *
+   * @param res the mesos resource list
+   * @param name the name of the resource
+   * @return the list of ranges returned
+   */
+  protected def getRangeResource(res: JList[Resource], name: String): List[(Long, Long)] = {
+    // A resource can have multiple values in the offer since it can either be from
+    // a specific role or wildcard.
+    res.asScala.filter(_.getName == name).flatMap(_.getRanges.getRangeList.asScala
+      .map(r => (r.getBegin, r.getEnd)).toList).toList
+  }
+
+  /**
+   * Signal that the scheduler has registered with Mesos.
+   */
+  protected def markRegistered(): Unit = {
+    registerLatch.countDown()
+  }
+
+  protected def markErr(): Unit = {
+    registerLatch.countDown()
+  }
+
+  def createResource(name: String, amount: Double, role: Option[String] = None): Resource = {
+    val builder = Resource.newBuilder()
+      .setName(name)
+      .setType(Value.Type.SCALAR)
+      .setScalar(Value.Scalar.newBuilder().setValue(amount).build())
+
+    role.foreach { r => builder.setRole(r) }
+
+    builder.build()
+  }
+
+  /**
+   * Partition the existing set of resources into two groups, those remaining to be
+   * scheduled and those requested to be used for a new task.
+   *
+   * @param resources The full list of available resources
+   * @param resourceName The name of the resource to take from the available resources
+   * @param amountToUse The amount of resources to take from the available resources
+   * @return The remaining resources list and the used resources list.
+   */
+  def partitionResources(
+      resources: JList[Resource],
+      resourceName: String,
+      amountToUse: Double): (List[Resource], List[Resource]) = {
+    // Amount still to be taken; reduced as matching scalar resources are consumed.
+    var remain = amountToUse
+    val requestedResources = new ArrayBuffer[Resource]
+    val remainingResources = resources.asScala.map { r =>
+      if (remain > 0 &&
+        r.getType == Value.Type.SCALAR &&
+        r.getScalar.getValue > 0.0 &&
+        r.getName == resourceName) {
+        // Take as much as possible from this resource, keeping its role on both parts.
+        val usage = Math.min(remain, r.getScalar.getValue)
+        requestedResources += createResource(resourceName, usage, Some(r.getRole))
+        remain -= usage
+        createResource(resourceName, r.getScalar.getValue - usage, Some(r.getRole))
+      } else {
+        r
+      }
+    }
+    // Filter any resource that has depleted.
+    val filteredResources =
+      remainingResources.filter(r => r.getType != Value.Type.SCALAR || r.getScalar.getValue > 0.0)
+
+    (filteredResources.toList, requestedResources.toList)
+  }
+
+  /** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */
+  protected def getAttribute(attr: Attribute): (String, Set[String]) = {
+    (attr.getName, attr.getText.getValue.split(',').toSet)
+  }
+
+
+  /** Build a Mesos resource protobuf object */
+  protected def createResource(resourceName: String, quantity: Double): Protos.Resource = {
+    Resource.newBuilder()
+      .setName(resourceName)
+      .setType(Value.Type.SCALAR)
+      .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
+      .build()
+  }
+
+  /**
+   * Converts the attributes from the resource offer into a Map of name -> Attribute Value.
+   * The attribute values are the mesos attribute types: Scalar, Ranges, Set or Text.
+   *
+   * @param offerAttributes the attributes offered
+   * @return a map from attribute name to its typed Mesos value message
+   */
+  protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = {
+    offerAttributes.asScala.map { attr =>
+      val attrValue = attr.getType match {
+        case Value.Type.SCALAR => attr.getScalar
+        case Value.Type.RANGES => attr.getRanges
+        case Value.Type.SET => attr.getSet
+        case Value.Type.TEXT => attr.getText
+      }
+      (attr.getName, attrValue)
+    }.toMap
+  }
+
+
+  /**
+   * Match the requirements (if any) to the offer attributes.
+   * if attribute requirements are not specified - return true
+   * else if attribute is defined and no values are given, simple attribute presence is performed
+   * else if attribute name and value is specified, subset match is performed on slave attributes
+   */
+  def matchesAttributeRequirements(
+      slaveOfferConstraints: Map[String, Set[String]],
+      offerAttributes: Map[String, GeneratedMessage]): Boolean = {
+    // Only discrete required values are supported; XX..YY style requirements are not parsed.
+    def inRange(v: Long, r: Value.Range): Boolean = r.getBegin <= v && v <= r.getEnd
+    slaveOfferConstraints.forall {
+      case (name, requiredValues) =>
+        offerAttributes.get(name) match {
+          case None => false
+          case Some(_) if requiredValues.isEmpty => true // empty value matches presence
+          case Some(scalarValue: Value.Scalar) =>
+            // succeeds if some required value is less than or equal to the offered value
+            requiredValues.map(_.toDouble).exists(_ <= scalarValue.getValue)
+          case Some(rangeValue: Value.Range) =>
+            // a single range: succeeds if some required value lies inside it
+            requiredValues.map(_.toLong).exists(inRange(_, rangeValue))
+          case Some(rangesValue: Value.Ranges) =>
+            // toAttributeMap yields Value.Ranges for RANGES attributes: test every sub-range
+            val offered = rangesValue.getRangeList.asScala
+            requiredValues.map(_.toLong).exists(v => offered.exists(inRange(v, _)))
+          case Some(offeredValue: Value.Set) =>
+            // succeeds if the required values are a subset of the offered set
+            requiredValues.subsetOf(offeredValue.getItemList.asScala.toSet)
+          case Some(textValue: Value.Text) =>
+            // succeeds if any of the required values equals the offered text
+            requiredValues.contains(textValue.getValue)
+        }
+    }
+  }
+
+  /**
+   * Parses the attributes constraints provided to spark and build a matching data struct:
+   *  Map[<attribute-name>, Set[values-to-match]]
+   *  The constraints are specified as ';' separated key-value pairs where keys and values
+   *  are separated by ':'. The ':' implies equality (for singular values) and "is one of" for
+   *  multiple values (comma separated). For example:
+   *  {{{
+   *  parseConstraintString("os:centos7;zone:us-east-1a,us-east-1b")
+   *  // would result in
+   *  // the following Map[String, Set[String]]:
+   *  Map(
+   *    "os" -> Set("centos7"),
+   *    "zone" -> Set("us-east-1a", "us-east-1b")
+   *  )
+   *  }}}
+   *
+   *  Mesos documentation: http://mesos.apache.org/documentation/attributes-resources/
+   *                       https://github.com/apache/mesos/blob/master/src/common/values.cpp
+   *                       https://github.com/apache/mesos/blob/master/src/common/attributes.cpp
+   *
+   * @param constraintsVal constraints string consisting of ';' separated key-value pairs (separated
+   *                       by ':')
+   * @return  Map of constraints to match resources offers.
+   */
+  def parseConstraintString(constraintsVal: String): Map[String, Set[String]] = {
+    /*
+      Based on mesos docs:
+      attributes : attribute ( ";" attribute )*
+      attribute : labelString ":" ( labelString | "," )+
+      labelString : [a-zA-Z0-9_/.-]
+    */
+    val splitter = Splitter.on(';').trimResults().withKeyValueSeparator(':')
+    // kv splitter
+    if (constraintsVal.isEmpty) {
+      Map()
+    } else {
+      try {
+        splitter.split(constraintsVal).asScala.toMap.mapValues(v =>
+          if (v == null || v.isEmpty) {
+            Set[String]()
+          } else {
+            v.split(',').toSet
+          }
+        )
+      } catch {
+        case NonFatal(e) =>
+          throw new IllegalArgumentException(s"Bad constraint string: $constraintsVal", e)
+      }
+    }
+  }
+
+  // These defaults copied from YARN
+  private val MEMORY_OVERHEAD_FRACTION = 0.10
+  private val MEMORY_OVERHEAD_MINIMUM = 384
+
+  /**
+   * Return the amount of memory to allocate to each executor, taking into account
+   * container overheads.
+   *
+   * @param sc SparkContext providing executor memory and `spark.mesos.executor.memoryOverhead`
+   * @return executor memory plus the configured overhead; when unset, the overhead defaults to
+   *         the larger of (0.1 * executor memory) and MEMORY_OVERHEAD_MINIMUM
+   */
+  def executorMemory(sc: SparkContext): Int = {
+    sc.conf.getInt("spark.mesos.executor.memoryOverhead",
+      math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) +
+      sc.executorMemory
+  }
+
+  def setupUris(uris: String, builder: CommandInfo.Builder): Unit = {
+    uris.split(",").foreach { uri =>
+      builder.addUris(CommandInfo.URI.newBuilder().setValue(uri.trim()))
+    }
+  }
+
+  protected def getRejectOfferDurationForUnmetConstraints(sc: SparkContext): Long = {
+    sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
+  }
+
+  protected def getRejectOfferDurationForReachedMaxCores(sc: SparkContext): Long = {
+    sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
+  }
+
+  /**
+   * Checks whether the executor ports set in the config fall within the offered port ranges.
+   *
+   * @param conf the Spark Config holding the managed, non-zero port settings
+   * @param ports the offered port ranges as inclusive (begin, end) pairs
+   * @return true if every configured port is in some range and enough ports are offered
+   */
+  protected def checkPorts(conf: SparkConf, ports: List[(Long, Long)]): Boolean = {
+    // A port is covered when at least one inclusive range contains it.
+    def checkIfInRange(port: Long, ps: List[(Long, Long)]): Boolean = {
+      ps.exists { case (rangeStart, rangeEnd) => rangeStart <= port && port <= rangeEnd }
+    }
+
+    val portsToCheck = nonZeroPortValuesFromConfig(conf)
+    val withinRange = portsToCheck.forall(p => checkIfInRange(p, ports))
+    // make sure we have enough ports to allocate per offer
+    val offered = ports.map { case (rangeStart, rangeEnd) => rangeEnd - rangeStart + 1 }.sum
+    val enoughPorts = offered >= portsToCheck.size
+    enoughPorts && withinRange
+  }
+
+  /**
+   * Partitions port resources.
+   *
+   * @param requestedPorts non-zero ports to assign
+   * @param offeredResources the resources offered
+   * @return resources left, port resources to be used.
+   */
+  def partitionPortResources(requestedPorts: List[Long], offeredResources: List[Resource])
+    : (List[Resource], List[Resource]) = {
+    if (requestedPorts.isEmpty) {
+      (offeredResources, List[Resource]())
+    } else {
+      // partition port offers
+      val (resourcesWithoutPorts, portResources) = filterPortResources(offeredResources)
+
+      val portsAndRoles = requestedPorts.
+        map(x => (x, findPortAndGetAssignedRangeRole(x, portResources)))
+
+      val assignedPortResources = createResourcesFromPorts(portsAndRoles)
+
+      // ignore non-assigned port resources, they will be declined implicitly by mesos
+      // no need for splitting port resources.
+      (resourcesWithoutPorts, assignedPortResources)
+    }
+  }
+
+  val managedPortNames = List("spark.executor.port", "spark.blockManager.port")
+
+  /**
+   * The values of the non-zero ports to be used by the executor process.
+   * @param conf the spark config to use
+   * @return the non-zero values of the ports
+   */
+  def nonZeroPortValuesFromConfig(conf: SparkConf): List[Long] = {
+    managedPortNames.map(conf.getLong(_, 0)).filter( _ != 0)
+  }
+
+  /** Creates a mesos resource for a specific port number. */
+  private def createResourcesFromPorts(portsAndRoles: List[(Long, String)]) : List[Resource] = {
+    portsAndRoles.flatMap{ case (port, role) =>
+      createMesosPortResource(List((port, port)), Some(role))}
+  }
+
+  /** Helper to create mesos resources for specific port ranges. */
+  private def createMesosPortResource(
+      ranges: List[(Long, Long)],
+      role: Option[String] = None): List[Resource] = {
+    ranges.map { case (rangeStart, rangeEnd) =>
+      val rangeValue = Value.Range.newBuilder()
+        .setBegin(rangeStart)
+        .setEnd(rangeEnd)
+      val builder = Resource.newBuilder()
+        .setName("ports")
+        .setType(Value.Type.RANGES)
+        .setRanges(Value.Ranges.newBuilder().addRange(rangeValue))
+      role.foreach(r => builder.setRole(r))
+      builder.build()
+    }
+  }
+
+  /**
+   * Helper to assign a port to an offered range and get the latter's role
+   * info to use it later on.
+   */
+  private def findPortAndGetAssignedRangeRole(port: Long, portResources: List[Resource])
+    : String = {
+
+    // First offered resource owning an inclusive range that contains the port.
+    val owner = portResources.find { resource =>
+      resource.getRanges.getRangeList.asScala.exists(r => r.getBegin <= port && port <= r.getEnd)
+    }
+
+    // Callers are expected to have validated the port already (see checkPorts method), so a
+    // miss is a programming error; name the offending port in the failure.
+    owner.map(_.getRole).getOrElse {
+      throw new NoSuchElementException(s"Port $port is not within any offered port range")
+    }
+  }
+
+  /** Retrieves the port resources from a list of mesos offered resources */
+  private def filterPortResources(resources: List[Resource]): (List[Resource], List[Resource]) = {
+    resources.partition { r => !(r.getType == Value.Type.RANGES && r.getName == "ports") }
+  }
+
+  /**
+   * spark.mesos.driver.frameworkId is set by the cluster dispatcher to correlate driver
+   * submissions with frameworkIDs.  However, this causes issues when a driver process launches
+   * more than one framework (more than one SparkContext), because they all try to register with
+   * the same frameworkID.  To enforce that only the first driver registers with the configured
+   * framework ID, the driver calls this method after the first registration.
+   */
+  def unsetFrameworkID(sc: SparkContext): Unit = {
+    sc.conf.remove("spark.mesos.driver.frameworkId")
+    System.clearProperty("spark.mesos.driver.frameworkId")
+  }
+
+  def mesosToTaskState(state: MesosTaskState): TaskState.TaskState = state match {
+    case MesosTaskState.TASK_STAGING | MesosTaskState.TASK_STARTING => TaskState.LAUNCHING
+    case MesosTaskState.TASK_RUNNING | MesosTaskState.TASK_KILLING => TaskState.RUNNING
+    case MesosTaskState.TASK_FINISHED => TaskState.FINISHED
+    case MesosTaskState.TASK_FAILED => TaskState.FAILED
+    case MesosTaskState.TASK_KILLED => TaskState.KILLED
+    case MesosTaskState.TASK_LOST | MesosTaskState.TASK_ERROR => TaskState.LOST
+  }
+
+  def taskStateToMesos(state: TaskState.TaskState): MesosTaskState = state match {
+    case TaskState.LAUNCHING => MesosTaskState.TASK_STARTING
+    case TaskState.RUNNING => MesosTaskState.TASK_RUNNING
+    case TaskState.FINISHED => MesosTaskState.TASK_FINISHED
+    case TaskState.FAILED => MesosTaskState.TASK_FAILED
+    case TaskState.KILLED => MesosTaskState.TASK_KILLED
+    case TaskState.LOST => MesosTaskState.TASK_LOST
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
new file mode 100644
index 0000000..8370b61
--- /dev/null
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.nio.ByteBuffer
+
+import org.apache.mesos.protobuf.ByteString
+
+import org.apache.spark.internal.Logging
+
+/**
+ * Wrapper for serializing the data sent when launching Mesos tasks.
+ */
+private[spark] case class MesosTaskLaunchData(
+  serializedTask: ByteBuffer,
+  attemptNumber: Int) extends Logging {
+  /** Encodes the 4-byte attempt number followed by the unread bytes of `serializedTask`. */
+  def toByteString: ByteString = {
+    // Duplicate so the caller's position is untouched; size by `remaining` to avoid padding.
+    val payload = serializedTask.duplicate()
+    val dataBuffer = ByteBuffer.allocate(4 + payload.remaining).putInt(attemptNumber)
+    dataBuffer.put(payload).rewind
+    logDebug(s"ByteBuffer size: [${dataBuffer.remaining}]")
+    ByteString.copyFrom(dataBuffer)
+  }
+}
+
+private[spark] object MesosTaskLaunchData extends Logging {
+  def fromByteString(byteString: ByteString): MesosTaskLaunchData = {
+    val byteBuffer = byteString.asReadOnlyByteBuffer()
+    logDebug(s"ByteBuffer size: [${byteBuffer.remaining}]")
+    val attemptNumber = byteBuffer.getInt // updates the position by 4 bytes
+    val serializedTask = byteBuffer.slice() // subsequence starting at the current position
+    MesosTaskLaunchData(serializedTask, attemptNumber)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala
new file mode 100644
index 0000000..6fce066
--- /dev/null
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+
+class MesosClusterManagerSuite extends SparkFunSuite with LocalSparkContext {
+  // Checks that MesosClusterManager accepts masterURL and creates the expected backend class.
+  def testURL(masterURL: String, expectedClass: Class[_], coarse: Boolean): Unit = {
+    val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString)
+    sc = new SparkContext("local", "test", conf)
+    val clusterManager = new MesosClusterManager()
+
+    assert(clusterManager.canCreate(masterURL))
+    val taskScheduler = clusterManager.createTaskScheduler(sc, masterURL)
+    val sched = clusterManager.createSchedulerBackend(sc, masterURL, taskScheduler)
+    assert(sched.getClass === expectedClass)
+  }
+
+  test("mesos fine-grained") {
+    testURL("mesos://localhost:1234", classOf[MesosFineGrainedSchedulerBackend], coarse = false)
+  }
+
+  test("mesos coarse-grained") {
+    testURL("mesos://localhost:1234", classOf[MesosCoarseGrainedSchedulerBackend], coarse = true)
+  }
+
+  test("mesos with zookeeper") {
+    testURL("mesos://zk://localhost:1234,localhost:2345",
+      classOf[MesosFineGrainedSchedulerBackend], coarse = false)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
new file mode 100644
index 0000000..87d9080
--- /dev/null
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.util.{Collection, Collections, Date}
+
+import scala.collection.JavaConverters._
+
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.Value.{Scalar, Type}
+import org.apache.mesos.SchedulerDriver
+import org.mockito.{ArgumentCaptor, Matchers}
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.Command
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+
+class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
+
+  private val command = new Command("mainClass", Seq("arg"), Map(), Seq(), Seq(), Seq())
+  private var driver: SchedulerDriver = _
+  private var scheduler: MesosClusterScheduler = _
+
+  private def setScheduler(sparkConfVars: Map[String, String] = null): Unit = {
+    val conf = new SparkConf()
+    conf.setMaster("mesos://localhost:5050")
+    conf.setAppName("spark mesos")
+
+    if (sparkConfVars != null) {
+      conf.setAll(sparkConfVars)
+    }
+
+    driver = mock[SchedulerDriver]
+    scheduler = new MesosClusterScheduler(
+      new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
+      override def start(): Unit = { ready = true }
+    }
+    scheduler.start()
+  }
+
+  test("can queue drivers") {
+    setScheduler()
+
+    val response = scheduler.submitDriver(
+      new MesosDriverDescription("d1", "jar", 1000, 1, true,
+        command, Map[String, String](), "s1", new Date()))
+    assert(response.success)
+    val response2 =
+      scheduler.submitDriver(new MesosDriverDescription(
+        "d1", "jar", 1000, 1, true, command, Map[String, String](), "s2", new Date()))
+    assert(response2.success)
+    val state = scheduler.getSchedulerState()
+    val queuedDrivers = state.queuedDrivers.toList
+    assert(queuedDrivers(0).submissionId == response.submissionId)
+    assert(queuedDrivers(1).submissionId == response2.submissionId)
+  }
+
+  test("can kill queued drivers") {
+    setScheduler()
+
+    val response = scheduler.submitDriver(
+        new MesosDriverDescription("d1", "jar", 1000, 1, true,
+          command, Map[String, String](), "s1", new Date()))
+    assert(response.success)
+    val killResponse = scheduler.killDriver(response.submissionId)
+    assert(killResponse.success)
+    val state = scheduler.getSchedulerState()
+    assert(state.queuedDrivers.isEmpty)
+  }
+
+  test("can handle multiple roles") {  // request exceeds what either role offers alone
+    setScheduler()
+
+    val driver = mock[SchedulerDriver]
+    val response = scheduler.submitDriver(
+      new MesosDriverDescription("d1", "jar", 1200, 1.5, true,  // asks for 1200 mem, 1.5 cpus
+        command,
+        Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")),
+        "s1",
+        new Date()))
+    assert(response.success)
+    val offer = Offer.newBuilder()  // "*": 1 cpu, 1000 mem; "role2": 1 cpu, 500 mem
+      .addResources(
+        Resource.newBuilder().setRole("*")
+          .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR))
+      .addResources(
+        Resource.newBuilder().setRole("*")
+          .setScalar(Scalar.newBuilder().setValue(1000).build())
+          .setName("mem")
+          .setType(Type.SCALAR))
+      .addResources(
+        Resource.newBuilder().setRole("role2")
+          .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR))
+      .addResources(
+        Resource.newBuilder().setRole("role2")
+          .setScalar(Scalar.newBuilder().setValue(500).build()).setName("mem").setType(Type.SCALAR))
+      .setId(OfferID.newBuilder().setValue("o1").build())
+      .setFrameworkId(FrameworkID.newBuilder().setValue("f1").build())
+      .setSlaveId(SlaveID.newBuilder().setValue("s1").build())
+      .setHostname("host1")
+      .build()
+
+    val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])  // records launched tasks
+
+    when(
+      driver.launchTasks(
+        Matchers.eq(Collections.singleton(offer.getId)),
+        capture.capture())
+    ).thenReturn(Status.valueOf(1))  // NOTE(review): 1 is presumably DRIVER_NOT_STARTED - confirm
+
+    scheduler.resourceOffers(driver, Collections.singletonList(offer))
+
+    val taskInfos = capture.getValue
+    assert(taskInfos.size() == 1)
+    val taskInfo = taskInfos.iterator().next()
+    val resources = taskInfo.getResourcesList
+    assert(scheduler.getResource(resources, "cpus") == 1.5)  // totals match the request
+    assert(scheduler.getResource(resources, "mem") == 1200)
+    val resourcesSeq: Seq[Resource] = resources.asScala
+    val cpus = resourcesSeq.filter(_.getName.equals("cpus")).toList
+    assert(cpus.size == 2)  // one cpus entry per role
+    assert(cpus.exists(_.getRole().equals("role2")))
+    assert(cpus.exists(_.getRole().equals("*")))
+    val mem = resourcesSeq.filter(_.getName.equals("mem")).toList
+    assert(mem.size == 2)  // one mem entry per role
+    assert(mem.exists(_.getRole().equals("role2")))
+    assert(mem.exists(_.getRole().equals("*")))
+
+    verify(driver, times(1)).launchTasks(  // exactly one launch against this offer
+      Matchers.eq(Collections.singleton(offer.getId)),
+      capture.capture()
+    )
+  }
+
+  test("escapes commandline args for the shell") {
+    setScheduler()
+
+    val conf = new SparkConf()
+      .setMaster("mesos://localhost:5050")
+      .setAppName("spark mesos")
+    // Local scheduler (shadows the suite-level one) whose start() only flips `ready`.
+    val scheduler = new MesosClusterScheduler(
+      new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
+      override def start(): Unit = { ready = true }
+    }
+    def escape(arg: String): String = scheduler.shellEscape(arg)
+    def quoted(body: String): String = "\"" + body + "\""
+
+    // Each pair is (raw argument, expected result of shellEscape).
+    val expectations = Seq(
+      // Already wrapped in quotes: returned unchanged
+      "'should be left untouched'" -> "'should be left untouched'",
+      "\"should be left untouched\"" -> "\"should be left untouched\"",
+      // Harmless: returned unchanged
+      "" -> "",
+      "harmless" -> "harmless",
+      "har-m.l3ss" -> "har-m.l3ss",
+      // Special characters that are backslash-escaped and wrapped in double quotes
+      "should escape this \" quote" -> quoted("should escape this \\\" quote"),
+      "shouldescape\"quote" -> quoted("shouldescape\\\"quote"),
+      "should escape this $ dollar" -> quoted("should escape this \\$ dollar"),
+      "should escape this ` backtick" -> quoted("should escape this \\` backtick"),
+      """should escape this \ backslash""" -> quoted("""should escape this \\ backslash"""),
+      """\"?""" -> quoted("""\\\"?"""))
+    for ((raw, expected) <- expectations) {
+      assert(escape(raw) === expected)
+    }
+
+    // Special characters that only force wrapping, with no escaping
+    val wrapOnly = List(" ", "'", "<", ">", "&", "|", "?", "*", ";", "!", "#", "(", ")")
+    for (char <- wrapOnly) {
+      assert(escape(s"onlywrap${char}this") === quoted(s"onlywrap${char}this"))
+    }
+  }
+
+  test("supports spark.mesos.driverEnv.*") {
+    setScheduler()
+
+    val (driverMem, driverCpu) = (1000, 1)
+    // The driverEnv.TEST_ENV entry is the setting under test.
+    val schedulerProps = Map(
+      "spark.mesos.executor.home" -> "test",
+      "spark.app.name" -> "test",
+      "spark.mesos.driverEnv.TEST_ENV" -> "TEST_VAL")
+    val description = new MesosDriverDescription(
+      "d1", "jar", driverMem, driverCpu, true, command, schedulerProps, "s1", new Date())
+    assert(scheduler.submitDriver(description).success)
+
+    // Offer exactly the requested resources and fetch the task that gets launched.
+    val offers = List(Utils.createOffer("o1", "s1", driverMem, driverCpu))
+    scheduler.resourceOffers(driver, offers.asJava)
+    val launched = Utils.verifyTaskLaunched(driver, "o1")
+
+    // TEST_ENV must appear in the task's command environment with the configured value.
+    val envVars = launched.head.getCommand.getEnvironment.getVariablesList.asScala
+    val env = envVars.map(v => v.getName -> v.getValue).toMap
+    assert(env.getOrElse("TEST_ENV", null) == "TEST_VAL")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
new file mode 100644
index 0000000..c063797
--- /dev/null
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
+import org.apache.mesos.Protos._
+import org.mockito.Matchers
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.cluster.mesos.Utils._
+
+/**
+ * Tests for [[MesosCoarseGrainedSchedulerBackend]] driven through a mocked
+ * `SchedulerDriver`. Every test calls `setBackend` first, which creates a fresh
+ * `SparkContext`, fresh mocks and a backend wired to those mocks.
+ */
+class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
+    with LocalSparkContext
+    with MockitoSugar
+    with BeforeAndAfter {
+
+  private var sparkConf: SparkConf = _
+  private var driver: SchedulerDriver = _
+  private var taskScheduler: TaskSchedulerImpl = _
+  private var backend: MesosCoarseGrainedSchedulerBackend = _
+  private var externalShuffleClient: MesosExternalShuffleClient = _
+  private var driverEndpoint: RpcEndpointRef = _
+  // Set by the stopExecutors() override in createSchedulerBackend and polled by the
+  // status-update thread in the "Do not call removeExecutor()" test; reset in setBackend.
+  @volatile private var stopCalled = false
+
+  test("mesos supports killing and limiting executors") {
+    setBackend()
+    sparkConf.set("spark.driver.host", "driverHost")
+    sparkConf.set("spark.driver.port", "1234")
+
+    val minMem = backend.executorMemory(sc)
+    val minCpu = 4
+    val offers = List((minMem, minCpu))
+
+    // launches a task on a valid offer
+    offerResources(offers)
+    verifyTaskLaunched(driver, "o1")
+
+    // kills executors
+    backend.doRequestTotalExecutors(0)
+    assert(backend.doKillExecutors(Seq("0")))
+    val taskID0 = createTaskId("0")
+    verify(driver, times(1)).killTask(taskID0)
+
+    // doesn't launch a new task when requested executors == 0
+    offerResources(offers, 2)
+    verifyDeclinedOffer(driver, createOfferId("o2"))
+
+    // Launches a new task when requested executors is positive
+    backend.doRequestTotalExecutors(2)
+    offerResources(offers, 2)
+    verifyTaskLaunched(driver, "o2")
+  }
+
+  test("mesos supports killing and relaunching tasks with executors") {
+    setBackend()
+
+    // launches a task on a valid offer
+    val minMem = backend.executorMemory(sc) + 1024
+    val minCpu = 4
+    val offer1 = (minMem, minCpu)
+    val offer2 = (minMem, 1)
+    offerResources(List(offer1, offer2))
+    verifyTaskLaunched(driver, "o1")
+
+    // accounts for a killed task
+    val status = createTaskStatus("0", "s1", TaskState.TASK_KILLED)
+    backend.statusUpdate(driver, status)
+    verify(driver, times(1)).reviveOffers()
+
+    // Launches a new task on a valid offer from the same slave
+    offerResources(List(offer2))
+    verifyTaskLaunched(driver, "o2")
+  }
+
+  test("mesos supports spark.executor.cores") {
+    val executorCores = 4
+    setBackend(Map("spark.executor.cores" -> executorCores.toString))
+
+    // the offer carries one more core than configured; the task must take only executorCores
+    val executorMemory = backend.executorMemory(sc)
+    val offers = List((executorMemory * 2, executorCores + 1))
+    offerResources(offers)
+
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.length == 1)
+
+    val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus")
+    assert(cpus == executorCores)
+  }
+
+  test("mesos supports unset spark.executor.cores") {
+    setBackend()
+
+    // with no per-executor core setting the task is expected to take every offered core
+    val executorMemory = backend.executorMemory(sc)
+    val offerCores = 10
+    offerResources(List((executorMemory * 2, offerCores)))
+
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.length == 1)
+
+    val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus")
+    assert(cpus == offerCores)
+  }
+
+  test("mesos does not acquire more than spark.cores.max") {
+    val maxCores = 10
+    setBackend(Map("spark.cores.max" -> maxCores.toString))
+
+    val executorMemory = backend.executorMemory(sc)
+    offerResources(List((executorMemory, maxCores + 1)))
+
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.length == 1)
+
+    val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus")
+    assert(cpus == maxCores)
+  }
+
+  test("mesos declines offers that violate attribute constraints") {
+    setBackend(Map("spark.mesos.constraints" -> "x:true"))
+    offerResources(List((backend.executorMemory(sc), 4)))
+    verifyDeclinedOffer(driver, createOfferId("o1"), true)
+  }
+
+  test("mesos declines offers with a filter when reached spark.cores.max") {
+    val maxCores = 3
+    setBackend(Map("spark.cores.max" -> maxCores.toString))
+
+    val executorMemory = backend.executorMemory(sc)
+    offerResources(List(
+      (executorMemory, maxCores + 1),
+      (executorMemory, maxCores + 1)))
+
+    verifyTaskLaunched(driver, "o1")
+    verifyDeclinedOffer(driver, createOfferId("o2"), true)
+  }
+
+  test("mesos assigns tasks round-robin on offers") {
+    val executorCores = 4
+    val maxCores = executorCores * 2
+    setBackend(Map("spark.executor.cores" -> executorCores.toString,
+      "spark.cores.max" -> maxCores.toString))
+
+    // each offer has room for two executors, yet both offers are expected to be used
+    val executorMemory = backend.executorMemory(sc)
+    offerResources(List(
+      (executorMemory * 2, executorCores * 2),
+      (executorMemory * 2, executorCores * 2)))
+
+    verifyTaskLaunched(driver, "o1")
+    verifyTaskLaunched(driver, "o2")
+  }
+
+  test("mesos creates multiple executors on a single slave") {
+    val executorCores = 4
+    setBackend(Map("spark.executor.cores" -> executorCores.toString))
+
+    // offer with room for two executors
+    val executorMemory = backend.executorMemory(sc)
+    offerResources(List((executorMemory * 2, executorCores * 2)))
+
+    // verify two executors were started on a single offer
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.length == 2)
+  }
+
+  test("mesos doesn't register twice with the same shuffle service") {
+    setBackend(Map("spark.shuffle.service.enabled" -> "true"))
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    // two offers from the same slave "s1", each launching a task
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+    verifyTaskLaunched(driver, "o1")
+
+    val offer2 = createOffer("o2", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer2).asJava)
+    verifyTaskLaunched(driver, "o2")
+
+    val status1 = createTaskStatus("0", "s1", TaskState.TASK_RUNNING)
+    backend.statusUpdate(driver, status1)
+
+    // the second RUNNING update on the same slave must not trigger another registration
+    val status2 = createTaskStatus("1", "s1", TaskState.TASK_RUNNING)
+    backend.statusUpdate(driver, status2)
+    verify(externalShuffleClient, times(1))
+      .registerDriverWithShuffleService(anyString, anyInt, anyLong, anyLong)
+  }
+
+  test("Port offer decline when there is no appropriate range") {
+    // the configured port 30100 lies outside the offered range 31100-31200
+    setBackend(Map("spark.blockManager.port" -> "30100"))
+    val offeredPorts = (31100L, 31200L)
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts))
+    backend.resourceOffers(driver, List(offer1).asJava)
+    verify(driver, times(1)).declineOffer(offer1.getId)
+  }
+
+  test("Port offer accepted when ephemeral ports are used") {
+    setBackend()
+    val offeredPorts = (31100L, 31200L)
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts))
+    backend.resourceOffers(driver, List(offer1).asJava)
+    verifyTaskLaunched(driver, "o1")
+  }
+
+  test("Port offer accepted with user defined port numbers") {
+    val port = 30100
+    setBackend(Map("spark.blockManager.port" -> s"$port"))
+    val offeredPorts = (30000L, 31000L)
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts))
+    backend.resourceOffers(driver, List(offer1).asJava)
+    val taskInfo = verifyTaskLaunched(driver, "o1")
+
+    val taskPortResources = taskInfo.head.getResourcesList.asScala
+      .find(r => r.getType == Value.Type.RANGES && r.getName == "ports")
+
+    // the launched task must carry a single-port range equal to the configured port
+    val isPortInOffer = (r: Resource) => {
+      r.getRanges().getRangeList
+        .asScala.exists(range => range.getBegin == port && range.getEnd == port)
+    }
+    assert(taskPortResources.exists(isPortInOffer))
+  }
+
+  test("mesos kills an executor when told") {
+    setBackend()
+
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+    verifyTaskLaunched(driver, "o1")
+
+    backend.doKillExecutors(List("0"))
+    verify(driver, times(1)).killTask(createTaskId("0"))
+  }
+
+  test("weburi is set in created scheduler driver") {
+    setBackend()
+    // local mocks and backend below shadow the suite-level fields of the same name
+    val taskScheduler = mock[TaskSchedulerImpl]
+    when(taskScheduler.sc).thenReturn(sc)
+    val driver = mock[SchedulerDriver]
+    when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
+    val securityManager = mock[SecurityManager]
+
+    val backend = new MesosCoarseGrainedSchedulerBackend(
+        taskScheduler, sc, "master", securityManager) {
+      override protected def createSchedulerDriver(
+        masterUrl: String,
+        scheduler: Scheduler,
+        sparkUser: String,
+        appName: String,
+        conf: SparkConf,
+        webuiUrl: Option[String] = None,
+        checkpoint: Option[Boolean] = None,
+        failoverTimeout: Option[Double] = None,
+        frameworkId: Option[String] = None): SchedulerDriver = {
+        markRegistered()
+        // "http://webui" is the value setBackend() stores under spark.mesos.driver.webui.url
+        assert(webuiUrl.isDefined)
+        assert(webuiUrl.get.equals("http://webui"))
+        driver
+      }
+    }
+
+    backend.start()
+  }
+
+  test("honors unset spark.mesos.containerizer") {
+    setBackend(Map("spark.mesos.executor.docker.image" -> "test"))
+
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.head.getContainer.getType == ContainerInfo.Type.DOCKER)
+  }
+
+  test("honors spark.mesos.containerizer=\"mesos\"") {
+    setBackend(Map(
+      "spark.mesos.executor.docker.image" -> "test",
+      "spark.mesos.containerizer" -> "mesos"))
+
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.head.getContainer.getType == ContainerInfo.Type.MESOS)
+  }
+
+  test("docker settings are reflected in created tasks") {
+    setBackend(Map(
+      "spark.mesos.executor.docker.image" -> "some_image",
+      "spark.mesos.executor.docker.forcePullImage" -> "true",
+      "spark.mesos.executor.docker.volumes" -> "/host_vol:/container_vol:ro",
+      "spark.mesos.executor.docker.portmaps" -> "8080:80:tcp"
+    ))
+
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+
+    val launchedTasks = verifyTaskLaunched(driver, "o1")
+    assert(launchedTasks.size == 1)
+
+    val containerInfo = launchedTasks.head.getContainer
+    assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
+
+    val volumes = containerInfo.getVolumesList.asScala
+    assert(volumes.size == 1)
+
+    val volume = volumes.head
+    assert(volume.getHostPath == "/host_vol")
+    assert(volume.getContainerPath == "/container_vol")
+    assert(volume.getMode == Volume.Mode.RO)
+
+    val dockerInfo = containerInfo.getDocker
+
+    assert(dockerInfo.getImage == "some_image")
+    assert(dockerInfo.getForcePullImage)
+
+    val portMappings = dockerInfo.getPortMappingsList.asScala
+    assert(portMappings.size == 1)
+
+    val portMapping = portMappings.head
+    assert(portMapping.getHostPort == 8080)
+    assert(portMapping.getContainerPort == 80)
+    assert(portMapping.getProtocol == "tcp")
+  }
+
+  test("force-pull-image option is disabled by default") {
+    setBackend(Map(
+      "spark.mesos.executor.docker.image" -> "some_image"
+    ))
+
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+
+    val launchedTasks = verifyTaskLaunched(driver, "o1")
+    assert(launchedTasks.size == 1)
+
+    val containerInfo = launchedTasks.head.getContainer
+    assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
+
+    val dockerInfo = containerInfo.getDocker
+
+    assert(dockerInfo.getImage == "some_image")
+    assert(!dockerInfo.getForcePullImage)
+  }
+
+  test("Do not call removeExecutor() after backend is stopped") {
+    setBackend()
+
+    // launches a task on a valid offer
+    val offers = List((backend.executorMemory(sc), 1))
+    offerResources(offers)
+    verifyTaskLaunched(driver, "o1")
+
+    // launches a thread simulating status update
+    val statusUpdateThread = new Thread {
+      override def run(): Unit = {
+        while (!stopCalled) {
+          Thread.sleep(100)
+        }
+
+        val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED)
+        backend.statusUpdate(driver, status)
+      }
+    }
+    // Daemon, so that a failure before stopCalled is set cannot leave a polling thread
+    // behind that keeps the JVM alive.
+    statusUpdateThread.setDaemon(true)
+    statusUpdateThread.start()
+
+    backend.stop()
+    // Wait until the simulated status update has been delivered. Without the join the
+    // verification below could run before statusUpdate() and pass without checking anything.
+    statusUpdateThread.join(10000)
+    assert(!statusUpdateThread.isAlive, "status update thread did not finish in time")
+    // Any method of the backend involving sending messages to the driver endpoint should not
+    // be called after the backend is stopped.
+    verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
+  }
+
+  test("mesos supports spark.executor.uri") {
+    val url = "spark.spark.spark.com"
+    // setHome = false: no Spark home is configured, only the executor URI
+    setBackend(Map(
+      "spark.executor.uri" -> url
+    ), false)
+
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+
+    val launchedTasks = verifyTaskLaunched(driver, "o1")
+    assert(launchedTasks.head.getCommand.getUrisList.asScala(0).getValue == url)
+  }
+
+  /**
+   * Verifies that `offerId` was declined exactly once on `driver`.
+   *
+   * @param filter when true, expects the two-argument `declineOffer(id, filters)` overload;
+   *               otherwise the single-argument overload
+   */
+  private def verifyDeclinedOffer(driver: SchedulerDriver,
+      offerId: OfferID,
+      filter: Boolean = false): Unit = {
+    if (filter) {
+      verify(driver, times(1)).declineOffer(Matchers.eq(offerId), anyObject[Filters])
+    } else {
+      verify(driver, times(1)).declineOffer(Matchers.eq(offerId))
+    }
+  }
+
+  /**
+   * Sends one Mesos offer per `(mem, cpus)` pair to the backend in a single call.
+   * Offer and slave ids are numbered consecutively from `startId` ("o1"/"s1", "o2"/"s2", ...).
+   */
+  private def offerResources(offers: List[(Int, Int)], startId: Int = 1): Unit = {
+    val mesosOffers = offers.zipWithIndex.map {case (offer, i) =>
+      createOffer(s"o${i + startId}", s"s${i + startId}", offer._1, offer._2)}
+
+    backend.resourceOffers(driver, mesosOffers.asJava)
+  }
+
+  /** Builds a `TaskStatus` for the given task id, slave id and state. */
+  private def createTaskStatus(taskId: String, slaveId: String, state: TaskState): TaskStatus = {
+    TaskStatus.newBuilder()
+      .setTaskId(TaskID.newBuilder().setValue(taskId).build())
+      .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
+      .setState(state)
+      .build
+  }
+
+  /**
+   * Creates and starts a backend whose collaborators are replaced by the supplied objects:
+   * the scheduler driver, the external shuffle client and the driver endpoint reference.
+   */
+  private def createSchedulerBackend(
+      taskScheduler: TaskSchedulerImpl,
+      driver: SchedulerDriver,
+      shuffleClient: MesosExternalShuffleClient,
+      endpoint: RpcEndpointRef): MesosCoarseGrainedSchedulerBackend = {
+    val securityManager = mock[SecurityManager]
+
+    val backend = new MesosCoarseGrainedSchedulerBackend(
+        taskScheduler, sc, "master", securityManager) {
+      override protected def createSchedulerDriver(
+          masterUrl: String,
+          scheduler: Scheduler,
+          sparkUser: String,
+          appName: String,
+          conf: SparkConf,
+          webuiUrl: Option[String] = None,
+          checkpoint: Option[Boolean] = None,
+          failoverTimeout: Option[Double] = None,
+          frameworkId: Option[String] = None): SchedulerDriver = driver
+
+      override protected def getShuffleClient(): MesosExternalShuffleClient = shuffleClient
+
+      override protected def createDriverEndpointRef(
+          properties: ArrayBuffer[(String, String)]): RpcEndpointRef = endpoint
+
+      // override to avoid race condition with the driver thread on `mesosDriver`
+      override def startScheduler(newDriver: SchedulerDriver): Unit = {
+        mesosDriver = newDriver
+      }
+
+      // only records that stop was requested, so tests can observe it via the suite's flag
+      override def stopExecutors(): Unit = {
+        stopCalled = true
+      }
+
+      markRegistered()
+    }
+    backend.start()
+    backend
+  }
+
+  /**
+   * Creates a fresh `SparkContext`, fresh mocks and a started backend for one test.
+   *
+   * @param sparkConfVars extra configuration entries applied on top of the defaults;
+   *                      `null` means none
+   * @param setHome whether to set the Spark home ("/path") on the configuration
+   */
+  private def setBackend(sparkConfVars: Map[String, String] = null,
+      setHome: Boolean = true): Unit = {
+    // Reset the flag so state left by a previously run test cannot leak into this one.
+    stopCalled = false
+
+    sparkConf = (new SparkConf)
+      .setMaster("local[*]")
+      .setAppName("test-mesos-dynamic-alloc")
+      .set("spark.mesos.driver.webui.url", "http://webui")
+
+    if (setHome) {
+      sparkConf.setSparkHome("/path")
+    }
+
+    if (sparkConfVars != null) {
+      sparkConf.setAll(sparkConfVars)
+    }
+
+    sc = new SparkContext(sparkConf)
+
+    driver = mock[SchedulerDriver]
+    when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
+    taskScheduler = mock[TaskSchedulerImpl]
+    when(taskScheduler.sc).thenReturn(sc)
+    externalShuffleClient = mock[MesosExternalShuffleClient]
+    driverEndpoint = mock[RpcEndpointRef]
+
+    backend = createSchedulerBackend(taskScheduler, driver, externalShuffleClient, driverEndpoint)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
new file mode 100644
index 0000000..fcf39f6
--- /dev/null
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.nio.ByteBuffer
+import java.util.Arrays
+import java.util.Collection
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.Value.Scalar
+import org.mockito.{ArgumentCaptor, Matchers}
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.executor.MesosExecutorBackend
+import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerExecutorAdded,
+  TaskDescription, TaskSchedulerImpl, WorkerOffer}
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+
+class MesosFineGrainedSchedulerBackendSuite
+  extends SparkFunSuite with LocalSparkContext with MockitoSugar {
+
+  test("weburi is set in created scheduler driver") {
+    // Mocked collaborators; only the calls stubbed below get explicit return values.
+    val taskScheduler = mock[TaskSchedulerImpl]
+    val driver = mock[SchedulerDriver]
+    when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
+
+    val sparkConf = new SparkConf()
+      .set("spark.mesos.driver.webui.url", "http://webui")
+      .set("spark.app.name", "name1")
+    val context = mock[SparkContext]
+    when(context.conf).thenReturn(sparkConf)
+    when(context.sparkUser).thenReturn("sparkUser1")
+    when(context.appName).thenReturn("appName1")
+
+    // The check lives inside the overridden factory method, so it only runs if start()
+    // invokes createSchedulerDriver.
+    val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, context, "master") {
+      override protected def createSchedulerDriver(
+        masterUrl: String,
+        scheduler: Scheduler,
+        sparkUser: String,
+        appName: String,
+        conf: SparkConf,
+        webuiUrl: Option[String] = None,
+        checkpoint: Option[Boolean] = None,
+        failoverTimeout: Option[Double] = None,
+        frameworkId: Option[String] = None): SchedulerDriver = {
+        markRegistered()
+        assert(webuiUrl == Some("http://webui"))
+        driver
+      }
+    }
+
+    backend.start()
+  }
+
+  test("Use configured mesosExecutor.cores for ExecutorInfo") {
+    val mesosExecutorCores = 3
+    val conf = new SparkConf
+    conf.set("spark.mesos.mesosExecutor.cores", mesosExecutorCores.toString)
+
+    // The listener bus is only handed to the mocked SparkContext. The former bare
+    // `listenerBus.post(... anyLong ...)` call was dropped: it neither stubbed nor verified
+    // anything and used a Mockito matcher outside of stubbing/verification.
+    val listenerBus = mock[LiveListenerBus]
+
+    val sc = mock[SparkContext]
+    when(sc.getSparkHome()).thenReturn(Option("/spark-home"))
+
+    when(sc.conf).thenReturn(conf)
+    when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+    when(sc.executorMemory).thenReturn(100)
+    when(sc.listenerBus).thenReturn(listenerBus)
+    val taskScheduler = mock[TaskSchedulerImpl]
+    when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
+
+    val mesosSchedulerBackend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
+
+    // 4 cpus are available, but the executor must be given the configured 3
+    val resources = Arrays.asList(
+      mesosSchedulerBackend.createResource("cpus", 4),
+      mesosSchedulerBackend.createResource("mem", 1024))
+    // uri is null.
+    val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
+    val executorResources = executorInfo.getResourcesList
+    val cpus = executorResources.asScala.find(_.getName.equals("cpus")).get.getScalar.getValue
+
+    assert(cpus === mesosExecutorCores)
+  }
+
+  test("check spark-class location correctly") {
+    val conf = new SparkConf
+    conf.set("spark.mesos.executor.home", "/mesos-home")
+
+    // The listener bus is only handed to the mocked SparkContext. The former bare
+    // `listenerBus.post(... anyLong ...)` call was dropped: it neither stubbed nor verified
+    // anything and used a Mockito matcher outside of stubbing/verification.
+    val listenerBus = mock[LiveListenerBus]
+
+    val sc = mock[SparkContext]
+    when(sc.getSparkHome()).thenReturn(Option("/spark-home"))
+
+    when(sc.conf).thenReturn(conf)
+    when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+    when(sc.executorMemory).thenReturn(100)
+    when(sc.listenerBus).thenReturn(listenerBus)
+    val taskScheduler = mock[TaskSchedulerImpl]
+    when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
+
+    val mesosSchedulerBackend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
+
+    val resources = Arrays.asList(
+      mesosSchedulerBackend.createResource("cpus", 4),
+      mesosSchedulerBackend.createResource("mem", 1024))
+    // uri is null: the command must use spark.mesos.executor.home ("/mesos-home"), not the
+    // "/spark-home" returned by sc.getSparkHome().
+    val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
+    assert(executorInfo.getCommand.getValue ===
+      s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}")
+
+    // uri exists: the command changes into a directory matching the archive name first.
+    conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz")
+    val (executorInfo1, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
+    assert(executorInfo1.getCommand.getValue ===
+      s"cd test-app-1*;  ./bin/spark-class ${classOf[MesosExecutorBackend].getName}")
+  }
+
+  test("spark docker properties correctly populate the DockerInfo message") {
+    val taskScheduler = mock[TaskSchedulerImpl]
+
+    val conf = new SparkConf()
+      .set("spark.mesos.executor.docker.image", "spark/mock")
+      .set("spark.mesos.executor.docker.forcePullImage", "true")
+      .set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro")
+      .set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp")
+
+    // The listener bus is only handed to the mocked SparkContext. The former bare
+    // `listenerBus.post(... anyLong ...)` call was dropped: it neither stubbed nor verified
+    // anything and used a Mockito matcher outside of stubbing/verification.
+    val listenerBus = mock[LiveListenerBus]
+
+    val sc = mock[SparkContext]
+    when(sc.executorMemory).thenReturn(100)
+    when(sc.getSparkHome()).thenReturn(Option("/spark-home"))
+    when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+    when(sc.conf).thenReturn(conf)
+    when(sc.listenerBus).thenReturn(listenerBus)
+
+    val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
+
+    val (execInfo, _) = backend.createExecutorInfo(
+      Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor")
+    assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
+    assert(execInfo.getContainer.getDocker.getForcePullImage.equals(true))
+    // Port maps are "host:container[:protocol]"; the first entry names no protocol and is
+    // expected to come out as "tcp".
+    val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
+    assert(portmaps.get(0).getHostPort.equals(80))
+    assert(portmaps.get(0).getContainerPort.equals(8080))
+    assert(portmaps.get(0).getProtocol.equals("tcp"))
+    assert(portmaps.get(1).getHostPort.equals(53))
+    assert(portmaps.get(1).getContainerPort.equals(53))
+    assert(portmaps.get(1).getProtocol.equals("tcp"))
+    // Volumes appear in the configured order; entries that name no mode are expected as RW.
+    val volumes = execInfo.getContainer.getVolumesList
+    assert(volumes.get(0).getContainerPath.equals("/a"))
+    assert(volumes.get(0).getMode.equals(Volume.Mode.RW))
+    assert(volumes.get(1).getContainerPath.equals("/b"))
+    assert(volumes.get(1).getHostPath.equals("/b"))
+    assert(volumes.get(1).getMode.equals(Volume.Mode.RW))
+    assert(volumes.get(2).getContainerPath.equals("/c"))
+    assert(volumes.get(2).getHostPath.equals("/c"))
+    assert(volumes.get(2).getMode.equals(Volume.Mode.RW))
+    assert(volumes.get(3).getContainerPath.equals("/d"))
+    assert(volumes.get(3).getMode.equals(Volume.Mode.RO))
+    assert(volumes.get(4).getContainerPath.equals("/e"))
+    assert(volumes.get(4).getHostPath.equals("/e"))
+    assert(volumes.get(4).getMode.equals(Volume.Mode.RO))
+  }
+
+  test("mesos resource offers result in launching tasks") {
+    def createOffer(id: Int, mem: Int, cpu: Int): Offer = {
+      val builder = Offer.newBuilder()
+      builder.addResourcesBuilder()
+        .setName("mem")
+        .setType(Value.Type.SCALAR)
+        .setScalar(Scalar.newBuilder().setValue(mem))
+      builder.addResourcesBuilder()
+        .setName("cpus")
+        .setType(Value.Type.SCALAR)
+        .setScalar(Scalar.newBuilder().setValue(cpu))
+      builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build())
+        .setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
+        .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}"))
+        .setHostname(s"host${id.toString}").build()
+    }
+    // Mock the Mesos driver and the Spark task scheduler that the backend talks to.
+    val driver = mock[SchedulerDriver]
+    val taskScheduler = mock[TaskSchedulerImpl]
+
+    val listenerBus = mock[LiveListenerBus]
+    listenerBus.post(
+      SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
+    // Mocked SparkContext: only the members stubbed below are given return values.
+    val sc = mock[SparkContext]
+    when(sc.executorMemory).thenReturn(100)
+    when(sc.getSparkHome()).thenReturn(Option("/path"))
+    when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+    when(sc.conf).thenReturn(new SparkConf)
+    when(sc.listenerBus).thenReturn(listenerBus)
+
+    val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
+
+    val minMem = backend.executorMemory(sc)
+    val minCpu = 4
+    // Offers 1 and 3 carry minMem; offer 2 is offered one unit of memory less than minMem.
+    val mesosOffers = new java.util.ArrayList[Offer]
+    mesosOffers.add(createOffer(1, minMem, minCpu))
+    mesosOffers.add(createOffer(2, minMem - 1, minCpu))
+    mesosOffers.add(createOffer(3, minMem, minCpu))
+    // WorkerOffers expected for offers 1 and 3, with mesosExecutorCores subtracted from cpus.
+    val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2)
+    expectedWorkerOffers.append(new WorkerOffer(
+      mesosOffers.get(0).getSlaveId.getValue,
+      mesosOffers.get(0).getHostname,
+      (minCpu - backend.mesosExecutorCores).toInt
+    ))
+    expectedWorkerOffers.append(new WorkerOffer(
+      mesosOffers.get(2).getSlaveId.getValue,
+      mesosOffers.get(2).getHostname,
+      (minCpu - backend.mesosExecutorCores).toInt
+    ))
+    val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
+    when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
+    when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
+    // Stub launchTasks for offer 1 and capture the collection of TaskInfo passed to it.
+    val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
+    when(
+      driver.launchTasks(
+        Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+        capture.capture(),
+        any(classOf[Filters])
+      )
+    ).thenReturn(Status.valueOf(1))
+    when(driver.declineOffer(mesosOffers.get(1).getId)).thenReturn(Status.valueOf(1))
+    when(driver.declineOffer(mesosOffers.get(2).getId)).thenReturn(Status.valueOf(1))
+
+    backend.resourceOffers(driver, mesosOffers)
+    // Offer 1 must be launched on once; offers 2 and 3 must each be declined once.
+    verify(driver, times(1)).launchTasks(
+      Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+      capture.capture(),
+      any(classOf[Filters])
+    )
+    verify(driver, times(1)).declineOffer(mesosOffers.get(1).getId)
+    verify(driver, times(1)).declineOffer(mesosOffers.get(2).getId)
+    assert(capture.getValue.size() === 1)
+    val taskInfo = capture.getValue.iterator().next()
+    assert(taskInfo.getName.equals("n1"))
+    val cpus = taskInfo.getResourcesList.get(0)
+    assert(cpus.getName.equals("cpus"))
+    assert(cpus.getScalar.getValue.equals(2.0))
+    assert(taskInfo.getSlaveId.getValue.equals("s1"))
+
+    // Unwanted resources offered on an existing node. Make sure they are declined
+    val mesosOffers2 = new java.util.ArrayList[Offer]
+    mesosOffers2.add(createOffer(1, minMem, minCpu))
+    reset(taskScheduler)
+    reset(driver)
+    when(taskScheduler.resourceOffers(any(classOf[Seq[WorkerOffer]]))).thenReturn(Seq(Seq()))
+    when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
+    when(driver.declineOffer(mesosOffers2.get(0).getId)).thenReturn(Status.valueOf(1))
+
+    backend.resourceOffers(driver, mesosOffers2)
+    verify(driver, times(1)).declineOffer(mesosOffers2.get(0).getId)
+  }
+
+  test("can handle multiple roles") {
+    val driver = mock[SchedulerDriver]
+    val taskScheduler = mock[TaskSchedulerImpl]
+
+    val listenerBus = mock[LiveListenerBus]
+    listenerBus.post(
+      SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
+
+    val sc = mock[SparkContext]
+    when(sc.executorMemory).thenReturn(100)
+    when(sc.getSparkHome()).thenReturn(Option("/path"))
+    when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+    when(sc.conf).thenReturn(new SparkConf)
+    when(sc.listenerBus).thenReturn(listenerBus)
+    // Single offer from slave s1: role prod has 500 mem / 1 cpu, role dev has 600 mem / 2 cpus.
+    val id = 1
+    val builder = Offer.newBuilder()
+    builder.addResourcesBuilder()
+      .setName("mem")
+      .setType(Value.Type.SCALAR)
+      .setRole("prod")
+      .setScalar(Scalar.newBuilder().setValue(500))
+    builder.addResourcesBuilder()
+      .setName("cpus")
+      .setRole("prod")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Scalar.newBuilder().setValue(1))
+    builder.addResourcesBuilder()
+      .setName("mem")
+      .setRole("dev")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Scalar.newBuilder().setValue(600))
+    builder.addResourcesBuilder()
+      .setName("cpus")
+      .setRole("dev")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Scalar.newBuilder().setValue(2))
+    val offer = builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build())
+      .setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
+      .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}"))
+      .setHostname(s"host${id.toString}").build()
+
+    val mesosOffers = new java.util.ArrayList[Offer]
+    mesosOffers.add(offer)
+
+    val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
+
+    val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1)
+    expectedWorkerOffers.append(new WorkerOffer(
+      mesosOffers.get(0).getSlaveId.getValue,
+      mesosOffers.get(0).getHostname,
+      2 // Deducting 1 for executor
+    ))
+    // The scheduler mock returns a single task description for the expected WorkerOffer.
+    val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
+    when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
+    when(taskScheduler.CPUS_PER_TASK).thenReturn(1)
+    // Stub launchTasks for the offer and capture the collection of TaskInfo passed to it.
+    val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
+    when(
+      driver.launchTasks(
+        Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+        capture.capture(),
+        any(classOf[Filters])
+      )
+    ).thenReturn(Status.valueOf(1))
+
+    backend.resourceOffers(driver, mesosOffers)
+
+    verify(driver, times(1)).launchTasks(
+      Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+      capture.capture(),
+      any(classOf[Filters])
+    )
+    // Asserted below: the task takes 1 cpu from dev; the executor takes mem and cpus from prod.
+    assert(capture.getValue.size() === 1)
+    val taskInfo = capture.getValue.iterator().next()
+    assert(taskInfo.getName.equals("n1"))
+    assert(taskInfo.getResourcesCount === 1)
+    val cpusDev = taskInfo.getResourcesList.get(0)
+    assert(cpusDev.getName.equals("cpus"))
+    assert(cpusDev.getScalar.getValue.equals(1.0))
+    assert(cpusDev.getRole.equals("dev"))
+    val executorResources = taskInfo.getExecutor.getResourcesList.asScala
+    assert(executorResources.exists { r =>
+      r.getName.equals("mem") && r.getScalar.getValue.equals(484.0) && r.getRole.equals("prod")
+    })
+    assert(executorResources.exists { r =>
+      r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0) && r.getRole.equals("prod")
+    })
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
new file mode 100644
index 0000000..e3d7949
--- /dev/null
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import scala.collection.JavaConverters._
+import scala.language.reflectiveCalls
+
+import org.apache.mesos.Protos.{Resource, Value}
+import org.mockito.Mockito._
+import org.scalatest._
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+
+/** Tests for MesosSchedulerUtils: executor memory, constraint matching, port partitioning. */
+class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar {
+
+  // scalastyle:off structural.type
+  // this is the documented way of generating fixtures in scalatest
+  def fixture: Object {val sc: SparkContext; val sparkConf: SparkConf} = new {
+    val sparkConf = new SparkConf
+    val sc = mock[SparkContext]
+    when(sc.conf).thenReturn(sparkConf)
+  }
+
+  /** Builds a "ports" resource with a single range, tagged with `role` when it is defined. */
+  private def createTestPortResource(range: (Long, Long), role: Option[String] = None): Resource = {
+    val rangeValue = Value.Range.newBuilder().setBegin(range._1).setEnd(range._2)
+    val builder = Resource.newBuilder()
+      .setName("ports")
+      .setType(Value.Type.RANGES)
+      .setRanges(Value.Ranges.newBuilder().addRange(rangeValue))
+
+    role.foreach { r => builder.setRole(r) }
+    builder.build()
+  }
+
+  /** Same result as getRangesFromResources; kept as a separate name for existing call sites. */
+  private def rangesResourcesToTuple(resources: List[Resource]): List[(Long, Long)] = {
+    getRangesFromResources(resources)
+  }
+
+  /** True when both arrays hold the same (begin, end) pairs once each array is sorted. */
+  def arePortsEqual(array1: Array[(Long, Long)], array2: Array[(Long, Long)]): Boolean = {
+    array1.sorted.sameElements(array2.sorted)
+  }
+
+  /** True when both arrays hold the same port numbers once each array is sorted. */
+  def arePortsEqual(array1: Array[Long], array2: Array[Long]): Boolean = {
+    array1.sorted.sameElements(array2.sorted)
+  }
+
+  /** Collects the (begin, end) pair of every range carried by the given resources. */
+  def getRangesFromResources(resources: List[Resource]): List[(Long, Long)] = {
+    for (resource <- resources; range <- resource.getRanges.getRangeList.asScala.toList)
+      yield (range.getBegin, range.getEnd)
+  }
+
+  val utils = new MesosSchedulerUtils { }
+  // scalastyle:on structural.type
+
+  test("use at-least minimum overhead") {
+    val f = fixture
+    when(f.sc.executorMemory).thenReturn(512)
+    utils.executorMemory(f.sc) shouldBe 896
+  }
+
+  test("use overhead if it is greater than minimum value") {
+    val f = fixture
+    when(f.sc.executorMemory).thenReturn(4096)
+    utils.executorMemory(f.sc) shouldBe 4505
+  }
+
+  test("use spark.mesos.executor.memoryOverhead (if set)") {
+    val f = fixture
+    when(f.sc.executorMemory).thenReturn(1024)
+    f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512")
+    utils.executorMemory(f.sc) shouldBe 1536
+  }
+
+  test("parse a non-empty constraint string correctly") {
+    val expectedMap = Map(
+      "os" -> Set("centos7"),
+      "zone" -> Set("us-east-1a", "us-east-1b")
+    )
+    utils.parseConstraintString("os:centos7;zone:us-east-1a,us-east-1b") should be (expectedMap)
+  }
+
+  test("parse an empty constraint string correctly") {
+    utils.parseConstraintString("") shouldBe Map()
+  }
+
+  test("throw an exception when the input is malformed") {
+    an[IllegalArgumentException] should be thrownBy
+      utils.parseConstraintString("os;zone:us-east")
+  }
+
+  test("empty values for attributes' constraints matches all values") {
+    val constraintsStr = "os:"
+    val parsedConstraints = utils.parseConstraintString(constraintsStr)
+
+    parsedConstraints shouldBe Map("os" -> Set())
+
+    val zoneSet = Value.Set.newBuilder().addItem("us-east-1a").addItem("us-east-1b").build()
+    val noOsOffer = Map("zone" -> zoneSet)
+    val centosOffer = Map("os" -> Value.Text.newBuilder().setValue("centos").build())
+    val ubuntuOffer = Map("os" -> Value.Text.newBuilder().setValue("ubuntu").build())
+
+    utils.matchesAttributeRequirements(parsedConstraints, noOsOffer) shouldBe false
+    utils.matchesAttributeRequirements(parsedConstraints, centosOffer) shouldBe true
+    utils.matchesAttributeRequirements(parsedConstraints, ubuntuOffer) shouldBe true
+  }
+
+  test("subset match is performed for set attributes") {
+    val supersetConstraint = Map(
+      "os" -> Value.Text.newBuilder().setValue("ubuntu").build(),
+      "zone" -> Value.Set.newBuilder()
+        .addItem("us-east-1a")
+        .addItem("us-east-1b")
+        .addItem("us-east-1c")
+        .build())
+
+    val zoneConstraintStr = "os:;zone:us-east-1a,us-east-1c"
+    val parsedConstraints = utils.parseConstraintString(zoneConstraintStr)
+
+    utils.matchesAttributeRequirements(parsedConstraints, supersetConstraint) shouldBe true
+  }
+
+  test("less than equal match is performed on scalar attributes") {
+    val offerAttribs = Map("gpus" -> Value.Scalar.newBuilder().setValue(3).build())
+
+    val ltConstraint = utils.parseConstraintString("gpus:2")
+    val eqConstraint = utils.parseConstraintString("gpus:3")
+    val gtConstraint = utils.parseConstraintString("gpus:4")
+
+    utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe true
+    utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true
+    utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false
+  }
+
+  test("contains match is performed for range attributes") {
+    val offerAttribs = Map("ports" -> Value.Range.newBuilder().setBegin(7000).setEnd(8000).build())
+    val ltConstraint = utils.parseConstraintString("ports:6000")
+    val eqConstraint = utils.parseConstraintString("ports:7500")
+    val gtConstraint = utils.parseConstraintString("ports:8002")
+    val multiConstraint = utils.parseConstraintString("ports:5000,7500,8300")
+
+    utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe false
+    utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true
+    utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false
+    utils.matchesAttributeRequirements(multiConstraint, offerAttribs) shouldBe true
+  }
+
+  test("equality match is performed for text attributes") {
+    val offerAttribs = Map("os" -> Value.Text.newBuilder().setValue("centos7").build())
+
+    val trueConstraint = utils.parseConstraintString("os:centos7")
+    val falseConstraint = utils.parseConstraintString("os:ubuntu")
+
+    utils.matchesAttributeRequirements(trueConstraint, offerAttribs) shouldBe true
+    utils.matchesAttributeRequirements(falseConstraint, offerAttribs) shouldBe false
+  }
+
+  test("Port reservation is done correctly with user specified ports only") {
+    // Ports 3000 and 4000 play the role of user-set spark.executor.port and
+    // spark.blockManager.port values; they are passed to partitionPortResources directly,
+    // so no SparkConf is needed here.
+    val portResource = createTestPortResource((3000, 5000), Some("my_role"))
+
+    val (_, resourcesToBeUsed) = utils
+      .partitionPortResources(List(3000, 4000), List(portResource))
+    resourcesToBeUsed.length shouldBe 2
+
+    val portsToUse = getRangesFromResources(resourcesToBeUsed).map(_._1).toArray
+
+    portsToUse.length shouldBe 2
+    arePortsEqual(portsToUse, Array(3000L, 4000L)) shouldBe true
+
+    val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
+
+    val expectedUsed = Array((3000L, 3000L), (4000L, 4000L))
+
+    arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true
+  }
+
+  test("Port reservation is done correctly with some user specified ports (spark.executor.port)") {
+    // Port 3100 plays the role of a user-set spark.executor.port value; it is passed to
+    // partitionPortResources directly, so no SparkConf is needed here.
+    val portResource = createTestPortResource((3000, 5000), Some("my_role"))
+
+    val (_, resourcesToBeUsed) = utils
+      .partitionPortResources(List(3100), List(portResource))
+
+    val portsToUse = getRangesFromResources(resourcesToBeUsed).map(_._1)
+
+    portsToUse.length shouldBe 1
+    portsToUse.contains(3100L) shouldBe true
+  }
+
+  test("Port reservation is done correctly with all random ports") {
+    // No port is requested explicitly, hence the empty list of requested ports.
+    val portResource = createTestPortResource((3000L, 5000L), Some("my_role"))
+
+    val (_, resourcesToBeUsed) = utils
+      .partitionPortResources(List(), List(portResource))
+    val portsToUse = getRangesFromResources(resourcesToBeUsed).map(_._1)
+
+    portsToUse.isEmpty shouldBe true
+  }
+
+  test("Port reservation is done correctly with user specified ports only - multiple ranges") {
+    // Ports 2100 and 4000 play the role of user-set spark.executor.port and
+    // spark.blockManager.port values; they are passed to partitionPortResources directly,
+    // so no SparkConf is needed here.
+    val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
+      createTestPortResource((2000, 2500), Some("other_role")))
+    val (_, resourcesToBeUsed) = utils
+      .partitionPortResources(List(2100, 4000), portResourceList)
+    val portsToUse = getRangesFromResources(resourcesToBeUsed).map(_._1)
+
+    portsToUse.length shouldBe 2
+    // Each requested port is expected back as its own single-port (port, port) range.
+    val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
+
+    val expectedUsed = Array((2100L, 2100L), (4000L, 4000L))
+
+    arePortsEqual(portsToUse.toArray, Array(2100L, 4000L)) shouldBe true
+    arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true
+  }
+
+  test("Port reservation is done correctly with all random ports - multiple ranges") {
+    // No port is requested explicitly, hence the empty list of requested ports.
+    val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
+      createTestPortResource((2000, 2500), Some("other_role")))
+    val (_, resourcesToBeUsed) = utils
+      .partitionPortResources(List(), portResourceList)
+    val portsToUse = getRangesFromResources(resourcesToBeUsed).map(_._1)
+    portsToUse.isEmpty shouldBe true
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
new file mode 100644
index 0000000..5a81bb3
--- /dev/null
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.nio.ByteBuffer
+
+import org.apache.spark.SparkFunSuite
+
+class MesosTaskLaunchDataSuite extends SparkFunSuite {
+  test("serialize and deserialize data must be same") {
+    val serializedTask = ByteBuffer.allocate(40) // capacity for the ten 4-byte ints below
+    (100 until 110).foreach(i => serializedTask.putInt(i)) // side effects only, so foreach
+    serializedTask.rewind() // back to position 0 before handing the buffer over
+    val attemptNumber = 100
+    val byteString = MesosTaskLaunchData(serializedTask, attemptNumber).toByteString
+    serializedTask.rewind() // rewind again in case toByteString moved the position
+    val mesosTaskLaunchData = MesosTaskLaunchData.fromByteString(byteString)
+    assert(mesosTaskLaunchData.attemptNumber === attemptNumber)
+    assert(mesosTaskLaunchData.serializedTask === serializedTask)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
new file mode 100644
index 0000000..fa9406f
--- /dev/null
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.Value.{Range => MesosRange, Ranges, Scalar}
+import org.apache.mesos.SchedulerDriver
+import org.mockito.{ArgumentCaptor, Matchers}
+import org.mockito.Mockito._
+
+/** Shared builders and verification helpers for the Mesos scheduler test suites. */
+object Utils {
+  /** Builds an Offer for `slaveId` carrying `mem`, `cpu` and, when given, one ports range. */
+  def createOffer(
+      offerId: String,
+      slaveId: String,
+      mem: Int,
+      cpu: Int,
+      ports: Option[(Long, Long)] = None): Offer = {
+    val offer = Offer.newBuilder()
+    Seq("mem" -> mem, "cpus" -> cpu).foreach { case (name, amount) =>
+      offer.addResourcesBuilder()
+        .setName(name)
+        .setType(Value.Type.SCALAR)
+        .setScalar(Scalar.newBuilder().setValue(amount))
+    }
+    ports.foreach { case (begin, end) =>
+      val range = MesosRange.newBuilder().setBegin(begin).setEnd(end).build()
+      offer.addResourcesBuilder()
+        .setName("ports")
+        .setType(Value.Type.RANGES)
+        .setRanges(Ranges.newBuilder().addRange(range))
+    }
+    offer
+      .setId(createOfferId(offerId))
+      .setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
+      .setSlaveId(SlaveID.newBuilder().setValue(slaveId))
+      .setHostname(s"host${slaveId}")
+      .build()
+  }
+
+  /** Verifies exactly one launchTasks call for `offerId` and returns the tasks it was given. */
+  def verifyTaskLaunched(driver: SchedulerDriver, offerId: String): List[TaskInfo] = {
+    val launched = ArgumentCaptor.forClass(classOf[java.util.Collection[TaskInfo]])
+    val expectedOffers = Collections.singleton(createOfferId(offerId))
+    verify(driver, times(1)).launchTasks(Matchers.eq(expectedOffers), launched.capture())
+    launched.getValue.asScala.toList
+  }
+
+  /** Wraps `offerId` in an OfferID message. */
+  def createOfferId(offerId: String): OfferID =
+    OfferID.newBuilder().setValue(offerId).build()
+
+  /** Wraps `slaveId` in a SlaveID message. */
+  def createSlaveId(slaveId: String): SlaveID =
+    SlaveID.newBuilder().setValue(slaveId).build()
+
+  /** Wraps `executorId` in an ExecutorID message. */
+  def createExecutorId(executorId: String): ExecutorID =
+    ExecutorID.newBuilder().setValue(executorId).build()
+
+  /** Wraps `taskId` in a TaskID message. */
+  def createTaskId(taskId: String): TaskID =
+    TaskID.newBuilder().setValue(taskId).build()
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9896582..74238db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,8 +119,6 @@
     <java.version>1.7</java.version>
     <maven.version>3.3.9</maven.version>
     <sbt.project.name>spark</sbt.project.name>
-    <mesos.version>1.0.0</mesos.version>
-    <mesos.classifier>shaded-protobuf</mesos.classifier>
     <slf4j.version>1.7.16</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
     <hadoop.version>2.2.0</hadoop.version>
@@ -528,18 +526,6 @@
         <scope>${hadoop.deps.scope}</scope>
       </dependency>
       <dependency>
-        <groupId>org.apache.mesos</groupId>
-        <artifactId>mesos</artifactId>
-        <version>${mesos.version}</version>
-        <classifier>${mesos.classifier}</classifier>
-        <exclusions>
-          <exclusion>
-            <groupId>com.google.protobuf</groupId>
-            <artifactId>protobuf-java</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
-      <dependency>
         <groupId>org.roaringbitmap</groupId>
         <artifactId>RoaringBitmap</artifactId>
         <version>0.5.11</version>
@@ -2528,6 +2514,13 @@
     </profile>
 
     <profile>
+      <id>mesos</id>
+      <modules>
+        <module>mesos</module>
+      </modules>
+    </profile>
+
+    <profile>
       <id>hive-thriftserver</id>
       <modules>
         <module>sql/hive-thriftserver</module>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org