You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/03 20:25:02 UTC

[11/21] git commit: Merge branch 'reorgscripts' into scripts-reorg

Merge branch 'reorgscripts' into scripts-reorg


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/84849baf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/84849baf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/84849baf

Branch: refs/heads/master
Commit: 84849baf88d31cfaaeee158a947c4db1abe94ce6
Parents: 714fdab 3a5aa92
Author: shane-huang <sh...@intel.com>
Authored: Fri Sep 27 09:28:33 2013 +0800
Committer: shane-huang <sh...@intel.com>
Committed: Fri Sep 27 09:28:33 2013 +0800

----------------------------------------------------------------------
 assembly/src/main/assembly/assembly.xml         |   11 +-
 bin/compute-classpath.cmd                       |   69 --
 bin/compute-classpath.sh                        |   61 --
 bin/pyspark                                     |   66 ++
 bin/pyspark.cmd                                 |   23 +
 bin/pyspark2.cmd                                |   55 +
 bin/run-example                                 |   81 ++
 bin/run-example.cmd                             |   23 +
 bin/run-example2.cmd                            |   61 ++
 bin/slaves.sh                                   |   74 --
 bin/spark                                       |   92 ++
 bin/spark-config.sh                             |   36 -
 bin/spark-daemon.sh                             |  164 ---
 bin/spark-daemons.sh                            |   35 -
 bin/spark-shell                                 |   87 ++
 bin/spark-shell.cmd                             |   23 +
 bin/start-all.sh                                |   34 -
 bin/start-master.sh                             |   52 -
 bin/start-slave.sh                              |   35 -
 bin/start-slaves.sh                             |   48 -
 bin/stop-all.sh                                 |   32 -
 bin/stop-master.sh                              |   27 -
 bin/stop-slaves.sh                              |   37 -
 .../spark/deploy/worker/ExecutorRunner.scala    |    2 +-
 .../mesos/CoarseMesosSchedulerBackend.scala     |    4 +-
 .../cluster/mesos/MesosSchedulerBackend.scala   |    4 +-
 .../apache/spark/ui/UIWorkloadGenerator.scala   |    2 +-
 .../scala/org/apache/spark/DriverSuite.scala    |    2 +-
 data/kmeans_data.txt                            |    6 +
 data/lr_data.txt                                | 1000 ++++++++++++++++++
 data/pagerank_data.txt                          |    6 +
 docs/running-on-yarn.md                         |    4 +-
 docs/spark-standalone.md                        |   14 +-
 kmeans_data.txt                                 |    6 -
 lr_data.txt                                     | 1000 ------------------
 make-distribution.sh                            |    5 +-
 pagerank_data.txt                               |    6 -
 pyspark                                         |   66 --
 pyspark.cmd                                     |   23 -
 pyspark2.cmd                                    |   55 -
 python/pyspark/java_gateway.py                  |    2 +-
 python/run-tests                                |    2 +-
 run-example                                     |   81 --
 run-example.cmd                                 |   23 -
 run-example2.cmd                                |   61 --
 sbin/compute-classpath.cmd                      |   69 ++
 sbin/compute-classpath.sh                       |   61 ++
 sbin/slaves.sh                                  |   74 ++
 sbin/spark-class                                |  117 ++
 sbin/spark-class.cmd                            |   23 +
 sbin/spark-class2.cmd                           |   78 ++
 sbin/spark-config.sh                            |   36 +
 sbin/spark-daemon.sh                            |  164 +++
 sbin/spark-daemons.sh                           |   35 +
 sbin/spark-executor                             |   23 +
 sbin/start-all.sh                               |   34 +
 sbin/start-master.sh                            |   52 +
 sbin/start-slave.sh                             |   35 +
 sbin/start-slaves.sh                            |   48 +
 sbin/stop-all.sh                                |   32 +
 sbin/stop-master.sh                             |   27 +
 sbin/stop-slaves.sh                             |   37 +
 spark-class                                     |  117 --
 spark-class.cmd                                 |   23 -
 spark-class2.cmd                                |   78 --
 spark-executor                                  |   22 -
 spark-shell                                     |   87 --
 spark-shell.cmd                                 |   22 -
 68 files changed, 2491 insertions(+), 2403 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/84849baf/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 8f2eef9,0000000..15b3397
