Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:17 UTC

[33/69] [abbrv] [partial] Initial work to rename package to org.apache.spark

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskSetManager.scala
new file mode 100644
index 0000000..648a3ef
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskSetManager.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import java.nio.ByteBuffer
+
+import org.apache.spark.TaskState.TaskState
+import org.apache.spark.scheduler.TaskSet
+
+/**
+ * Tracks and schedules the tasks within a single TaskSet. This class keeps track of the status of
+ * each task and is responsible for retries on failure and locality. The main interfaces to it
+ * are resourceOffer, which asks the TaskSet whether it wants to run a task on one node, and
+ * statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
+ *
+ * THREADING: This class is designed to only be called from code with a lock on the TaskScheduler
+ * (e.g. its event handlers). It should not be called from other threads.
+ */
+private[spark] trait TaskSetManager extends Schedulable {
+  def schedulableQueue = null
+  
+  def schedulingMode = SchedulingMode.NONE
+  
+  def taskSet: TaskSet
+
+  def resourceOffer(
+      execId: String,
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
+    : Option[TaskDescription]
+
+  def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer)
+
+  def error(message: String)
+}
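For orientation, a minimal sketch of the calling pattern the comment above describes: a scheduler
offers one core on an executor to the manager and later reports the outcome of the launched task.
The manager instance, executor id, host name and the empty status payload are hypothetical; only
the trait's own methods and types that appear elsewhere in this patch are used.

    import java.nio.ByteBuffer

    import org.apache.spark.TaskState
    import org.apache.spark.scheduler.cluster.{TaskLocality, TaskSetManager}

    def driveOnce(manager: TaskSetManager) {
      // Offer one core on a (hypothetical) executor; launch the task if one comes back.
      manager.resourceOffer("exec-1", "host-1", 1, TaskLocality.NODE_LOCAL) match {
        case Some(desc) =>
          // desc.serializedTask would be handed to an executor here. Once the task
          // completes, its outcome is fed back; a real caller passes a serialized
          // TaskResult rather than this empty placeholder buffer.
          manager.statusUpdate(desc.taskId, TaskState.FINISHED, ByteBuffer.allocate(0))
        case None => // nothing runnable with this offer
      }
    }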

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/cluster/WorkerOffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/WorkerOffer.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/WorkerOffer.scala
new file mode 100644
index 0000000..938f628
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/WorkerOffer.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+/**
+ * Represents free resources available on an executor.
+ */
+private[spark]
+class WorkerOffer(val executorId: String, val host: String, val cores: Int)
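WorkerOffer is just a value carrier. For illustration, this is the kind of list a backend builds
and hands to the cluster scheduler (MesosSchedulerBackend.resourceOffers later in this patch
constructs one per usable Mesos offer); the executor ids, hosts and core counts are made up.

    import org.apache.spark.scheduler.cluster.WorkerOffer

    val offers = Seq(
      new WorkerOffer("exec-1", "host-1", 4),  // 4 free cores on host-1
      new WorkerOffer("exec-2", "host-2", 8))  // 8 free cores on host-2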

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
new file mode 100644
index 0000000..f0ebe66
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.local
+
+import java.io.File
+import java.lang.management.ManagementFactory
+import java.util.concurrent.atomic.AtomicInteger
+import java.nio.ByteBuffer
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+
+import org.apache.spark._
+import org.apache.spark.TaskState.TaskState
+import org.apache.spark.executor.ExecutorURLClassLoader
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster._
+import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+import akka.actor._
+
+/**
+ * A FIFO or Fair TaskScheduler implementation that runs tasks locally in a thread pool. Optionally
+ * the scheduler also allows each task to fail up to maxFailures times, which is useful for
+ * testing fault recovery.
+ */
+
+private[spark]
+case class LocalReviveOffers()
+
+private[spark]
+case class LocalStatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)
+
+private[spark]
+class LocalActor(localScheduler: LocalScheduler, var freeCores: Int) extends Actor with Logging {
+
+  def receive = {
+    case LocalReviveOffers =>
+      launchTask(localScheduler.resourceOffer(freeCores))
+    case LocalStatusUpdate(taskId, state, serializedData) =>
+      freeCores += 1
+      localScheduler.statusUpdate(taskId, state, serializedData)
+      launchTask(localScheduler.resourceOffer(freeCores))
+  }
+
+  def launchTask(tasks: Seq[TaskDescription]) {
+    for (task <- tasks) {
+      freeCores -= 1
+      localScheduler.threadPool.submit(new Runnable {
+        def run() {
+          localScheduler.runTask(task.taskId, task.serializedTask)
+        }
+      })
+    }
+  }
+}
+
+private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: SparkContext)
+  extends TaskScheduler
+  with Logging {
+
+  var attemptId = new AtomicInteger(0)
+  var threadPool = Utils.newDaemonFixedThreadPool(threads)
+  val env = SparkEnv.get
+  var listener: TaskSchedulerListener = null
+
+  // Application dependencies (added through SparkContext) that we've fetched so far on this node.
+  // Each map holds the master's timestamp for the version of that file or JAR we got.
+  val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
+  val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
+
+  val classLoader = new ExecutorURLClassLoader(Array(), Thread.currentThread.getContextClassLoader)
+
+  var schedulableBuilder: SchedulableBuilder = null
+  var rootPool: Pool = null
+  val schedulingMode: SchedulingMode = SchedulingMode.withName(
+    System.getProperty("spark.cluster.schedulingmode", "FIFO"))
+  val activeTaskSets = new HashMap[String, TaskSetManager]
+  val taskIdToTaskSetId = new HashMap[Long, String]
+  val taskSetTaskIds = new HashMap[String, HashSet[Long]]
+
+  var localActor: ActorRef = null
+
+  override def start() {
+    // temporarily set rootPool name to empty
+    rootPool = new Pool("", schedulingMode, 0, 0)
+    schedulableBuilder = {
+      schedulingMode match {
+        case SchedulingMode.FIFO =>
+          new FIFOSchedulableBuilder(rootPool)
+        case SchedulingMode.FAIR =>
+          new FairSchedulableBuilder(rootPool)
+      }
+    }
+    schedulableBuilder.buildPools()
+
+    localActor = env.actorSystem.actorOf(Props(new LocalActor(this, threads)), "Test")
+  }
+
+  override def setListener(listener: TaskSchedulerListener) {
+    this.listener = listener
+  }
+
+  override def submitTasks(taskSet: TaskSet) {
+    synchronized {
+      val manager = new LocalTaskSetManager(this, taskSet)
+      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
+      activeTaskSets(taskSet.id) = manager
+      taskSetTaskIds(taskSet.id) = new HashSet[Long]()
+      localActor ! LocalReviveOffers
+    }
+  }
+
+  def resourceOffer(freeCores: Int): Seq[TaskDescription] = {
+    synchronized {
+      var freeCpuCores = freeCores
+      val tasks = new ArrayBuffer[TaskDescription](freeCores)
+      val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
+      for (manager <- sortedTaskSetQueue) {
+        logDebug("parentName:%s,name:%s,runningTasks:%s".format(
+          manager.parent.name, manager.name, manager.runningTasks))
+      }
+
+      var launchTask = false
+      for (manager <- sortedTaskSetQueue) {
+        do {
+          launchTask = false
+          manager.resourceOffer(null, null, freeCpuCores, null) match {
+            case Some(task) =>
+              tasks += task
+              taskIdToTaskSetId(task.taskId) = manager.taskSet.id
+              taskSetTaskIds(manager.taskSet.id) += task.taskId
+              freeCpuCores -= 1
+              launchTask = true
+            case None => {}
+          }
+        } while(launchTask)
+      }
+      return tasks
+    }
+  }
+
+  def taskSetFinished(manager: TaskSetManager) {
+    synchronized {
+      activeTaskSets -= manager.taskSet.id
+      manager.parent.removeSchedulable(manager)
+      logInfo("Remove TaskSet %s from pool %s".format(manager.taskSet.id, manager.parent.name))
+      taskIdToTaskSetId --= taskSetTaskIds(manager.taskSet.id)
+      taskSetTaskIds -= manager.taskSet.id
+    }
+  }
+
+  def runTask(taskId: Long, bytes: ByteBuffer) {
+    logInfo("Running " + taskId)
+    val info = new TaskInfo(taskId, 0, System.currentTimeMillis(), "local", "local:1", TaskLocality.NODE_LOCAL)
+    // Set the Spark execution environment for the worker thread
+    SparkEnv.set(env)
+    val ser = SparkEnv.get.closureSerializer.newInstance()
+    val objectSer = SparkEnv.get.serializer.newInstance()
+    var attemptedTask: Option[Task[_]] = None   
+    val start = System.currentTimeMillis()
+    var taskStart: Long = 0
+    def getTotalGCTime = ManagementFactory.getGarbageCollectorMXBeans.map(g => g.getCollectionTime).sum
+    val startGCTime = getTotalGCTime
+
+    try {
+      Accumulators.clear()
+      Thread.currentThread().setContextClassLoader(classLoader)
+
+      // Serialize and deserialize the task so that accumulators are changed to thread-local ones;
+      // this adds a bit of unnecessary overhead but matches how the Mesos Executor works.
+      val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(bytes)
+      updateDependencies(taskFiles, taskJars)   // Download any files added with addFile
+      val deserializedTask = ser.deserialize[Task[_]](
+        taskBytes, Thread.currentThread.getContextClassLoader)
+      attemptedTask = Some(deserializedTask)
+      val deserTime = System.currentTimeMillis() - start
+      taskStart = System.currentTimeMillis()
+
+      // Run it
+      val result: Any = deserializedTask.run(taskId)
+
+      // Serialize and deserialize the result to emulate what the Mesos
+      // executor does. This is useful to catch serialization errors early
+      // on in development (so when users move their local Spark programs
+      // to the cluster, they don't get surprised by serialization errors).
+      val serResult = objectSer.serialize(result)
+      deserializedTask.metrics.get.resultSize = serResult.limit()
+      val resultToReturn = objectSer.deserialize[Any](serResult)
+      val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]](
+        ser.serialize(Accumulators.values))
+      val serviceTime = System.currentTimeMillis() - taskStart
+      logInfo("Finished " + taskId)
+      deserializedTask.metrics.get.executorRunTime = serviceTime.toInt
+      deserializedTask.metrics.get.jvmGCTime = getTotalGCTime - startGCTime
+      deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt
+      val taskResult = new TaskResult(result, accumUpdates, deserializedTask.metrics.getOrElse(null))
+      val serializedResult = ser.serialize(taskResult)
+      localActor ! LocalStatusUpdate(taskId, TaskState.FINISHED, serializedResult)
+    } catch {
+      case t: Throwable => {
+        val serviceTime = System.currentTimeMillis() - taskStart
+        val metrics = attemptedTask.flatMap(t => t.metrics)
+        for (m <- metrics) {
+          m.executorRunTime = serviceTime.toInt
+          m.jvmGCTime = getTotalGCTime - startGCTime
+        }
+        val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
+        localActor ! LocalStatusUpdate(taskId, TaskState.FAILED, ser.serialize(failure))
+      }
+    }
+  }
+
+  /**
+   * Download any missing dependencies if we receive a new set of files and JARs from the
+   * SparkContext. Also adds any new JARs we fetched to the class loader.
+   */
+  private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
+    synchronized {
+      // Fetch missing dependencies
+      for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
+        logInfo("Fetching " + name + " with timestamp " + timestamp)
+        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory))
+        currentFiles(name) = timestamp
+      }
+
+      for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
+        logInfo("Fetching " + name + " with timestamp " + timestamp)
+        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory))
+        currentJars(name) = timestamp
+        // Add it to our class loader
+        val localName = name.split("/").last
+        val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
+        if (!classLoader.getURLs.contains(url)) {
+          logInfo("Adding " + url + " to class loader")
+          classLoader.addURL(url)
+        }
+      }
+    }
+  }
+
+  def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
+    synchronized {
+      val taskSetId = taskIdToTaskSetId(taskId)
+      val taskSetManager = activeTaskSets(taskSetId)
+      taskSetTaskIds(taskSetId) -= taskId
+      taskSetManager.statusUpdate(taskId, state, serializedData)
+    }
+  }
+
+  override def stop() {
+    threadPool.shutdownNow()
+  }
+
+  override def defaultParallelism() = threads
+}
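A hedged usage sketch: a local master string is what leads SparkContext to build a LocalScheduler,
and (as an assumption, since that wiring is not part of this diff) a second number inside the
brackets sets maxFailures for fault-recovery testing. All values below are illustrative.

    import org.apache.spark.SparkContext

    // Assumed master syntax: "local[threads]" or "local[threads, maxFailures]".
    val sc = new SparkContext("local[4, 2]", "local-scheduler-demo")

    // Tasks run in the scheduler's 4-thread daemon pool; each task may be retried after
    // failures, up to the configured limit, before the job is aborted.
    val doubledSum = sc.parallelize(1 to 1000, 8).map(_ * 2).reduce(_ + _)
    println(doubledSum)
    sc.stop()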

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
new file mode 100644
index 0000000..e52cb99
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.local
+
+import java.nio.ByteBuffer
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.{ExceptionFailure, Logging, SparkEnv, Success, TaskState}
+import org.apache.spark.TaskState.TaskState
+import org.apache.spark.scheduler.{Task, TaskResult, TaskSet}
+import org.apache.spark.scheduler.cluster.{Schedulable, TaskDescription, TaskInfo, TaskLocality, TaskSetManager}
+
+
+private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet)
+  extends TaskSetManager with Logging {
+
+  var parent: Schedulable = null
+  var weight: Int = 1
+  var minShare: Int = 0
+  var runningTasks: Int = 0
+  var priority: Int = taskSet.priority
+  var stageId: Int = taskSet.stageId
+  var name: String = "TaskSet_" + taskSet.stageId.toString
+
+  var failCount = new Array[Int](taskSet.tasks.size)
+  val taskInfos = new HashMap[Long, TaskInfo]
+  val numTasks = taskSet.tasks.size
+  var numFinished = 0
+  val env = SparkEnv.get
+  val ser = env.closureSerializer.newInstance()
+  val copiesRunning = new Array[Int](numTasks)
+  val finished = new Array[Boolean](numTasks)
+  val numFailures = new Array[Int](numTasks)
+  val MAX_TASK_FAILURES = sched.maxFailures
+
+  override def increaseRunningTasks(taskNum: Int): Unit = {
+    runningTasks += taskNum
+    if (parent != null) {
+     parent.increaseRunningTasks(taskNum)
+    }
+  }
+
+  override def decreaseRunningTasks(taskNum: Int): Unit = {
+    runningTasks -= taskNum
+    if (parent != null) {
+      parent.decreaseRunningTasks(taskNum)
+    }
+  }
+
+  override def addSchedulable(schedulable: Schedulable): Unit = {
+    // nothing
+  }
+
+  override def removeSchedulable(schedulable: Schedulable): Unit = {
+    // nothing
+  }
+
+  override def getSchedulableByName(name: String): Schedulable = {
+    return null
+  }
+
+  override def executorLost(executorId: String, host: String): Unit = {
+    // nothing
+  }
+
+  override def checkSpeculatableTasks() = true
+
+  override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
+    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
+    sortedTaskSetQueue += this
+    return sortedTaskSetQueue
+  }
+
+  override def hasPendingTasks() = true
+
+  def findTask(): Option[Int] = {
+    for (i <- 0 to numTasks-1) {
+      if (copiesRunning(i) == 0 && !finished(i)) {
+        return Some(i)
+      }
+    }
+    return None
+  }
+
+  override def resourceOffer(
+      execId: String,
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
+    : Option[TaskDescription] =
+  {
+    SparkEnv.set(sched.env)
+    logDebug("availableCpus:%d, numFinished:%d, numTasks:%d".format(
+      availableCpus.toInt, numFinished, numTasks))
+    if (availableCpus > 0 && numFinished < numTasks) {
+      findTask() match {
+        case Some(index) =>
+          val taskId = sched.attemptId.getAndIncrement()
+          val task = taskSet.tasks(index)
+          val info = new TaskInfo(taskId, index, System.currentTimeMillis(), "local", "local:1",
+            TaskLocality.NODE_LOCAL)
+          taskInfos(taskId) = info
+          // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
+          // we assume the task can be serialized without exceptions.
+          val bytes = Task.serializeWithDependencies(
+            task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+          logInfo("Size of task " + taskId + " is " + bytes.limit + " bytes")
+          val taskName = "task %s:%d".format(taskSet.id, index)
+          copiesRunning(index) += 1
+          increaseRunningTasks(1)
+          taskStarted(task, info)
+          return Some(new TaskDescription(taskId, null, taskName, index, bytes))
+        case None => {}
+      }
+    }
+    return None
+  }
+
+  override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
+    SparkEnv.set(env)
+    state match {
+      case TaskState.FINISHED =>
+        taskEnded(tid, state, serializedData)
+      case TaskState.FAILED =>
+        taskFailed(tid, state, serializedData)
+      case _ => {}
+    }
+  }
+
+  def taskStarted(task: Task[_], info: TaskInfo) {
+    sched.listener.taskStarted(task, info)
+  }
+
+  def taskEnded(tid: Long, state: TaskState, serializedData: ByteBuffer) {
+    val info = taskInfos(tid)
+    val index = info.index
+    val task = taskSet.tasks(index)
+    info.markSuccessful()
+    val result = ser.deserialize[TaskResult[_]](serializedData, getClass.getClassLoader)
+    result.metrics.resultSize = serializedData.limit()
+    sched.listener.taskEnded(task, Success, result.value, result.accumUpdates, info, result.metrics)
+    numFinished += 1
+    decreaseRunningTasks(1)
+    finished(index) = true
+    if (numFinished == numTasks) {
+      sched.taskSetFinished(this)
+    }
+  }
+
+  def taskFailed(tid: Long, state: TaskState, serializedData: ByteBuffer) {
+    val info = taskInfos(tid)
+    val index = info.index
+    val task = taskSet.tasks(index)
+    info.markFailed()
+    decreaseRunningTasks(1)
+    val reason: ExceptionFailure = ser.deserialize[ExceptionFailure](
+      serializedData, getClass.getClassLoader)
+    sched.listener.taskEnded(task, reason, null, null, info, reason.metrics.getOrElse(null))
+    if (!finished(index)) {
+      copiesRunning(index) -= 1
+      numFailures(index) += 1
+      val locs = reason.stackTrace.map(loc => "\tat %s".format(loc.toString))
+      logInfo("Loss was due to %s\n%s\n%s".format(
+        reason.className, reason.description, locs.mkString("\n")))
+      if (numFailures(index) > MAX_TASK_FAILURES) {
+        val errorMessage = "Task %s:%d failed more than %d times; aborting job %s".format(
+          taskSet.id, index, MAX_TASK_FAILURES, reason.description)
+        decreaseRunningTasks(runningTasks)
+        sched.listener.taskSetFailed(taskSet, errorMessage)
+        // need to delete failed Taskset from schedule queue
+        sched.taskSetFinished(this)
+      }
+    }
+  }
+
+  override def error(message: String) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
new file mode 100644
index 0000000..f6a2fea
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.mesos
+
+import com.google.protobuf.ByteString
+
+import org.apache.mesos.{Scheduler => MScheduler}
+import org.apache.mesos._
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
+
+import org.apache.spark.{SparkException, Utils, Logging, SparkContext}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.JavaConversions._
+import java.io.File
+import org.apache.spark.scheduler.cluster._
+import java.util.{ArrayList => JArrayList, List => JList}
+import java.util.Collections
+import org.apache.spark.TaskState
+
+/**
+ * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
+ * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
+ * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
+ * StandaloneBackend mechanism. This approach is useful when lower and more predictable
+ * latency is desired.
+ *
+ * Unfortunately this duplicates a fair amount of MesosSchedulerBackend, but the duplication
+ * seems hard to remove.
+ */
+private[spark] class CoarseMesosSchedulerBackend(
+    scheduler: ClusterScheduler,
+    sc: SparkContext,
+    master: String,
+    appName: String)
+  extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
+  with MScheduler
+  with Logging {
+
+  val MAX_SLAVE_FAILURES = 2     // Blacklist a slave after this many failures
+
+  // Lock used to wait for scheduler to be registered
+  var isRegistered = false
+  val registeredLock = new Object()
+
+  // Driver for talking to Mesos
+  var driver: SchedulerDriver = null
+
+  // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
+  val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
+
+  // Cores we have acquired with each Mesos task ID
+  val coresByTaskId = new HashMap[Int, Int]
+  var totalCoresAcquired = 0
+
+  val slaveIdsWithExecutors = new HashSet[String]
+
+  val taskIdToSlaveId = new HashMap[Int, String]
+  val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed
+
+  val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
+    "Spark home is not set; set it through the spark.home system " +
+    "property, the SPARK_HOME environment variable or the SparkContext constructor"))
+
+  val extraCoresPerSlave = System.getProperty("spark.mesos.extra.cores", "0").toInt
+
+  var nextMesosTaskId = 0
+
+  def newMesosTaskId(): Int = {
+    val id = nextMesosTaskId
+    nextMesosTaskId += 1
+    id
+  }
+
+  override def start() {
+    super.start()
+
+    synchronized {
+      new Thread("CoarseMesosSchedulerBackend driver") {
+        setDaemon(true)
+        override def run() {
+          val scheduler = CoarseMesosSchedulerBackend.this
+          val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
+          driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
+          try {
+            val ret = driver.run()
+            logInfo("driver.run() returned with code " + ret)
+          } catch {
+            case e: Exception => logError("driver.run() failed", e)
+          }
+        }
+      }.start()
+
+      waitForRegister()
+    }
+  }
+
+  def createCommand(offer: Offer, numCores: Int): CommandInfo = {
+    val environment = Environment.newBuilder()
+    sc.executorEnvs.foreach { case (key, value) =>
+      environment.addVariables(Environment.Variable.newBuilder()
+        .setName(key)
+        .setValue(value)
+        .build())
+    }
+    val command = CommandInfo.newBuilder()
+      .setEnvironment(environment)
+    val driverUrl = "akka://spark@%s:%s/user/%s".format(
+      System.getProperty("spark.driver.host"),
+      System.getProperty("spark.driver.port"),
+      StandaloneSchedulerBackend.ACTOR_NAME)
+    val uri = System.getProperty("spark.executor.uri")
+    if (uri == null) {
+      val runScript = new File(sparkHome, "spark-class").getCanonicalPath
+      command.setValue(
+        "\"%s\" org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+          runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+    } else {
+      // Grab everything to the first '.'. We'll use that and '*' to
+      // glob the directory "correctly".
+      val basename = uri.split('/').last.split('.').head
+      command.setValue(
+        "cd %s*; ./spark-class org.apache.spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+          basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+    }
+    return command.build()
+  }
+
+  override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
+
+  override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
+    logInfo("Registered as framework ID " + frameworkId.getValue)
+    registeredLock.synchronized {
+      isRegistered = true
+      registeredLock.notifyAll()
+    }
+  }
+
+  def waitForRegister() {
+    registeredLock.synchronized {
+      while (!isRegistered) {
+        registeredLock.wait()
+      }
+    }
+  }
+
+  override def disconnected(d: SchedulerDriver) {}
+
+  override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
+
+  /**
+   * Method called by Mesos to offer resources on slaves. We respond by launching an executor,
+   * unless we've already launched more than we wanted to.
+   */
+  override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
+    synchronized {
+      val filters = Filters.newBuilder().setRefuseSeconds(-1).build()
+
+      for (offer <- offers) {
+        val slaveId = offer.getSlaveId.toString
+        val mem = getResource(offer.getResourcesList, "mem")
+        val cpus = getResource(offer.getResourcesList, "cpus").toInt
+        if (totalCoresAcquired < maxCores && mem >= executorMemory && cpus >= 1 &&
+            failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
+            !slaveIdsWithExecutors.contains(slaveId)) {
+          // Launch an executor on the slave
+          val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
+          val taskId = newMesosTaskId()
+          taskIdToSlaveId(taskId) = slaveId
+          slaveIdsWithExecutors += slaveId
+          coresByTaskId(taskId) = cpusToUse
+          val task = MesosTaskInfo.newBuilder()
+            .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
+            .setSlaveId(offer.getSlaveId)
+            .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
+            .setName("Task " + taskId)
+            .addResources(createResource("cpus", cpusToUse))
+            .addResources(createResource("mem", executorMemory))
+            .build()
+          d.launchTasks(offer.getId, Collections.singletonList(task), filters)
+        } else {
+          // Filter it out
+          d.launchTasks(offer.getId, Collections.emptyList[MesosTaskInfo](), filters)
+        }
+      }
+    }
+  }
+
+  /** Helper function to pull out a resource from a Mesos Resources protobuf */
+  private def getResource(res: JList[Resource], name: String): Double = {
+    for (r <- res if r.getName == name) {
+      return r.getScalar.getValue
+    }
+    // If we reached here, no resource with the required name was present
+    throw new IllegalArgumentException("No resource called " + name + " in " + res)
+  }
+
+  /** Build a Mesos resource protobuf object */
+  private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
+    Resource.newBuilder()
+      .setName(resourceName)
+      .setType(Value.Type.SCALAR)
+      .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
+      .build()
+  }
+
+  /** Check whether a Mesos task state represents a finished task */
+  private def isFinished(state: MesosTaskState) = {
+    state == MesosTaskState.TASK_FINISHED ||
+      state == MesosTaskState.TASK_FAILED ||
+      state == MesosTaskState.TASK_KILLED ||
+      state == MesosTaskState.TASK_LOST
+  }
+
+  override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
+    val taskId = status.getTaskId.getValue.toInt
+    val state = status.getState
+    logInfo("Mesos task " + taskId + " is now " + state)
+    synchronized {
+      if (isFinished(state)) {
+        val slaveId = taskIdToSlaveId(taskId)
+        slaveIdsWithExecutors -= slaveId
+        taskIdToSlaveId -= taskId
+        // Remove the cores we have remembered for this task, if it's in the hashmap
+        for (cores <- coresByTaskId.get(taskId)) {
+          totalCoresAcquired -= cores
+          coresByTaskId -= taskId
+        }
+        // If it was a failure, mark the slave as failed for blacklisting purposes
+        if (state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST) {
+          failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
+          if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
+            logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +
+                "is Spark installed on it?")
+          }
+        }
+        driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
+      }
+    }
+  }
+
+  override def error(d: SchedulerDriver, message: String) {
+    logError("Mesos error: " + message)
+    scheduler.error(message)
+  }
+
+  override def stop() {
+    super.stop()
+    if (driver != null) {
+      driver.stop()
+    }
+  }
+
+  override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
+
+  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+    logInfo("Mesos slave lost: " + slaveId.getValue)
+    synchronized {
+      if (slaveIdsWithExecutors.contains(slaveId.getValue)) {
+        // Note that the slave ID corresponds to the executor ID on that slave
+        slaveIdsWithExecutors -= slaveId.getValue
+        removeExecutor(slaveId.getValue, "Mesos slave lost")
+      }
+    }
+  }
+
+  override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
+    logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
+    slaveLost(d, s)
+  }
+}
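The backend above is configured entirely through system properties that the code reads
(spark.cores.max, spark.mesos.extra.cores, spark.executor.uri, spark.home, spark.driver.host/port).
A hedged setup sketch follows; the spark.mesos.coarse flag that selects this backend over the
fine-grained one is an assumption about SparkContext wiring outside this diff, and every value is
an example only.

    import org.apache.spark.SparkContext

    System.setProperty("spark.mesos.coarse", "true")   // assumed selector for this backend
    System.setProperty("spark.cores.max", "16")        // cap on totalCoresAcquired
    System.setProperty("spark.mesos.extra.cores", "1") // extra cores advertised per slave
    System.setProperty("spark.home", "/opt/spark")     // used when spark.executor.uri is unset
    // Alternatively, ship a pre-built distribution to the slaves:
    // System.setProperty("spark.executor.uri", "hdfs://namenode:9000/spark/spark-dist.tgz")

    val sc = new SparkContext("mesos://mesos-master:5050", "coarse-mesos-demo")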

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala
new file mode 100644
index 0000000..e002af1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.mesos
+
+import com.google.protobuf.ByteString
+
+import org.apache.mesos.{Scheduler => MScheduler}
+import org.apache.mesos._
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
+
+import org.apache.spark.{SparkException, Utils, Logging, SparkContext}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.JavaConversions._
+import java.io.File
+import org.apache.spark.scheduler.cluster._
+import java.util.{ArrayList => JArrayList, List => JList}
+import java.util.Collections
+import org.apache.spark.TaskState
+
+/**
+ * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
+ * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
+ * from multiple apps can run on different cores) and in time (a core can switch ownership).
+ */
+private[spark] class MesosSchedulerBackend(
+    scheduler: ClusterScheduler,
+    sc: SparkContext,
+    master: String,
+    appName: String)
+  extends SchedulerBackend
+  with MScheduler
+  with Logging {
+
+  // Lock used to wait for scheduler to be registered
+  var isRegistered = false
+  val registeredLock = new Object()
+
+  // Driver for talking to Mesos
+  var driver: SchedulerDriver = null
+
+  // Which slave IDs we have executors on
+  val slaveIdsWithExecutors = new HashSet[String]
+  val taskIdToSlaveId = new HashMap[Long, String]
+
+  // An ExecutorInfo for our tasks
+  var execArgs: Array[Byte] = null
+
+  var classLoader: ClassLoader = null
+
+  override def start() {
+    synchronized {
+      classLoader = Thread.currentThread.getContextClassLoader
+
+      new Thread("MesosSchedulerBackend driver") {
+        setDaemon(true)
+        override def run() {
+          val scheduler = MesosSchedulerBackend.this
+          val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
+          driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
+          try {
+            val ret = driver.run()
+            logInfo("driver.run() returned with code " + ret)
+          } catch {
+            case e: Exception => logError("driver.run() failed", e)
+          }
+        }
+      }.start()
+
+      waitForRegister()
+    }
+  }
+
+  def createExecutorInfo(execId: String): ExecutorInfo = {
+    val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
+      "Spark home is not set; set it through the spark.home system " +
+      "property, the SPARK_HOME environment variable or the SparkContext constructor"))
+    val environment = Environment.newBuilder()
+    sc.executorEnvs.foreach { case (key, value) =>
+      environment.addVariables(Environment.Variable.newBuilder()
+        .setName(key)
+        .setValue(value)
+        .build())
+    }
+    val command = CommandInfo.newBuilder()
+      .setEnvironment(environment)
+    val uri = System.getProperty("spark.executor.uri")
+    if (uri == null) {
+      command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath)
+    } else {
+      // Grab everything to the first '.'. We'll use that and '*' to
+      // glob the directory "correctly".
+      val basename = uri.split('/').last.split('.').head
+      command.setValue("cd %s*; ./spark-executor".format(basename))
+      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+    }
+    val memory = Resource.newBuilder()
+      .setName("mem")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build())
+      .build()
+    ExecutorInfo.newBuilder()
+      .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
+      .setCommand(command)
+      .setData(ByteString.copyFrom(createExecArg()))
+      .addResources(memory)
+      .build()
+  }
+
+  /**
+   * Create and serialize the executor argument to pass to Mesos. Our executor arg is an array
+   * containing all the spark.* system properties in the form of (String, String) pairs.
+   */
+  private def createExecArg(): Array[Byte] = {
+    if (execArgs == null) {
+      val props = new HashMap[String, String]
+      val iterator = System.getProperties.entrySet.iterator
+      while (iterator.hasNext) {
+        val entry = iterator.next
+        val (key, value) = (entry.getKey.toString, entry.getValue.toString)
+        if (key.startsWith("spark.")) {
+          props(key) = value
+        }
+      }
+      // Serialize the map as an array of (String, String) pairs
+      execArgs = Utils.serialize(props.toArray)
+    }
+    return execArgs
+  }
+
+  private def setClassLoader(): ClassLoader = {
+    val oldClassLoader = Thread.currentThread.getContextClassLoader
+    Thread.currentThread.setContextClassLoader(classLoader)
+    return oldClassLoader
+  }
+
+  private def restoreClassLoader(oldClassLoader: ClassLoader) {
+    Thread.currentThread.setContextClassLoader(oldClassLoader)
+  }
+
+  override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
+
+  override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
+    val oldClassLoader = setClassLoader()
+    try {
+      logInfo("Registered as framework ID " + frameworkId.getValue)
+      registeredLock.synchronized {
+        isRegistered = true
+        registeredLock.notifyAll()
+      }
+    } finally {
+      restoreClassLoader(oldClassLoader)
+    }
+  }
+
+  def waitForRegister() {
+    registeredLock.synchronized {
+      while (!isRegistered) {
+        registeredLock.wait()
+      }
+    }
+  }
+
+  override def disconnected(d: SchedulerDriver) {}
+
+  override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
+
+  /**
+   * Method called by Mesos to offer resources on slaves. We respond by asking our active task sets
+   * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
+   * tasks are balanced across the cluster.
+   */
+  override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
+    val oldClassLoader = setClassLoader()
+    try {
+      synchronized {
+        // Build a big list of the offerable workers, and remember their indices so that we can
+        // figure out which Offer to reply to for each worker
+        val offerableIndices = new ArrayBuffer[Int]
+        val offerableWorkers = new ArrayBuffer[WorkerOffer]
+
+        def enoughMemory(o: Offer) = {
+          val mem = getResource(o.getResourcesList, "mem")
+          val slaveId = o.getSlaveId.getValue
+          mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId)
+        }
+
+        for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
+          offerableIndices += index
+          offerableWorkers += new WorkerOffer(
+            offer.getSlaveId.getValue,
+            offer.getHostname,
+            getResource(offer.getResourcesList, "cpus").toInt)
+        }
+
+        // Call into the ClusterScheduler
+        val taskLists = scheduler.resourceOffers(offerableWorkers)
+
+        // Build a list of Mesos tasks for each slave
+        val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
+        for ((taskList, index) <- taskLists.zipWithIndex) {
+          if (!taskList.isEmpty) {
+            val offerNum = offerableIndices(index)
+            val slaveId = offers(offerNum).getSlaveId.getValue
+            slaveIdsWithExecutors += slaveId
+            mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
+            for (taskDesc <- taskList) {
+              taskIdToSlaveId(taskDesc.taskId) = slaveId
+              mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
+            }
+          }
+        }
+
+        // Reply to the offers
+        val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
+        for (i <- 0 until offers.size) {
+          d.launchTasks(offers(i).getId, mesosTasks(i), filters)
+        }
+      }
+    } finally {
+      restoreClassLoader(oldClassLoader)
+    }
+  }
+
+  /** Helper function to pull out a resource from a Mesos Resources protobuf */
+  def getResource(res: JList[Resource], name: String): Double = {
+    for (r <- res if r.getName == name) {
+      return r.getScalar.getValue
+    }
+    // If we reached here, no resource with the required name was present
+    throw new IllegalArgumentException("No resource called " + name + " in " + res)
+  }
+
+  /** Turn a Spark TaskDescription into a Mesos task */
+  def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
+    val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
+    val cpuResource = Resource.newBuilder()
+      .setName("cpus")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Value.Scalar.newBuilder().setValue(1).build())
+      .build()
+    return MesosTaskInfo.newBuilder()
+      .setTaskId(taskId)
+      .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
+      .setExecutor(createExecutorInfo(slaveId))
+      .setName(task.name)
+      .addResources(cpuResource)
+      .setData(ByteString.copyFrom(task.serializedTask))
+      .build()
+  }
+
+  /** Check whether a Mesos task state represents a finished task */
+  def isFinished(state: MesosTaskState) = {
+    state == MesosTaskState.TASK_FINISHED ||
+      state == MesosTaskState.TASK_FAILED ||
+      state == MesosTaskState.TASK_KILLED ||
+      state == MesosTaskState.TASK_LOST
+  }
+
+  override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
+    val oldClassLoader = setClassLoader()
+    try {
+      val tid = status.getTaskId.getValue.toLong
+      val state = TaskState.fromMesos(status.getState)
+      synchronized {
+        if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
+          // We lost the executor on this slave, so remember that it's gone
+          slaveIdsWithExecutors -= taskIdToSlaveId(tid)
+        }
+        if (isFinished(status.getState)) {
+          taskIdToSlaveId.remove(tid)
+        }
+      }
+      scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
+    } finally {
+      restoreClassLoader(oldClassLoader)
+    }
+  }
+
+  override def error(d: SchedulerDriver, message: String) {
+    val oldClassLoader = setClassLoader()
+    try {
+      logError("Mesos error: " + message)
+      scheduler.error(message)
+    } finally {
+      restoreClassLoader(oldClassLoader)
+    }
+  }
+
+  override def stop() {
+    if (driver != null) {
+      driver.stop()
+    }
+  }
+
+  override def reviveOffers() {
+    driver.reviveOffers()
+  }
+
+  override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
+
+  private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
+    val oldClassLoader = setClassLoader()
+    try {
+      logInfo("Mesos slave lost: " + slaveId.getValue)
+      synchronized {
+        slaveIdsWithExecutors -= slaveId.getValue
+      }
+      scheduler.executorLost(slaveId.getValue, reason)
+    } finally {
+      restoreClassLoader(oldClassLoader)
+    }
+  }
+
+  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+    recordSlaveLost(d, slaveId, SlaveLost())
+  }
+
+  override def executorLost(d: SchedulerDriver, executorId: ExecutorID,
+                            slaveId: SlaveID, status: Int) {
+    logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
+                                                                 slaveId.getValue))
+    recordSlaveLost(d, slaveId, ExecutorExited(status))
+  }
+
+  // TODO: query Mesos for number of cores
+  override def defaultParallelism() = System.getProperty("spark.default.parallelism", "8").toInt
+}
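By contrast, the fine-grained backend above needs no core cap, since cores go back to Mesos as
each task finishes. A hedged configuration sketch, reusing only property names that appear in the
code (spark.executor.uri, spark.default.parallelism); treating fine-grained mode as the default
for a mesos:// master is an assumption about SparkContext wiring outside this diff.

    import org.apache.spark.SparkContext

    System.setProperty("spark.executor.uri", "hdfs://namenode:9000/spark/spark-dist.tgz")
    System.setProperty("spark.default.parallelism", "64") // read by defaultParallelism above

    val sc = new SparkContext("mesos://mesos-master:5050", "fine-grained-mesos-demo")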

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
new file mode 100644
index 0000000..160cca4
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{EOFException, InputStream, OutputStream}
+import java.nio.ByteBuffer
+
+import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
+
+import org.apache.spark.util.{NextIterator, ByteBufferInputStream}
+
+
+/**
+ * A serializer. Because some serialization libraries are not thread safe, this class is used to
+ * create [[org.apache.spark.serializer.SerializerInstance]] objects that do the actual
+ * serialization and are guaranteed to only be called from one thread at a time.
+ */
+trait Serializer {
+  def newInstance(): SerializerInstance
+}
+
+
+/**
+ * An instance of a serializer, for use by one thread at a time.
+ */
+trait SerializerInstance {
+  def serialize[T](t: T): ByteBuffer
+
+  def deserialize[T](bytes: ByteBuffer): T
+
+  def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T
+
+  def serializeStream(s: OutputStream): SerializationStream
+
+  def deserializeStream(s: InputStream): DeserializationStream
+
+  def serializeMany[T](iterator: Iterator[T]): ByteBuffer = {
+    // Default implementation uses serializeStream
+    val stream = new FastByteArrayOutputStream()
+    serializeStream(stream).writeAll(iterator)
+    val buffer = ByteBuffer.allocate(stream.position.toInt)
+    buffer.put(stream.array, 0, stream.position.toInt)
+    buffer.flip()
+    buffer
+  }
+
+  def deserializeMany(buffer: ByteBuffer): Iterator[Any] = {
+    // Default implementation uses deserializeStream
+    buffer.rewind()
+    deserializeStream(new ByteBufferInputStream(buffer)).asIterator
+  }
+}
+
+
+/**
+ * A stream for writing serialized objects.
+ */
+trait SerializationStream {
+  def writeObject[T](t: T): SerializationStream
+  def flush(): Unit
+  def close(): Unit
+
+  def writeAll[T](iter: Iterator[T]): SerializationStream = {
+    while (iter.hasNext) {
+      writeObject(iter.next())
+    }
+    this
+  }
+}
+
+
+/**
+ * A stream for reading serialized objects.
+ */
+trait DeserializationStream {
+  def readObject[T](): T
+  def close(): Unit
+
+  /**
+   * Read the elements of this stream through an iterator. This can only be called once, as
+   * reading each element will consume data from the input source.
+   */
+  def asIterator: Iterator[Any] = new NextIterator[Any] {
+    override protected def getNext() = {
+      try {
+        readObject[Any]()
+      } catch {
+        case eof: EOFException =>
+          finished = true
+      }
+    }
+
+    override protected def close() {
+      DeserializationStream.this.close()
+    }
+  }
+}
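As a sketch of how these traits compose, here is a minimal Serializer built on plain JDK object
streams. Every name prefixed with "Sketch" is illustrative and not part of this patch; only the
trait signatures above and ByteBufferInputStream (already used by deserializeMany) come from the
code base.

    import java.io._
    import java.nio.ByteBuffer

    import org.apache.spark.serializer._
    import org.apache.spark.util.ByteBufferInputStream

    class SketchJavaSerializationStream(out: OutputStream) extends SerializationStream {
      private val objOut = new ObjectOutputStream(out)
      def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); this }
      def flush() { objOut.flush() }
      def close() { objOut.close() }
    }

    class SketchJavaDeserializationStream(in: InputStream, loader: ClassLoader)
      extends DeserializationStream {
      // Resolve classes against the supplied loader so user classes are found.
      private val objIn = new ObjectInputStream(in) {
        override def resolveClass(desc: ObjectStreamClass) =
          Class.forName(desc.getName, false, loader)
      }
      def readObject[T](): T = objIn.readObject().asInstanceOf[T]
      def close() { objIn.close() }
    }

    class SketchJavaSerializerInstance extends SerializerInstance {
      def serialize[T](t: T): ByteBuffer = {
        val bos = new ByteArrayOutputStream()
        val out = serializeStream(bos)
        out.writeObject(t)
        out.close()
        ByteBuffer.wrap(bos.toByteArray)
      }

      def deserialize[T](bytes: ByteBuffer): T =
        deserialize(bytes, Thread.currentThread.getContextClassLoader)

      def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
        val in = new SketchJavaDeserializationStream(new ByteBufferInputStream(bytes), loader)
        val obj = in.readObject[T]()
        in.close()
        obj
      }

      def serializeStream(s: OutputStream): SerializationStream =
        new SketchJavaSerializationStream(s)

      def deserializeStream(s: InputStream): DeserializationStream =
        new SketchJavaDeserializationStream(s, Thread.currentThread.getContextClassLoader)
    }

    class SketchJavaSerializer extends Serializer {
      def newInstance(): SerializerInstance = new SketchJavaSerializerInstance()
    }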

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
new file mode 100644
index 0000000..2955986
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.util.concurrent.ConcurrentHashMap
+
+
+/**
+ * A service that returns a serializer object given the serializer's class name. If a previous
+ * instance of the serializer object has been created, the get method returns that instead of
+ * creating a new one.
+ */
+private[spark] class SerializerManager {
+
+  private val serializers = new ConcurrentHashMap[String, Serializer]
+  private var _default: Serializer = _
+
+  def default = _default
+
+  def setDefault(clsName: String): Serializer = {
+    _default = get(clsName)
+    _default
+  }
+
+  def get(clsName: String): Serializer = {
+    if (clsName == null) {
+      default
+    } else {
+      var serializer = serializers.get(clsName)
+      if (serializer != null) {
+        // If the serializer has been created previously, reuse that.
+        serializer
+      } else this.synchronized {
+        // Otherwise, create a new one. But make sure no other thread has attempted
+        // to create another new one at the same time.
+        serializer = serializers.get(clsName)
+        if (serializer == null) {
+          val clsLoader = Thread.currentThread.getContextClassLoader
+          serializer =
+            Class.forName(clsName, true, clsLoader).newInstance().asInstanceOf[Serializer]
+          serializers.put(clsName, serializer)
+        }
+        serializer
+      }
+    }
+  }
+}
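A usage sketch tying this to the serializer sketch after Serializer.scala above (SketchJavaSerializer
is the illustrative class defined there): get instantiates the named class reflectively on first use
and hands back the cached instance on every later lookup.

    val manager = new org.apache.spark.serializer.SerializerManager

    val ser = manager.get(classOf[SketchJavaSerializer].getName)
    val bytes = ser.newInstance().serialize(List(1, 2, 3))

    // A second lookup with the same class name returns the cached instance, not a new one.
    assert(manager.get(classOf[SketchJavaSerializer].getName) eq ser)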

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/storage/BlockException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockException.scala b/core/src/main/scala/org/apache/spark/storage/BlockException.scala
new file mode 100644
index 0000000..290dbce
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/BlockException.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+private[spark]
+case class BlockException(blockId: String, message: String) extends Exception(message)
+

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/storage/BlockFetchTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetchTracker.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetchTracker.scala
new file mode 100644
index 0000000..2e0b0e6
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetchTracker.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+private[spark] trait BlockFetchTracker {
+  def totalBlocks: Int
+  def numLocalBlocks: Int
+  def numRemoteBlocks: Int
+  def remoteFetchTime: Long
+  def fetchWaitTime: Long
+  def remoteBytesRead: Long
+}
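
For reference, a trivial implementer of the trait above, such as a stub used in tests
(hypothetical, not part of this patch):

    // Hypothetical no-op tracker: every fetch metric reports zero.
    class NoopBlockFetchTracker extends BlockFetchTracker {
      override def totalBlocks: Int = 0
      override def numLocalBlocks: Int = 0
      override def numRemoteBlocks: Int = 0
      override def remoteFetchTime: Long = 0L
      override def fetchWaitTime: Long = 0L
      override def remoteBytesRead: Long = 0L
    }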

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
new file mode 100644
index 0000000..c91f0fc
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.nio.ByteBuffer
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashSet
+import scala.collection.mutable.Queue
+
+import io.netty.buffer.ByteBuf
+
+import org.apache.spark.Logging
+import org.apache.spark.Utils
+import org.apache.spark.SparkException
+import org.apache.spark.network.BufferMessage
+import org.apache.spark.network.ConnectionManagerId
+import org.apache.spark.network.netty.ShuffleCopier
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * A block fetcher iterator interface. There are two implementations:
+ *
+ * BasicBlockFetcherIterator: uses a custom-built NIO communication layer.
+ * NettyBlockFetcherIterator: uses Netty (OIO) as the communication layer.
+ *
+ * Eventually we would like the two to converge on a single NIO-based communication layer, but
+ * extensive testing has shown that under some circumstances (e.g. large shuffles with many
+ * cores), NIO performs poorly, hence the Netty OIO implementation.
+ */
+
+private[storage]
+trait BlockFetcherIterator extends Iterator[(String, Option[Iterator[Any]])]
+  with Logging with BlockFetchTracker {
+  def initialize()
+}
+
+
+private[storage]
+object BlockFetcherIterator {
+
+  // A request to fetch one or more blocks, complete with their sizes
+  class FetchRequest(val address: BlockManagerId, val blocks: Seq[(String, Long)]) {
+    val size = blocks.map(_._2).sum
+  }
+
+  // A result of a fetch. Includes the block ID, size in bytes, and a function to deserialize
+  // the block (since we want all deserialization to happen in the calling thread); can also
+  // represent a fetch failure if size == -1.
+  class FetchResult(val blockId: String, val size: Long, val deserialize: () => Iterator[Any]) {
+    def failed: Boolean = size == -1
+  }
+
+  class BasicBlockFetcherIterator(
+      private val blockManager: BlockManager,
+      val blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])],
+      serializer: Serializer)
+    extends BlockFetcherIterator {
+
+    import blockManager._
+
+    private var _remoteBytesRead = 0L
+    private var _remoteFetchTime = 0L
+    private var _fetchWaitTime = 0L
+
+    if (blocksByAddress == null) {
+      throw new IllegalArgumentException("BlocksByAddress is null")
+    }
+
+    // Total number of blocks to fetch (local + remote); also the number of FetchResults expected
+    protected var _numBlocksToFetch = 0
+
+    protected var startTime = System.currentTimeMillis
+
+    // This represents the number of local blocks, also counting zero-sized blocks
+    private var numLocal = 0
+    // BlockIds for local blocks that need to be fetched. Excludes zero-sized blocks
+    protected val localBlocksToFetch = new ArrayBuffer[String]()
+
+    // This represents the number of remote blocks, also counting zero-sized blocks
+    private var numRemote = 0
+    // BlockIds for remote blocks that need to be fetched. Excludes zero-sized blocks
+    protected val remoteBlocksToFetch = new HashSet[String]()
+
+    // A queue to hold our results.
+    protected val results = new LinkedBlockingQueue[FetchResult]
+
+    // Queue of fetch requests to issue; we'll pull requests off this gradually to make sure that
+    // the number of bytes in flight is limited to maxBytesInFlight
+    private val fetchRequests = new Queue[FetchRequest]
+
+    // Current bytes in flight from our requests
+    private var bytesInFlight = 0L
+
+    protected def sendRequest(req: FetchRequest) {
+      logDebug("Sending request for %d blocks (%s) from %s".format(
+        req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
+      val cmId = new ConnectionManagerId(req.address.host, req.address.port)
+      val blockMessageArray = new BlockMessageArray(req.blocks.map {
+        case (blockId, size) => BlockMessage.fromGetBlock(GetBlock(blockId))
+      })
+      bytesInFlight += req.size
+      val sizeMap = req.blocks.toMap  // so we can look up the size of each blockID
+      val fetchStart = System.currentTimeMillis()
+      val future = connectionManager.sendMessageReliably(cmId, blockMessageArray.toBufferMessage)
+      future.onSuccess {
+        case Some(message) => {
+          val fetchDone = System.currentTimeMillis()
+          _remoteFetchTime += fetchDone - fetchStart
+          val bufferMessage = message.asInstanceOf[BufferMessage]
+          val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
+          for (blockMessage <- blockMessageArray) {
+            if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) {
+              throw new SparkException(
+                "Unexpected message " + blockMessage.getType + " received from " + cmId)
+            }
+            val blockId = blockMessage.getId
+            val networkSize = blockMessage.getData.limit()
+            results.put(new FetchResult(blockId, sizeMap(blockId),
+              () => dataDeserialize(blockId, blockMessage.getData, serializer)))
+            _remoteBytesRead += networkSize
+            logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
+          }
+        }
+        case None => {
+          logError("Could not get block(s) from " + cmId)
+          for ((blockId, size) <- req.blocks) {
+            results.put(new FetchResult(blockId, -1, null))
+          }
+        }
+      }
+    }
+
+    protected def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
+      // Split local and remote blocks. Remote blocks are further split into FetchRequests of size
+      // at most maxBytesInFlight in order to limit the amount of data in flight.
+      val remoteRequests = new ArrayBuffer[FetchRequest]
+      for ((address, blockInfos) <- blocksByAddress) {
+        if (address == blockManagerId) {
+          numLocal = blockInfos.size
+          // Filter out zero-sized blocks
+          localBlocksToFetch ++= blockInfos.filter(_._2 != 0).map(_._1)
+          _numBlocksToFetch += localBlocksToFetch.size
+        } else {
+          numRemote += blockInfos.size
+          // Make our requests at least maxBytesInFlight / 5 in length; the reason to keep them
+          // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
+          // nodes, rather than blocking on reading output from one node.
+          val minRequestSize = math.max(maxBytesInFlight / 5, 1L)
+          logInfo("maxBytesInFlight: " + maxBytesInFlight + ", minRequest: " + minRequestSize)
+          val iterator = blockInfos.iterator
+          var curRequestSize = 0L
+          var curBlocks = new ArrayBuffer[(String, Long)]
+          while (iterator.hasNext) {
+            val (blockId, size) = iterator.next()
+            // Skip empty blocks
+            if (size > 0) {
+              curBlocks += ((blockId, size))
+              remoteBlocksToFetch += blockId
+              _numBlocksToFetch += 1
+              curRequestSize += size
+            } else if (size < 0) {
+              throw new BlockException(blockId, "Negative block size " + size)
+            }
+            if (curRequestSize >= minRequestSize) {
+              // Add this FetchRequest
+              remoteRequests += new FetchRequest(address, curBlocks)
+              curRequestSize = 0
+              curBlocks = new ArrayBuffer[(String, Long)]
+            }
+          }
+          // Add in the final request
+          if (!curBlocks.isEmpty) {
+            remoteRequests += new FetchRequest(address, curBlocks)
+          }
+        }
+      }
+      logInfo("Getting " + _numBlocksToFetch + " non-zero-bytes blocks out of " +
+        totalBlocks + " blocks")
+      remoteRequests
+    }
+
+    protected def getLocalBlocks() {
+      // Get the local blocks while remote blocks are being fetched. Note that it's okay to do
+      // these all at once because they will just memory-map some files, so they won't consume
+      // any memory that might exceed our maxBytesInFlight
+      for (id <- localBlocksToFetch) {
+        getLocalFromDisk(id, serializer) match {
+          case Some(iter) => {
+            // Pass 0 as size since it's not in flight
+            results.put(new FetchResult(id, 0, () => iter))
+            logDebug("Got local block " + id)
+          }
+          case None => {
+            throw new BlockException(id, "Could not get block " + id + " from local machine")
+          }
+        }
+      }
+    }
+
+    override def initialize() {
+      // Split local and remote blocks.
+      val remoteRequests = splitLocalRemoteBlocks()
+      // Add the remote requests into our queue in a random order
+      fetchRequests ++= Utils.randomize(remoteRequests)
+
+      // Send out initial requests for blocks, up to our maxBytesInFlight
+      while (!fetchRequests.isEmpty &&
+        (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
+        sendRequest(fetchRequests.dequeue())
+      }
+
+      val numGets = remoteRequests.size - fetchRequests.size
+      logInfo("Started " + numGets + " remote gets in " + Utils.getUsedTimeMs(startTime))
+
+      // Get Local Blocks
+      startTime = System.currentTimeMillis
+      getLocalBlocks()
+      logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
+    }
+
+    // An iterator that reads fetched blocks off the results queue as they arrive.
+    @volatile protected var resultsGotten = 0
+
+    override def hasNext: Boolean = resultsGotten < _numBlocksToFetch
+
+    override def next(): (String, Option[Iterator[Any]]) = {
+      resultsGotten += 1
+      val startFetchWait = System.currentTimeMillis()
+      val result = results.take()
+      val stopFetchWait = System.currentTimeMillis()
+      _fetchWaitTime += (stopFetchWait - startFetchWait)
+      if (!result.failed) bytesInFlight -= result.size
+      while (!fetchRequests.isEmpty &&
+        (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
+        sendRequest(fetchRequests.dequeue())
+      }
+      (result.blockId, if (result.failed) None else Some(result.deserialize()))
+    }
+
+    // Implementing BlockFetchTracker trait.
+    override def totalBlocks: Int = numLocal + numRemote
+    override def numLocalBlocks: Int = numLocal
+    override def numRemoteBlocks: Int = numRemote
+    override def remoteFetchTime: Long = _remoteFetchTime
+    override def fetchWaitTime: Long = _fetchWaitTime
+    override def remoteBytesRead: Long = _remoteBytesRead
+  }
+  // End of BasicBlockFetcherIterator
+
+  class NettyBlockFetcherIterator(
+      blockManager: BlockManager,
+      blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])],
+      serializer: Serializer)
+    extends BasicBlockFetcherIterator(blockManager, blocksByAddress, serializer) {
+
+    import blockManager._
+
+    val fetchRequestsSync = new LinkedBlockingQueue[FetchRequest]
+
+    private def startCopiers(numCopiers: Int): List[_ <: Thread] = {
+      (for (i <- 0 until numCopiers) yield {
+        val copier = new Thread {
+          override def run() {
+            try {
+              while (!isInterrupted && !fetchRequestsSync.isEmpty) {
+                sendRequest(fetchRequestsSync.take())
+              }
+            } catch {
+              case x: InterruptedException => logInfo("Copier Interrupted")
+              // case _ => throw new SparkException("Exception thrown in shuffle copier")
+            }
+          }
+        }
+        copier.start()
+        copier
+      }).toList
+    }
+
+    // Interrupts the copier threads so that fetching can be stopped when necessary.
+    private def stopCopiers() {
+      for (copier <- copiers) {
+        copier.interrupt()
+      }
+    }
+
+    override protected def sendRequest(req: FetchRequest) {
+
+      def putResult(blockId: String, blockSize: Long, blockData: ByteBuf) {
+        val fetchResult = new FetchResult(blockId, blockSize,
+          () => dataDeserialize(blockId, blockData.nioBuffer, serializer))
+        results.put(fetchResult)
+      }
+
+      logDebug("Sending request for %d blocks (%s) from %s".format(
+        req.blocks.size, Utils.bytesToString(req.size), req.address.host))
+      val cmId = new ConnectionManagerId(req.address.host, req.address.nettyPort)
+      val copier = new ShuffleCopier
+      copier.getBlocks(cmId, req.blocks, putResult)
+      logDebug("Sent request for remote blocks " + req.blocks + " from " + req.address.host)
+    }
+
+    private var copiers: List[_ <: Thread] = null
+
+    override def initialize() {
+      // Split local and remote blocks and set numBlocksToFetch
+      val remoteRequests = splitLocalRemoteBlocks()
+      // Add the remote requests into our queue in a random order
+      for (request <- Utils.randomize(remoteRequests)) {
+        fetchRequestsSync.put(request)
+      }
+
+      copiers = startCopiers(System.getProperty("spark.shuffle.copier.threads", "6").toInt)
+      logInfo("Started " + fetchRequestsSync.size + " remote gets in " +
+        Utils.getUsedTimeMs(startTime))
+
+      // Get Local Blocks
+      startTime = System.currentTimeMillis
+      getLocalBlocks()
+      logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
+    }
+
+    override def next(): (String, Option[Iterator[Any]]) = {
+      resultsGotten += 1
+      val result = results.take()
+      // Once all the results have been retrieved, the copiers will exit automatically.
+      (result.blockId, if (result.failed) None else Some(result.deserialize()))
+    }
+  }
+  // End of NettyBlockFetcherIterator
+}
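
For context, a standalone sketch of the request-splitting rule that splitLocalRemoteBlocks
applies above (illustrative only, not part of this patch): remote (blockId, size) pairs are
grouped into requests of at least maxBytesInFlight / 5 bytes, skipping zero-sized blocks, so
that up to roughly five fetches can be in flight in parallel.

    // Illustrative re-implementation of the chunking rule, detached from BlockManager state.
    def splitIntoRequests(
        blocks: Seq[(String, Long)],
        maxBytesInFlight: Long): Seq[Seq[(String, Long)]] = {
      val minRequestSize = math.max(maxBytesInFlight / 5, 1L)
      val requests = new scala.collection.mutable.ArrayBuffer[Seq[(String, Long)]]
      var current = new scala.collection.mutable.ArrayBuffer[(String, Long)]
      var currentSize = 0L
      for ((blockId, size) <- blocks if size > 0) {    // zero-sized blocks are skipped
        current += ((blockId, size))
        currentSize += size
        if (currentSize >= minRequestSize) {           // close the request once it is large enough
          requests += current.toSeq
          current = new scala.collection.mutable.ArrayBuffer[(String, Long)]
          currentSize = 0L
        }
      }
      if (current.nonEmpty) requests += current.toSeq  // final, possibly undersized, request
      requests.toSeq
    }

The real implementation additionally records the block IDs in remoteBlocksToFetch, counts
_numBlocksToFetch, and throws BlockException for negative block sizes.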