Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/03 20:25:02 UTC
[11/21] git commit: Merge branch 'reorgscripts' into scripts-reorg
Merge branch 'reorgscripts' into scripts-reorg
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/84849baf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/84849baf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/84849baf
Branch: refs/heads/master
Commit: 84849baf88d31cfaaeee158a947c4db1abe94ce6
Parents: 714fdab 3a5aa92
Author: shane-huang <sh...@intel.com>
Authored: Fri Sep 27 09:28:33 2013 +0800
Committer: shane-huang <sh...@intel.com>
Committed: Fri Sep 27 09:28:33 2013 +0800
----------------------------------------------------------------------
assembly/src/main/assembly/assembly.xml       |   11 +-
bin/compute-classpath.cmd                     |   69 --
bin/compute-classpath.sh                      |   61 --
bin/pyspark                                   |   66 ++
bin/pyspark.cmd                               |   23 +
bin/pyspark2.cmd                              |   55 +
bin/run-example                               |   81 ++
bin/run-example.cmd                           |   23 +
bin/run-example2.cmd                          |   61 ++
bin/slaves.sh                                 |   74 --
bin/spark                                     |   92 ++
bin/spark-config.sh                           |   36 -
bin/spark-daemon.sh                           |  164 ---
bin/spark-daemons.sh                          |   35 -
bin/spark-shell                               |   87 ++
bin/spark-shell.cmd                           |   23 +
bin/start-all.sh                              |   34 -
bin/start-master.sh                           |   52 -
bin/start-slave.sh                            |   35 -
bin/start-slaves.sh                           |   48 -
bin/stop-all.sh                               |   32 -
bin/stop-master.sh                            |   27 -
bin/stop-slaves.sh                            |   37 -
.../spark/deploy/worker/ExecutorRunner.scala  |    2 +-
.../mesos/CoarseMesosSchedulerBackend.scala   |    4 +-
.../cluster/mesos/MesosSchedulerBackend.scala |    4 +-
.../apache/spark/ui/UIWorkloadGenerator.scala |    2 +-
.../scala/org/apache/spark/DriverSuite.scala  |    2 +-
data/kmeans_data.txt                          |    6 +
data/lr_data.txt                              | 1000 ++++++++++++++++++
data/pagerank_data.txt                        |    6 +
docs/running-on-yarn.md                       |    4 +-
docs/spark-standalone.md                      |   14 +-
kmeans_data.txt                               |    6 -
lr_data.txt                                   | 1000 ------------------
make-distribution.sh                          |    5 +-
pagerank_data.txt                             |    6 -
pyspark                                       |   66 --
pyspark.cmd                                   |   23 -
pyspark2.cmd                                  |   55 -
python/pyspark/java_gateway.py                |    2 +-
python/run-tests                              |    2 +-
run-example                                   |   81 --
run-example.cmd                               |   23 -
run-example2.cmd                              |   61 --
sbin/compute-classpath.cmd                    |   69 ++
sbin/compute-classpath.sh                     |   61 ++
sbin/slaves.sh                                |   74 ++
sbin/spark-class                              |  117 ++
sbin/spark-class.cmd                          |   23 +
sbin/spark-class2.cmd                         |   78 ++
sbin/spark-config.sh                          |   36 +
sbin/spark-daemon.sh                          |  164 +++
sbin/spark-daemons.sh                         |   35 +
sbin/spark-executor                           |   23 +
sbin/start-all.sh                             |   34 +
sbin/start-master.sh                          |   52 +
sbin/start-slave.sh                           |   35 +
sbin/start-slaves.sh                          |   48 +
sbin/stop-all.sh                              |   32 +
sbin/stop-master.sh                           |   27 +
sbin/stop-slaves.sh                           |   37 +
spark-class                                   |  117 --
spark-class.cmd                               |   23 -
spark-class2.cmd                              |   78 --
spark-executor                                |   22 -
spark-shell                                   |   87 --
spark-shell.cmd                               |   22 -
68 files changed, 2491 insertions(+), 2403 deletions(-)
----------------------------------------------------------------------
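The diffstat above captures the reorganization: user-facing launch scripts (pyspark, run-example, spark-shell, the new bin/spark) move from the repository root into bin/, while administrative and internal scripts (compute-classpath, slaves.sh, spark-config.sh, spark-daemon.sh, the start-*/stop-* scripts, plus spark-class and spark-executor from the root) move into sbin/. The two Scala diffs below adjust hard-coded script paths to match. A minimal sketch of the kind of path resolution this changes (ScriptLocator is a hypothetical name for illustration, not part of this commit):

    import java.io.File

    // Hypothetical helper sketching the path change made in the diffs below:
    // scripts formerly resolved at the SPARK_HOME root now live under sbin/.
    object ScriptLocator {
      def sparkClass(sparkHome: String): File =
        new File(sparkHome, "sbin/spark-class")    // previously: new File(sparkHome, "spark-class")

      def sparkExecutor(sparkHome: String): File =
        new File(sparkHome, "sbin/spark-executor") // previously: new File(sparkHome, "spark-executor")
    }
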
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/84849baf/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 8f2eef9,0000000..15b3397
mode 100644,000000..100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@@ -1,286 -1,0 +1,286 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.io.File
+import java.util.{ArrayList => JArrayList, List => JList}
+import java.util.Collections
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.JavaConversions._
+
+import com.google.protobuf.ByteString
+import org.apache.mesos.{Scheduler => MScheduler}
+import org.apache.mesos._
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
+
+import org.apache.spark.{SparkException, Logging, SparkContext, TaskState}
+import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend}
+
+/**
+ * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
+ * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
+ * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
+ * StandaloneBackend mechanism. This class is useful when lower and more predictable latency is needed.
+ *
+ * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to
+ * remove.
+ */
+private[spark] class CoarseMesosSchedulerBackend(
+ scheduler: ClusterScheduler,
+ sc: SparkContext,
+ master: String,
+ appName: String)
+ extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
+ with MScheduler
+ with Logging {
+
+ val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
+
+ // Lock used to wait for scheduler to be registered
+ var isRegistered = false
+ val registeredLock = new Object()
+
+ // Driver for talking to Mesos
+ var driver: SchedulerDriver = null
+
+ // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
+ val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
+
+ // Cores we have acquired with each Mesos task ID
+ val coresByTaskId = new HashMap[Int, Int]
+ var totalCoresAcquired = 0
+
+ val slaveIdsWithExecutors = new HashSet[String]
+
+ val taskIdToSlaveId = new HashMap[Int, String]
+ val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed
+
+ val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
+ "Spark home is not set; set it through the spark.home system " +
+ "property, the SPARK_HOME environment variable or the SparkContext constructor"))
+
+ val extraCoresPerSlave = System.getProperty("spark.mesos.extra.cores", "0").toInt
+
+ var nextMesosTaskId = 0
+
+ def newMesosTaskId(): Int = {
+ val id = nextMesosTaskId
+ nextMesosTaskId += 1
+ id
+ }
+
+ override def start() {
+ super.start()
+
+ synchronized {
+ new Thread("CoarseMesosSchedulerBackend driver") {
+ setDaemon(true)
+ override def run() {
+ val scheduler = CoarseMesosSchedulerBackend.this
+ val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
+ driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
+ try {
+ val ret = driver.run()
+ logInfo("driver.run() returned with code " + ret)
+ } catch {
+ case e: Exception => logError("driver.run() failed", e)
+ }
+ }
+ }.start()
+
+ waitForRegister()
+ }
+ }
+
+ def createCommand(offer: Offer, numCores: Int): CommandInfo = {
+ val environment = Environment.newBuilder()
+ sc.executorEnvs.foreach { case (key, value) =>
+ environment.addVariables(Environment.Variable.newBuilder()
+ .setName(key)
+ .setValue(value)
+ .build())
+ }
+ val command = CommandInfo.newBuilder()
+ .setEnvironment(environment)
+ val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ System.getProperty("spark.driver.host"),
+ System.getProperty("spark.driver.port"),
+ StandaloneSchedulerBackend.ACTOR_NAME)
+ val uri = System.getProperty("spark.executor.uri")
+ if (uri == null) {
- val runScript = new File(sparkHome, "spark-class").getCanonicalPath
++ val runScript = new File(sparkHome, "./sbin/spark-class").getCanonicalPath
+ command.setValue(
+ "\"%s\" org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+ runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+ } else {
+ // Grab everything to the first '.'. We'll use that and '*' to
+ // glob the directory "correctly".
+ val basename = uri.split('/').last.split('.').head
+ command.setValue(
- "cd %s*; ./spark-class org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
++ "cd %s*; ./sbin/spark-class org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+ basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+ command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+ }
+ return command.build()
+ }
+
+ override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
+
+ override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
+ logInfo("Registered as framework ID " + frameworkId.getValue)
+ registeredLock.synchronized {
+ isRegistered = true
+ registeredLock.notifyAll()
+ }
+ }
+
+ def waitForRegister() {
+ registeredLock.synchronized {
+ while (!isRegistered) {
+ registeredLock.wait()
+ }
+ }
+ }
+
+ override def disconnected(d: SchedulerDriver) {}
+
+ override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
+
+ /**
+ * Method called by Mesos to offer resources on slaves. We respond by launching an executor,
+ * unless we've already launched more than we wanted to.
+ */
+ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
+ synchronized {
+ val filters = Filters.newBuilder().setRefuseSeconds(-1).build()
+
+ for (offer <- offers) {
+ val slaveId = offer.getSlaveId.toString
+ val mem = getResource(offer.getResourcesList, "mem")
+ val cpus = getResource(offer.getResourcesList, "cpus").toInt
+ if (totalCoresAcquired < maxCores && mem >= executorMemory && cpus >= 1 &&
+ failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
+ !slaveIdsWithExecutors.contains(slaveId)) {
+ // Launch an executor on the slave
+ val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
+ val taskId = newMesosTaskId()
+ taskIdToSlaveId(taskId) = slaveId
+ slaveIdsWithExecutors += slaveId
+ coresByTaskId(taskId) = cpusToUse
+ val task = MesosTaskInfo.newBuilder()
+ .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
+ .setSlaveId(offer.getSlaveId)
+ .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
+ .setName("Task " + taskId)
+ .addResources(createResource("cpus", cpusToUse))
+ .addResources(createResource("mem", executorMemory))
+ .build()
+ d.launchTasks(offer.getId, Collections.singletonList(task), filters)
+ } else {
+ // Filter it out
+ d.launchTasks(offer.getId, Collections.emptyList[MesosTaskInfo](), filters)
+ }
+ }
+ }
+ }
+
+ /** Helper function to pull out a resource from a Mesos Resources protobuf */
+ private def getResource(res: JList[Resource], name: String): Double = {
+ for (r <- res if r.getName == name) {
+ return r.getScalar.getValue
+ }
+ // If we reached here, no resource with the required name was present
+ throw new IllegalArgumentException("No resource called " + name + " in " + res)
+ }
+
+ /** Build a Mesos resource protobuf object */
+ private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
+ Resource.newBuilder()
+ .setName(resourceName)
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
+ .build()
+ }
+
+ /** Check whether a Mesos task state represents a finished task */
+ private def isFinished(state: MesosTaskState) = {
+ state == MesosTaskState.TASK_FINISHED ||
+ state == MesosTaskState.TASK_FAILED ||
+ state == MesosTaskState.TASK_KILLED ||
+ state == MesosTaskState.TASK_LOST
+ }
+
+ override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
+ val taskId = status.getTaskId.getValue.toInt
+ val state = status.getState
+ logInfo("Mesos task " + taskId + " is now " + state)
+ synchronized {
+ if (isFinished(state)) {
+ val slaveId = taskIdToSlaveId(taskId)
+ slaveIdsWithExecutors -= slaveId
+ taskIdToSlaveId -= taskId
+ // Remove the cores we have remembered for this task, if it's in the hashmap
+ for (cores <- coresByTaskId.get(taskId)) {
+ totalCoresAcquired -= cores
+ coresByTaskId -= taskId
+ }
+ // If it was a failure, mark the slave as failed for blacklisting purposes
+ if (state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST) {
+ failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
+ if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
+ logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +
+ "is Spark installed on it?")
+ }
+ }
+ driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
+ }
+ }
+ }
+
+ override def error(d: SchedulerDriver, message: String) {
+ logError("Mesos error: " + message)
+ scheduler.error(message)
+ }
+
+ override def stop() {
+ super.stop()
+ if (driver != null) {
+ driver.stop()
+ }
+ }
+
+ override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
+
+ override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+ logInfo("Mesos slave lost: " + slaveId.getValue)
+ synchronized {
+ if (slaveIdsWithExecutors.contains(slaveId.getValue)) {
+ // Note that the slave ID corresponds to the executor ID on that slave
+ slaveIdsWithExecutors -= slaveId.getValue
+ removeExecutor(slaveId.getValue, "Mesos slave lost")
+ }
+ }
+ }
+
+ override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
+ logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
+ slaveLost(d, s)
+ }
+}
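
Both Mesos backends in this commit use the registration handshake shown above: start() spawns a daemon thread that runs the MesosSchedulerDriver, then blocks in waitForRegister() until the registered() callback flips the flag and notifies. Distilled into a self-contained sketch (the object and method names here are illustrative, not from this commit):

    // Sketch of the wait/notify handshake used by both Mesos backends.
    object RegistrationHandshake {
      private var isRegistered = false
      private val registeredLock = new Object()

      // Called from the Mesos callback thread (cf. registered() above).
      def markRegistered(): Unit = registeredLock.synchronized {
        isRegistered = true
        registeredLock.notifyAll()
      }

      // Called from start(); blocks until the callback has fired.
      // The while loop guards against spurious wakeups.
      def awaitRegistration(): Unit = registeredLock.synchronized {
        while (!isRegistered) registeredLock.wait()
      }
    }
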
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/84849baf/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 50cbc2c,0000000..7e9c05c
mode 100644,000000..100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@@ -1,345 -1,0 +1,345 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.io.File
+import java.util.{ArrayList => JArrayList, List => JList}
+import java.util.Collections
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.JavaConversions._
+
+import com.google.protobuf.ByteString
+import org.apache.mesos.{Scheduler => MScheduler}
+import org.apache.mesos._
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
+
+import org.apache.spark.{Logging, SparkException, SparkContext, TaskState}
+import org.apache.spark.scheduler.TaskDescription
+import org.apache.spark.scheduler.cluster.{ClusterScheduler, ExecutorExited, ExecutorLossReason}
+import org.apache.spark.scheduler.cluster.{SchedulerBackend, SlaveLost, WorkerOffer}
+import org.apache.spark.util.Utils
+
+/**
+ * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
+ * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
+ * from multiple apps can run on different cores) and in time (a core can switch ownership).
+ */
+private[spark] class MesosSchedulerBackend(
+ scheduler: ClusterScheduler,
+ sc: SparkContext,
+ master: String,
+ appName: String)
+ extends SchedulerBackend
+ with MScheduler
+ with Logging {
+
+ // Lock used to wait for scheduler to be registered
+ var isRegistered = false
+ val registeredLock = new Object()
+
+ // Driver for talking to Mesos
+ var driver: SchedulerDriver = null
+
+ // Which slave IDs we have executors on
+ val slaveIdsWithExecutors = new HashSet[String]
+ val taskIdToSlaveId = new HashMap[Long, String]
+
+ // An ExecutorInfo for our tasks
+ var execArgs: Array[Byte] = null
+
+ var classLoader: ClassLoader = null
+
+ override def start() {
+ synchronized {
+ classLoader = Thread.currentThread.getContextClassLoader
+
+ new Thread("MesosSchedulerBackend driver") {
+ setDaemon(true)
+ override def run() {
+ val scheduler = MesosSchedulerBackend.this
+ val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
+ driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
+ try {
+ val ret = driver.run()
+ logInfo("driver.run() returned with code " + ret)
+ } catch {
+ case e: Exception => logError("driver.run() failed", e)
+ }
+ }
+ }.start()
+
+ waitForRegister()
+ }
+ }
+
+ def createExecutorInfo(execId: String): ExecutorInfo = {
+ val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
+ "Spark home is not set; set it through the spark.home system " +
+ "property, the SPARK_HOME environment variable or the SparkContext constructor"))
+ val environment = Environment.newBuilder()
+ sc.executorEnvs.foreach { case (key, value) =>
+ environment.addVariables(Environment.Variable.newBuilder()
+ .setName(key)
+ .setValue(value)
+ .build())
+ }
+ val command = CommandInfo.newBuilder()
+ .setEnvironment(environment)
+ val uri = System.getProperty("spark.executor.uri")
+ if (uri == null) {
- command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath)
++ command.setValue(new File(sparkHome, "/sbin/spark-executor").getCanonicalPath)
+ } else {
+ // Grab everything to the first '.'. We'll use that and '*' to
+ // glob the directory "correctly".
+ val basename = uri.split('/').last.split('.').head
- command.setValue("cd %s*; ./spark-executor".format(basename))
++ command.setValue("cd %s*; ./sbin/spark-executor".format(basename))
+ command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+ }
+ val memory = Resource.newBuilder()
+ .setName("mem")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build())
+ .build()
+ ExecutorInfo.newBuilder()
+ .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
+ .setCommand(command)
+ .setData(ByteString.copyFrom(createExecArg()))
+ .addResources(memory)
+ .build()
+ }
+
+ /**
+ * Create and serialize the executor argument to pass to Mesos. Our executor arg is an array
+ * containing all the spark.* system properties in the form of (String, String) pairs.
+ */
+ private def createExecArg(): Array[Byte] = {
+ if (execArgs == null) {
+ val props = new HashMap[String, String]
+ val iterator = System.getProperties.entrySet.iterator
+ while (iterator.hasNext) {
+ val entry = iterator.next
+ val (key, value) = (entry.getKey.toString, entry.getValue.toString)
+ if (key.startsWith("spark.")) {
+ props(key) = value
+ }
+ }
+ // Serialize the map as an array of (String, String) pairs
+ execArgs = Utils.serialize(props.toArray)
+ }
+ return execArgs
+ }
+
+ private def setClassLoader(): ClassLoader = {
+ val oldClassLoader = Thread.currentThread.getContextClassLoader
+ Thread.currentThread.setContextClassLoader(classLoader)
+ return oldClassLoader
+ }
+
+ private def restoreClassLoader(oldClassLoader: ClassLoader) {
+ Thread.currentThread.setContextClassLoader(oldClassLoader)
+ }
+
+ override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
+
+ override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
+ val oldClassLoader = setClassLoader()
+ try {
+ logInfo("Registered as framework ID " + frameworkId.getValue)
+ registeredLock.synchronized {
+ isRegistered = true
+ registeredLock.notifyAll()
+ }
+ } finally {
+ restoreClassLoader(oldClassLoader)
+ }
+ }
+
+ def waitForRegister() {
+ registeredLock.synchronized {
+ while (!isRegistered) {
+ registeredLock.wait()
+ }
+ }
+ }
+
+ override def disconnected(d: SchedulerDriver) {}
+
+ override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
+
+ /**
+ * Method called by Mesos to offer resources on slaves. We respond by asking our active task sets
+ * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
+ * tasks are balanced across the cluster.
+ */
+ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
+ val oldClassLoader = setClassLoader()
+ try {
+ synchronized {
+ // Build a big list of the offerable workers, and remember their indices so that we can
+ // figure out which Offer to reply to for each worker
+ val offerableIndices = new ArrayBuffer[Int]
+ val offerableWorkers = new ArrayBuffer[WorkerOffer]
+
+ def enoughMemory(o: Offer) = {
+ val mem = getResource(o.getResourcesList, "mem")
+ val slaveId = o.getSlaveId.getValue
+ mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId)
+ }
+
+ for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
+ offerableIndices += index
+ offerableWorkers += new WorkerOffer(
+ offer.getSlaveId.getValue,
+ offer.getHostname,
+ getResource(offer.getResourcesList, "cpus").toInt)
+ }
+
+ // Call into the ClusterScheduler
+ val taskLists = scheduler.resourceOffers(offerableWorkers)
+
+ // Build a list of Mesos tasks for each slave
+ val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
+ for ((taskList, index) <- taskLists.zipWithIndex) {
+ if (!taskList.isEmpty) {
+ val offerNum = offerableIndices(index)
+ val slaveId = offers(offerNum).getSlaveId.getValue
+ slaveIdsWithExecutors += slaveId
+ mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
+ for (taskDesc <- taskList) {
+ taskIdToSlaveId(taskDesc.taskId) = slaveId
+ mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
+ }
+ }
+ }
+
+ // Reply to the offers
+ val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
+ for (i <- 0 until offers.size) {
+ d.launchTasks(offers(i).getId, mesosTasks(i), filters)
+ }
+ }
+ } finally {
+ restoreClassLoader(oldClassLoader)
+ }
+ }
+
+ /** Helper function to pull out a resource from a Mesos Resources protobuf */
+ def getResource(res: JList[Resource], name: String): Double = {
+ for (r <- res if r.getName == name) {
+ return r.getScalar.getValue
+ }
+ // If we reached here, no resource with the required name was present
+ throw new IllegalArgumentException("No resource called " + name + " in " + res)
+ }
+
+ /** Turn a Spark TaskDescription into a Mesos task */
+ def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
+ val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
+ val cpuResource = Resource.newBuilder()
+ .setName("cpus")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder().setValue(1).build())
+ .build()
+ return MesosTaskInfo.newBuilder()
+ .setTaskId(taskId)
+ .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
+ .setExecutor(createExecutorInfo(slaveId))
+ .setName(task.name)
+ .addResources(cpuResource)
+ .setData(ByteString.copyFrom(task.serializedTask))
+ .build()
+ }
+
+ /** Check whether a Mesos task state represents a finished task */
+ def isFinished(state: MesosTaskState) = {
+ state == MesosTaskState.TASK_FINISHED ||
+ state == MesosTaskState.TASK_FAILED ||
+ state == MesosTaskState.TASK_KILLED ||
+ state == MesosTaskState.TASK_LOST
+ }
+
+ override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
+ val oldClassLoader = setClassLoader()
+ try {
+ val tid = status.getTaskId.getValue.toLong
+ val state = TaskState.fromMesos(status.getState)
+ synchronized {
+ if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
+ // We lost the executor on this slave, so remember that it's gone
+ slaveIdsWithExecutors -= taskIdToSlaveId(tid)
+ }
+ if (isFinished(status.getState)) {
+ taskIdToSlaveId.remove(tid)
+ }
+ }
+ scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
+ } finally {
+ restoreClassLoader(oldClassLoader)
+ }
+ }
+
+ override def error(d: SchedulerDriver, message: String) {
+ val oldClassLoader = setClassLoader()
+ try {
+ logError("Mesos error: " + message)
+ scheduler.error(message)
+ } finally {
+ restoreClassLoader(oldClassLoader)
+ }
+ }
+
+ override def stop() {
+ if (driver != null) {
+ driver.stop()
+ }
+ }
+
+ override def reviveOffers() {
+ driver.reviveOffers()
+ }
+
+ override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
+
+ private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
+ val oldClassLoader = setClassLoader()
+ try {
+ logInfo("Mesos slave lost: " + slaveId.getValue)
+ synchronized {
+ slaveIdsWithExecutors -= slaveId.getValue
+ }
+ scheduler.executorLost(slaveId.getValue, reason)
+ } finally {
+ restoreClassLoader(oldClassLoader)
+ }
+ }
+
+ override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+ recordSlaveLost(d, slaveId, SlaveLost())
+ }
+
+ override def executorLost(d: SchedulerDriver, executorId: ExecutorID,
+ slaveId: SlaveID, status: Int) {
+ logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
+ slaveId.getValue))
+ recordSlaveLost(d, slaveId, ExecutorExited(status))
+ }
+
+ // TODO: query Mesos for number of cores
+ override def defaultParallelism() = System.getProperty("spark.default.parallelism", "8").toInt
+}
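
Note how every Mesos callback in this class brackets its work with setClassLoader()/restoreClassLoader(): Mesos invokes callbacks on its own driver thread, which does not carry Spark's context classloader. The same idiom, distilled into a reusable sketch (ClassLoaderUtil and withClassLoader are hypothetical names, not part of this commit):

    object ClassLoaderUtil {
      // Run `body` with `loader` installed as the context classloader,
      // restoring the previous loader afterwards, even if `body` throws.
      def withClassLoader[T](loader: ClassLoader)(body: => T): T = {
        val thread = Thread.currentThread
        val old = thread.getContextClassLoader
        thread.setContextClassLoader(loader)
        try body finally thread.setContextClassLoader(old)
      }
    }

Usage would mirror the pairs in the diff above, e.g. withClassLoader(classLoader) { scheduler.error(message) }, with the try/finally guaranteeing restoration on every exit path.
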
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/84849baf/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
----------------------------------------------------------------------