mode 100644,000000..100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@@ -1,286 -1,0 +1,286 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.spark.scheduler.cluster.mesos
 +
 +import java.io.File
 +import java.util.{ArrayList => JArrayList, List => JList}
 +import java.util.Collections
 +
 +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 +import scala.collection.JavaConversions._
 +
 +import com.google.protobuf.ByteString
 +import org.apache.mesos.{Scheduler => MScheduler}
 +import org.apache.mesos._
 +import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
 +
 +import org.apache.spark.{SparkException, Logging, SparkContext, TaskState}
 +import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend}
 +
 +/**
 + * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
 + * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
 + * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
 + * StandaloneBackend mechanism. This class is useful for lower and more predictable latency.
 + *
 + * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to
 + * remove this.
 + */
 +private[spark] class CoarseMesosSchedulerBackend(
 +    scheduler: ClusterScheduler,
 +    sc: SparkContext,
 +    master: String,
 +    appName: String)
 +  extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
 +  with MScheduler
 +  with Logging {
 +
 +  val MAX_SLAVE_FAILURES = 2     // Blacklist a slave after this many failures
 +
 +  // Lock used to wait for scheduler to be registered
 +  var isRegistered = false
 +  val registeredLock = new Object()
 +
 +  // Driver for talking to Mesos
 +  var driver: SchedulerDriver = null
 +
 +  // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
 +  val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
 +
 +  // Cores we have acquired with each Mesos task ID
 +  val coresByTaskId = new HashMap[Int, Int]
 +  var totalCoresAcquired = 0
 +
 +  val slaveIdsWithExecutors = new HashSet[String]
 +
 +  val taskIdToSlaveId = new HashMap[Int, String]
 +  val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed
 +
 +  val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
 +    "Spark home is not set; set it through the spark.home system " +
 +    "property, the SPARK_HOME environment variable or the SparkContext constructor"))
 +
 +  val extraCoresPerSlave = System.getProperty("spark.mesos.extra.cores", "0").toInt
 +
 +  var nextMesosTaskId = 0
 +
 +  def newMesosTaskId(): Int = {
 +    val id = nextMesosTaskId
 +    nextMesosTaskId += 1
 +    id
 +  }
 +
 +  override def start() {
 +    super.start()
 +
 +    synchronized {
 +      new Thread("CoarseMesosSchedulerBackend driver") {
 +        setDaemon(true)
 +        override def run() {
 +          val scheduler = CoarseMesosSchedulerBackend.this
 +          val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
 +          driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
 +          try { {
 +            val ret = driver.run()
 +            logInfo("driver.run() returned with code " + ret)
 +          }
 +          } catch {
 +            case e: Exception => logError("driver.run() failed", e)
 +          }
 +        }
 +      }.start()
 +
 +      waitForRegister()
 +    }
 +  }
 +
 +  def createCommand(offer: Offer, numCores: Int): CommandInfo = {
 +    val environment = Environment.newBuilder()
 +    sc.executorEnvs.foreach { case (key, value) =>
 +      environment.addVariables(Environment.Variable.newBuilder()
 +        .setName(key)
 +        .setValue(value)
 +        .build())
 +    }
 +    val command = CommandInfo.newBuilder()
 +      .setEnvironment(environment)
 +    val driverUrl = "akka://spark@%s:%s/user/%s".format(
 +      System.getProperty("spark.driver.host"),
 +      System.getProperty("spark.driver.port"),
 +      StandaloneSchedulerBackend.ACTOR_NAME)
 +    val uri = System.getProperty("spark.executor.uri")
 +    if (uri == null) {
-       val runScript = new File(sparkHome, "spark-class").getCanonicalPath
++      val runScript = new File(sparkHome, "./sbin/spark-class").getCanonicalPath
 +      command.setValue(
 +        "\"%s\" org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
 +          runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
 +    } else {
 +      // Grab everything to the first '.'. We'll use that and '*' to
 +      // glob the directory "correctly".
 +      val basename = uri.split('/').last.split('.').head
 +      command.setValue(
-         "cd %s*; ./spark-class org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
++        "cd %s*; ./sbin/spark-class org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
 +          basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
 +      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
 +    }
 +    return command.build()
 +  }
 +
 +  override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
 +
 +  override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
 +    logInfo("Registered as framework ID " + frameworkId.getValue)
 +    registeredLock.synchronized {
 +      isRegistered = true
 +      registeredLock.notifyAll()
 +    }
 +  }
 +
 +  def waitForRegister() {
 +    registeredLock.synchronized {
 +      while (!isRegistered) {
 +        registeredLock.wait()
 +      }
 +    }
 +  }
 +
 +  override def disconnected(d: SchedulerDriver) {}
 +
 +  override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
 +
 +  /**
 +   * Method called by Mesos to offer resources on slaves. We respond by launching an executor,
 +   * unless we've already launched more than we wanted to.
 +   */
 +  override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
 +    synchronized {
 +      val filters = Filters.newBuilder().setRefuseSeconds(-1).build()
 +
 +      for (offer <- offers) {
 +        val slaveId = offer.getSlaveId.toString
 +        val mem = getResource(offer.getResourcesList, "mem")
 +        val cpus = getResource(offer.getResourcesList, "cpus").toInt
 +        if (totalCoresAcquired < maxCores && mem >= executorMemory && cpus >= 1 &&
 +            failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
 +            !slaveIdsWithExecutors.contains(slaveId)) {
 +          // Launch an executor on the slave
 +          val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
 +          val taskId = newMesosTaskId()
 +          taskIdToSlaveId(taskId) = slaveId
 +          slaveIdsWithExecutors += slaveId
 +          coresByTaskId(taskId) = cpusToUse
 +          val task = MesosTaskInfo.newBuilder()
 +            .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
 +            .setSlaveId(offer.getSlaveId)
 +            .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
 +            .setName("Task " + taskId)
 +            .addResources(createResource("cpus", cpusToUse))
 +            .addResources(createResource("mem", executorMemory))
 +            .build()
 +          d.launchTasks(offer.getId, Collections.singletonList(task), filters)
 +        } else {
 +          // Filter it out
 +          d.launchTasks(offer.getId, Collections.emptyList[MesosTaskInfo](), filters)
 +        }
 +      }
 +    }
 +  }
 +
 +  /** Helper function to pull out a resource from a Mesos Resources protobuf */
 +  private def getResource(res: JList[Resource], name: String): Double = {
 +    for (r <- res if r.getName == name) {
 +      return r.getScalar.getValue
 +    }
 +    // If we reached here, no resource with the required name was present
 +    throw new IllegalArgumentException("No resource called " + name + " in " + res)
 +  }
 +
 +  /** Build a Mesos resource protobuf object */
 +  private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
 +    Resource.newBuilder()
 +      .setName(resourceName)
 +      .setType(Value.Type.SCALAR)
 +      .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
 +      .build()
 +  }
 +
 +  /** Check whether a Mesos task state represents a finished task */
 +  private def isFinished(state: MesosTaskState) = {
 +    state == MesosTaskState.TASK_FINISHED ||
 +      state == MesosTaskState.TASK_FAILED ||
 +      state == MesosTaskState.TASK_KILLED ||
 +      state == MesosTaskState.TASK_LOST
 +  }
 +
 +  override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
 +    val taskId = status.getTaskId.getValue.toInt
 +    val state = status.getState
 +    logInfo("Mesos task " + taskId + " is now " + state)
 +    synchronized {
 +      if (isFinished(state)) {
 +        val slaveId = taskIdToSlaveId(taskId)
 +        slaveIdsWithExecutors -= slaveId
 +        taskIdToSlaveId -= taskId
 +        // Remove the cores we have remembered for this task, if it's in the hashmap
 +        for (cores <- coresByTaskId.get(taskId)) {
 +          totalCoresAcquired -= cores
 +          coresByTaskId -= taskId
 +        }
 +        // If it was a failure, mark the slave as failed for blacklisting purposes
 +        if (state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST) {
 +          failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
 +          if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
 +            logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +
 +                "is Spark installed on it?")
 +          }
 +        }
 +        driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
 +      }
 +    }
 +  }
 +
 +  override def error(d: SchedulerDriver, message: String) {
 +    logError("Mesos error: " + message)
 +    scheduler.error(message)
 +  }
 +
 +  override def stop() {
 +    super.stop()
 +    if (driver != null) {
 +      driver.stop()
 +    }
 +  }
 +
 +  override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
 +
 +  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
 +    logInfo("Mesos slave lost: " + slaveId.getValue)
 +    synchronized {
 +      if (slaveIdsWithExecutors.contains(slaveId.getValue)) {
 +        // Note that the slave ID corresponds to the executor ID on that slave
 +        slaveIdsWithExecutors -= slaveId.getValue
 +        removeExecutor(slaveId.getValue, "Mesos slave lost")
 +      }
 +    }
 +  }
 +
 +  override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
 +    logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
 +    slaveLost(d, s)
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/84849baf/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 50cbc2c,0000000..7e9c05c
mode 100644,000000..100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@@ -1,345 -1,0 +1,345 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.spark.scheduler.cluster.mesos
 +
 +import java.io.File
 +import java.util.{ArrayList => JArrayList, List => JList}
 +import java.util.Collections
 +
 +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 +import scala.collection.JavaConversions._
 +
 +import com.google.protobuf.ByteString
 +import org.apache.mesos.{Scheduler => MScheduler}
 +import org.apache.mesos._
 +import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
 +
 +import org.apache.spark.{Logging, SparkException, SparkContext, TaskState}
 +import org.apache.spark.scheduler.TaskDescription
 +import org.apache.spark.scheduler.cluster.{ClusterScheduler, ExecutorExited, ExecutorLossReason}
 +import org.apache.spark.scheduler.cluster.{SchedulerBackend, SlaveLost, WorkerOffer}
 +import org.apache.spark.util.Utils
 +
 +/**
 + * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
 + * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
 + * from multiple apps can run on different cores) and in time (a core can switch ownership).
 + */
 +private[spark] class MesosSchedulerBackend(
 +    scheduler: ClusterScheduler,
 +    sc: SparkContext,
 +    master: String,
 +    appName: String)
 +  extends SchedulerBackend
 +  with MScheduler
 +  with Logging {
 +
 +  // Lock used to wait for scheduler to be registered
 +  var isRegistered = false
 +  val registeredLock = new Object()
 +
 +  // Driver for talking to Mesos
 +  var driver: SchedulerDriver = null
 +
 +  // Which slave IDs we have executors on
 +  val slaveIdsWithExecutors = new HashSet[String]
 +  val taskIdToSlaveId = new HashMap[Long, String]
 +
 +  // An ExecutorInfo for our tasks
 +  var execArgs: Array[Byte] = null
 +
 +  var classLoader: ClassLoader = null
 +
 +  override def start() {
 +    synchronized {
 +      classLoader = Thread.currentThread.getContextClassLoader
 +
 +      new Thread("MesosSchedulerBackend driver") {
 +        setDaemon(true)
 +        override def run() {
 +          val scheduler = MesosSchedulerBackend.this
 +          val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
 +          driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
 +          try {
 +            val ret = driver.run()
 +            logInfo("driver.run() returned with code " + ret)
 +          } catch {
 +            case e: Exception => logError("driver.run() failed", e)
 +          }
 +        }
 +      }.start()
 +
 +      waitForRegister()
 +    }
 +  }
 +
 +  def createExecutorInfo(execId: String): ExecutorInfo = {
 +    val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
 +      "Spark home is not set; set it through the spark.home system " +
 +      "property, the SPARK_HOME environment variable or the SparkContext constructor"))
 +    val environment = Environment.newBuilder()
 +    sc.executorEnvs.foreach { case (key, value) =>
 +      environment.addVariables(Environment.Variable.newBuilder()
 +        .setName(key)
 +        .setValue(value)
 +        .build())
 +    }
 +    val command = CommandInfo.newBuilder()
 +      .setEnvironment(environment)
 +    val uri = System.getProperty("spark.executor.uri")
 +    if (uri == null) {
-       command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath)
++      command.setValue(new File(sparkHome, "/sbin/spark-executor").getCanonicalPath)
 +    } else {
 +      // Grab everything to the first '.'. We'll use that and '*' to
 +      // glob the directory "correctly".
 +      val basename = uri.split('/').last.split('.').head
-       command.setValue("cd %s*; ./spark-executor".format(basename))
++      command.setValue("cd %s*; ./sbin/spark-executor".format(basename))
 +      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
 +    }
 +    val memory = Resource.newBuilder()
 +      .setName("mem")
 +      .setType(Value.Type.SCALAR)
 +      .setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build())
 +      .build()
 +    ExecutorInfo.newBuilder()
 +      .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
 +      .setCommand(command)
 +      .setData(ByteString.copyFrom(createExecArg()))
 +      .addResources(memory)
 +      .build()
 +  }
 +
 +  /**
 +   * Create and serialize the executor argument to pass to Mesos. Our executor arg is an array
 +   * containing all the spark.* system properties in the form of (String, String) pairs.
 +   */
 +  private def createExecArg(): Array[Byte] = {
 +    if (execArgs == null) {
 +      val props = new HashMap[String, String]
 +      val iterator = System.getProperties.entrySet.iterator
 +      while (iterator.hasNext) {
 +        val entry = iterator.next
 +        val (key, value) = (entry.getKey.toString, entry.getValue.toString)
 +        if (key.startsWith("spark.")) {
 +          props(key) = value
 +        }
 +      }
 +      // Serialize the map as an array of (String, String) pairs
 +      execArgs = Utils.serialize(props.toArray)
 +    }
 +    return execArgs
 +  }
 +
 +  private def setClassLoader(): ClassLoader = {
 +    val oldClassLoader = Thread.currentThread.getContextClassLoader
 +    Thread.currentThread.setContextClassLoader(classLoader)
 +    return oldClassLoader
 +  }
 +
 +  private def restoreClassLoader(oldClassLoader: ClassLoader) {
 +    Thread.currentThread.setContextClassLoader(oldClassLoader)
 +  }
 +
 +  override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
 +
 +  override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
 +    val oldClassLoader = setClassLoader()
 +    try {
 +      logInfo("Registered as framework ID " + frameworkId.getValue)
 +      registeredLock.synchronized {
 +        isRegistered = true
 +        registeredLock.notifyAll()
 +      }
 +    } finally {
 +      restoreClassLoader(oldClassLoader)
 +    }
 +  }
 +
 +  def waitForRegister() {
 +    registeredLock.synchronized {
 +      while (!isRegistered) {
 +        registeredLock.wait()
 +      }
 +    }
 +  }
 +
 +  override def disconnected(d: SchedulerDriver) {}
 +
 +  override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
 +
 +  /**
 +   * Method called by Mesos to offer resources on slaves. We resond by asking our active task sets
 +   * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
 +   * tasks are balanced across the cluster.
 +   */
 +  override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
 +    val oldClassLoader = setClassLoader()
 +    try {
 +      synchronized {
 +        // Build a big list of the offerable workers, and remember their indices so that we can
 +        // figure out which Offer to reply to for each worker
 +        val offerableIndices = new ArrayBuffer[Int]
 +        val offerableWorkers = new ArrayBuffer[WorkerOffer]
 +
 +        def enoughMemory(o: Offer) = {
 +          val mem = getResource(o.getResourcesList, "mem")
 +          val slaveId = o.getSlaveId.getValue
 +          mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId)
 +        }
 +
 +        for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
 +          offerableIndices += index
 +          offerableWorkers += new WorkerOffer(
 +            offer.getSlaveId.getValue,
 +            offer.getHostname,
 +            getResource(offer.getResourcesList, "cpus").toInt)
 +        }
 +
 +        // Call into the ClusterScheduler
 +        val taskLists = scheduler.resourceOffers(offerableWorkers)
 +
 +        // Build a list of Mesos tasks for each slave
 +        val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
 +        for ((taskList, index) <- taskLists.zipWithIndex) {
 +          if (!taskList.isEmpty) {
 +            val offerNum = offerableIndices(index)
 +            val slaveId = offers(offerNum).getSlaveId.getValue
 +            slaveIdsWithExecutors += slaveId
 +            mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
 +            for (taskDesc <- taskList) {
 +              taskIdToSlaveId(taskDesc.taskId) = slaveId
 +              mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
 +            }
 +          }
 +        }
 +
 +        // Reply to the offers
 +        val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
 +        for (i <- 0 until offers.size) {
 +          d.launchTasks(offers(i).getId, mesosTasks(i), filters)
 +        }
 +      }
 +    } finally {
 +      restoreClassLoader(oldClassLoader)
 +    }
 +  }
 +
 +  /** Helper function to pull out a resource from a Mesos Resources protobuf */
 +  def getResource(res: JList[Resource], name: String): Double = {
 +    for (r <- res if r.getName == name) {
 +      return r.getScalar.getValue
 +    }
 +    // If we reached here, no resource with the required name was present
 +    throw new IllegalArgumentException("No resource called " + name + " in " + res)
 +  }
 +
 +  /** Turn a Spark TaskDescription into a Mesos task */
 +  def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
 +    val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
 +    val cpuResource = Resource.newBuilder()
 +      .setName("cpus")
 +      .setType(Value.Type.SCALAR)
 +      .setScalar(Value.Scalar.newBuilder().setValue(1).build())
 +      .build()
 +    return MesosTaskInfo.newBuilder()
 +      .setTaskId(taskId)
 +      .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
 +      .setExecutor(createExecutorInfo(slaveId))
 +      .setName(task.name)
 +      .addResources(cpuResource)
 +      .setData(ByteString.copyFrom(task.serializedTask))
 +      .build()
 +  }
 +
 +  /** Check whether a Mesos task state represents a finished task */
 +  def isFinished(state: MesosTaskState) = {
 +    state == MesosTaskState.TASK_FINISHED ||
 +      state == MesosTaskState.TASK_FAILED ||
 +      state == MesosTaskState.TASK_KILLED ||
 +      state == MesosTaskState.TASK_LOST
 +  }
 +
 +  override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
 +    val oldClassLoader = setClassLoader()
 +    try {
 +      val tid = status.getTaskId.getValue.toLong
 +      val state = TaskState.fromMesos(status.getState)
 +      synchronized {
 +        if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
 +          // We lost the executor on this slave, so remember that it's gone
 +          slaveIdsWithExecutors -= taskIdToSlaveId(tid)
 +        }
 +        if (isFinished(status.getState)) {
 +          taskIdToSlaveId.remove(tid)
 +        }
 +      }
 +      scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
 +    } finally {
 +      restoreClassLoader(oldClassLoader)
 +    }
 +  }
 +
 +  override def error(d: SchedulerDriver, message: String) {
 +    val oldClassLoader = setClassLoader()
 +    try {
 +      logError("Mesos error: " + message)
 +      scheduler.error(message)
 +    } finally {
 +      restoreClassLoader(oldClassLoader)
 +    }
 +  }
 +
 +  override def stop() {
 +    if (driver != null) {
 +      driver.stop()
 +    }
 +  }
 +
 +  override def reviveOffers() {
 +    driver.reviveOffers()
 +  }
 +
 +  override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
 +
 +  private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
 +    val oldClassLoader = setClassLoader()
 +    try {
 +      logInfo("Mesos slave lost: " + slaveId.getValue)
 +      synchronized {
 +        slaveIdsWithExecutors -= slaveId.getValue
 +      }
 +      scheduler.executorLost(slaveId.getValue, reason)
 +    } finally {
 +      restoreClassLoader(oldClassLoader)
 +    }
 +  }
 +
 +  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
 +    recordSlaveLost(d, slaveId, SlaveLost())
 +  }
 +
 +  override def executorLost(d: SchedulerDriver, executorId: ExecutorID,
 +                            slaveId: SlaveID, status: Int) {
 +    logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
 +                                                                 slaveId.getValue))
 +    recordSlaveLost(d, slaveId, ExecutorExited(status))
 +  }
 +
 +  // TODO: query Mesos for number of cores
 +  override def defaultParallelism() = System.getProperty("spark.default.parallelism", "8").toInt
 +}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/84849baf/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
----------------------------------------------------------------------